1 /*****************************************************************************\
2  *  burst_buffer_datawarp.c - Plugin for managing a Cray DataWarp burst_buffer
3  *****************************************************************************
4  *  Copyright (C) 2014-2018 SchedMD LLC.
5  *  Written by Morris Jette <jette@schedmd.com>
6  *
7  *  This file is part of Slurm, a resource management program.
8  *  For details, see <https://slurm.schedmd.com/>.
9  *  Please also read the included file: DISCLAIMER.
10  *
11  *  Slurm is free software; you can redistribute it and/or modify it under
12  *  the terms of the GNU General Public License as published by the Free
13  *  Software Foundation; either version 2 of the License, or (at your option)
14  *  any later version.
15  *
16  *  In addition, as a special exception, the copyright holders give permission
17  *  to link the code of portions of this program with the OpenSSL library under
18  *  certain conditions as described in each individual source file, and
19  *  distribute linked combinations including the two. You must obey the GNU
20  *  General Public License in all respects for all of the code used other than
21  *  OpenSSL. If you modify file(s) with this exception, you may extend this
22  *  exception to your version of the file(s), but you are not obligated to do
23  *  so. If you do not wish to do so, delete this exception statement from your
24  *  version.  If you delete this exception statement from all source files in
25  *  the program, then also delete it here.
26  *
27  *  Slurm is distributed in the hope that it will be useful, but WITHOUT ANY
28  *  WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
29  *  FOR A PARTICULAR PURPOSE.  See the GNU General Public License for more
30  *  details.
31  *
32  *  You should have received a copy of the GNU General Public License along
33  *  with Slurm; if not, write to the Free Software Foundation, Inc.,
34  *  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301  USA.
35 \*****************************************************************************/
36 
37 #include "config.h"
38 
39 #define _GNU_SOURCE	/* For POLLRDHUP */
40 #include <ctype.h>
41 #include <poll.h>
42 #include <stdlib.h>
43 #include <unistd.h>
44 #include <sys/stat.h>
45 #include <sys/types.h>
46 
47 #if HAVE_JSON_C_INC
48 #  include <json-c/json.h>
49 #elif HAVE_JSON_INC
50 #  include <json/json.h>
51 #endif
52 
53 #include "slurm/slurm.h"
54 
55 #include "src/common/assoc_mgr.h"
56 #include "src/common/bitstring.h"
57 #include "src/common/fd.h"
58 #include "src/common/list.h"
59 #include "src/common/macros.h"
60 #include "src/common/pack.h"
61 #include "src/common/parse_config.h"
62 #include "src/common/run_command.h"
63 #include "src/common/slurm_protocol_api.h"
64 #include "src/common/slurm_protocol_defs.h"
65 #include "src/common/timers.h"
66 #include "src/common/uid.h"
67 #include "src/common/xmalloc.h"
68 #include "src/common/xstring.h"
69 #include "src/slurmctld/agent.h"
70 #include "src/slurmctld/job_scheduler.h"
71 #include "src/slurmctld/locks.h"
72 #include "src/slurmctld/node_scheduler.h"
73 #include "src/slurmctld/reservation.h"
74 #include "src/slurmctld/slurmctld.h"
75 #include "src/slurmctld/state_save.h"
76 #include "src/slurmctld/trigger_mgr.h"
77 #include "src/plugins/burst_buffer/common/burst_buffer_common.h"
78 
#define _DEBUG 0	/* Detailed debugging information */
#define TIME_SLOP 60	/* Time allowed to synchronize operations between
			 * threads (NOTE(review): units presumably seconds) */
#define MAX_RETRY_CNT 2	/* Hold job if "pre_run" operation fails more than
			 * 2 times */

/* Script line types.
 * NOTE(review): presumably used to classify job script lines as a "#BB"
 * directive, a "#DW" directive, or anything else; users not visible here. */
#define LINE_OTHER 0
#define LINE_BB    1
#define LINE_DW    2
89 
90 /*
91  * These variables are required by the burst buffer plugin interface.  If they
92  * are not found in the plugin, the plugin loader will ignore it.
93  *
94  * plugin_name - a string giving a human-readable description of the
95  * plugin.  There is no maximum length, but the symbol must refer to
96  * a valid string.
97  *
98  * plugin_type - a string suggesting the type of the plugin or its
99  * applicability to a particular form of data or method of data handling.
100  * If the low-level plugin API is used, the contents of this string are
101  * unimportant and may be anything.  Slurm uses the higher-level plugin
102  * interface which requires this string to be of the form
103  *
104  *      <application>/<method>
105  *
106  * where <application> is a description of the intended application of
107  * the plugin (e.g., "burst_buffer" for Slurm burst_buffer) and <method> is a
108  * description of how this plugin satisfies that application.  Slurm will only
109  * load a burst_buffer plugin if the plugin_type string has a prefix of
110  * "burst_buffer/".
111  *
112  * plugin_version - an unsigned 32-bit integer containing the Slurm version
113  * (major.minor.micro combined into a single number).
114  */
/* Plugin identification symbols required by the plugin loader (described in
 * the comment above). plugin_type is also used as the log message prefix
 * throughout this file. */
const char plugin_name[]        = "burst_buffer datawarp plugin";
const char plugin_type[]        = "burst_buffer/datawarp";
const uint32_t plugin_version   = SLURM_VERSION_NUMBER;
118 
/* Most state information is in a common structure so that we can more
 * easily use common functions from multiple burst buffer plugins */
static bb_state_t	bb_state;
/* NOTE(review): presumably the last ID handed out to a persistent buffer;
 * its users are not visible in this part of the file */
static uint32_t		last_persistent_id = 1;
/* Base directory under which per-job "hash.<N>/job.<job_id>" files live */
static char *		state_save_loc = NULL;
124 
/* These are defined here so when we link with something other than
 * the slurmctld we will have these symbols defined.  They will get
 * overwritten when linking with the slurmctld.
 */
#if defined (__APPLE__)
/* On macOS only declare them, marked weak_import, instead of defining them */
extern uint16_t accounting_enforce __attribute__((weak_import));
extern void *acct_db_conn  __attribute__((weak_import));
#else
uint16_t accounting_enforce = 0;
void *acct_db_conn = NULL;
#endif
136 
137 
/* Description of each Cray DW configuration entry.
 * Filled in from DataWarp JSON output by _json_parse_configs_object() and
 * _parse_config_links() (see the prototypes below).
 */
typedef struct bb_configs {
	uint32_t id;		/* Configuration ID */
	uint32_t instance;	/* NOTE(review): presumably the ID of the linked
				 * instance entry; confirm in
				 * _parse_config_links() */
} bb_configs_t;
144 
/* Description of each Cray DW instance entry, including persistent buffers.
 * Filled in from DataWarp JSON output by _json_parse_instances_object(),
 * _parse_instance_capacity() and _parse_instance_links().
 */
typedef struct bb_instances {
	uint32_t id;		/* Instance ID */
	uint64_t bytes;		/* Capacity (NOTE(review): presumably bytes,
				 * per the field name) */
	uint32_t session;	/* NOTE(review): presumably the linked session
				 * ID */
} bb_instances_t;
152 
/* Description of each Cray DW pool entry.
 * Filled in from DataWarp JSON output by _json_parse_pools_object().
 */
typedef struct bb_pools {
	char *id;		/* Pool name */
	char *units;		/* Units string reported by DataWarp */
	uint64_t granularity;	/* Allocation granularity of the pool */
	uint64_t quantity;	/* NOTE(review): presumably total allocation
				 * units in the pool */
	uint64_t free;		/* NOTE(review): presumably unallocated
				 * units in the pool */
} bb_pools_t;
162 
/* Description of each Cray DW session entry.
 * Filled in from DataWarp JSON output by _json_parse_sessions_object().
 */
typedef struct bb_sessions {
	uint32_t created;	/* NOTE(review): presumably creation time */
	uint32_t id;		/* Session ID */
	char    *token;		/* Session token string */
	bool     used;		/* NOTE(review): presumably set once matched to
				 * a known buffer; confirm against users */
	uint32_t user_id;	/* Owning user ID */
} bb_sessions_t;
172 
/* Arguments handed to a pre_run worker thread
 * (NOTE(review): presumably _start_pre_run(); confirm) */
typedef struct {
	char   **args;		/* NULL-terminated command argument vector */
	uint32_t job_id;
	uint32_t timeout;	/* NOTE(review): command timeout; units unknown
				 * from this view */
	uint32_t user_id;
} pre_run_args_t;
179 
/* Arguments handed to a stage-in/stage-out worker thread
 * (NOTE(review): presumably _start_stage_in()/_start_stage_out(); confirm) */
typedef struct {
	char   **args1;		/* First command argument vector */
	char   **args2;		/* Second command argument vector */
	uint64_t bb_size;	/* Burst buffer size */
	uint32_t job_id;
	char    *pool;		/* Pool name */
	uint32_t user_id;
} stage_args_t;
188 
/* Request describing a persistent burst buffer create/destroy operation
 * (NOTE(review): presumably passed to _create_persistent() and
 * _destroy_persistent(); confirm) */
typedef struct create_buf_data {
	char *access;		/* Access mode */
	bool hurry;		/* Set to destroy in a hurry (no stage-out) */
	uint32_t job_id;	/* Job ID to use */
	char *job_script;	/* Path to job script */
	char *name;		/* Name of the persistent burst buffer */
	char *pool;		/* Name of pool in which to create the buffer */
	uint64_t size;		/* Size in bytes */
	char *type;		/* Access type */
	uint32_t user_id;	/* User ID of the request */
} create_buf_data_t;
200 
/* "units" value meaning that "capacity" is expressed in bytes */
#define BB_UNITS_BYTES 1
/* Size value parsed from DataWarp JSON by _json_parse_real_size() */
struct bb_total_size {
	int units;		/* Unit type, e.g. BB_UNITS_BYTES */
	uint64_t capacity;	/* Size expressed in "units" */
};
206 
/* Forward declarations of file-local functions */
static void	_add_bb_to_script(char **script_body,
				  const char *burst_buffer_file);
static int	_alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
			      bool job_ready);
static void	_apply_limits(void);
static void *	_bb_agent(void *args);
static void	_bb_free_configs(bb_configs_t *ents, int num_ent);
static void	_bb_free_instances(bb_instances_t *ents, int num_ent);
static void	_bb_free_pools(bb_pools_t *ents, int num_ent);
static void	_bb_free_sessions(bb_sessions_t *ents, int num_ent);
static bb_configs_t *_bb_get_configs(int *num_ent, bb_state_t *state_ptr,
				     uint32_t timeout);
static bb_instances_t *_bb_get_instances(int *num_ent, bb_state_t *state_ptr,
					 uint32_t timeout);
static bb_pools_t *_bb_get_pools(int *num_ent, bb_state_t *state_ptr,
				 uint32_t timeout);
static bb_sessions_t *_bb_get_sessions(int *num_ent, bb_state_t *state_ptr,
				       uint32_t timeout);
static int	_build_bb_script(job_record_t *job_ptr, char *script_file);
static int	_create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
			     bool job_ready);
static void *	_create_persistent(void *x);
static void *	_destroy_persistent(void *x);
static void	_free_create_args(create_buf_data_t *create_args);
static bb_job_t *_get_bb_job(job_record_t *job_ptr);
static bool	_have_dw_cmd_opts(bb_job_t *bb_job);
static void	_job_queue_del(void *x);
static bb_configs_t *_json_parse_configs_array(json_object *jobj, char *key,
					       int *num);
static bb_instances_t *_json_parse_instances_array(json_object *jobj, char *key,
						   int *num);
static struct bb_pools *_json_parse_pools_array(json_object *jobj, char *key,
						int *num);
static struct bb_sessions *_json_parse_sessions_array(json_object *jobj,
						      char *key, int *num);
static void	_json_parse_configs_object(json_object *jobj,
					   bb_configs_t *ent);
static void	_json_parse_instances_object(json_object *jobj,
					     bb_instances_t *ent);
static void	_json_parse_pools_object(json_object *jobj, bb_pools_t *ent);
static void	_json_parse_sessions_object(json_object *jobj,
					    bb_sessions_t *ent);
static struct bb_total_size *_json_parse_real_size(json_object *j);
static void	_log_script_argv(char **script_argv, char *resp_msg);
static void	_load_state(bool init_config);
static int	_open_part_state_file(char **state_file);
static int	_parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
			       uid_t submit_uid);
static void	_parse_config_links(json_object *instance, bb_configs_t *ent);
static void	_parse_instance_capacity(json_object *instance,
					 bb_instances_t *ent);
static void	_parse_instance_links(json_object *instance,
				      bb_instances_t *ent);
static void	_pick_alloc_account(bb_alloc_t *bb_alloc);
static void	_purge_bb_files(uint32_t job_id, job_record_t *job_ptr);
static void	_purge_vestigial_bufs(void);
static void	_python2json(char *buf);
static void	_recover_bb_state(void);
static int	_queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job);
static int	_queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job);
static void	_queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry);
static void	_reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
				 int new_state, uint64_t buf_size);
static void	_save_bb_state(void);
static void	_set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc);
static void *	_start_pre_run(void *x);
static void *	_start_stage_in(void *x);
static void *	_start_stage_out(void *x);
static void *	_start_teardown(void *x);
static void	_test_config(void);
static bool	_test_persistent_use_ready(bb_job_t *bb_job,
					   job_record_t *job_ptr);
static int	_test_size_limit(job_record_t *job_ptr, bb_job_t *bb_job);
static void	_timeout_bb_rec(void);
static int	_write_file(char *file_name, char *buf);
static int	_write_nid_file(char *file_name, char *node_list,
				job_record_t *job_ptr);
static int	_xlate_batch(job_desc_msg_t *job_desc);
static int	_xlate_interactive(job_desc_msg_t *job_desc);
286 
/*
 * Rewrite a Python repr() string in place into real JSON:
 *   - every single quote becomes a double quote, and
 *   - a 'u' that immediately precedes a single quote outside of a quoted
 *     string (the Python unicode prefix, as in u'abc') is dropped.
 * The result is never longer than the input. A NULL buf is ignored.
 * See: https://github.com/stedolan/jq/issues/312
 */
static void _python2json(char *buf)
{
	bool in_string = false;
	char *src, *dst;

	if (buf == NULL)
		return;

	for (src = dst = buf; ; src++) {
		if (*src == '\'') {
			*dst++ = '\"';
			in_string = !in_string;
			continue;
		}
		/* Drop the Python unicode marker in front of a string */
		if (!in_string && (*src == 'u') && (src[1] == '\''))
			continue;
		*dst++ = *src;
		if (*src == '\0')
			break;
	}
}
310 
311 /* Log a command's arguments. */
_log_script_argv(char ** script_argv,char * resp_msg)312 static void _log_script_argv(char **script_argv, char *resp_msg)
313 {
314 	char *cmd_line = NULL;
315 	int i;
316 
317 	if (!bb_state.bb_config.debug_flag)
318 		return;
319 
320 	for (i = 0; script_argv[i]; i++) {
321 		if (i)
322 			xstrcat(cmd_line, " ");
323 		xstrcat(cmd_line, script_argv[i]);
324 	}
325 	info("%s", cmd_line);
326 	if (resp_msg && resp_msg[0])
327 		info("%s", resp_msg);
328 	xfree(cmd_line);
329 }
330 
_job_queue_del(void * x)331 static void _job_queue_del(void *x)
332 {
333 	bb_job_queue_rec_t *job_rec = (bb_job_queue_rec_t *) x;
334 	if (job_rec) {
335 		xfree(job_rec);
336 	}
337 }
338 
339 /* Purge files we have created for the job.
340  * bb_state.bb_mutex is locked on function entry.
341  * job_ptr may be NULL if not found */
_purge_bb_files(uint32_t job_id,job_record_t * job_ptr)342 static void _purge_bb_files(uint32_t job_id, job_record_t *job_ptr)
343 
344 {
345 	char *hash_dir = NULL, *job_dir = NULL;
346 	char *script_file = NULL, *path_file = NULL, *client_nids_file = NULL;
347 	char *exec_host_file = NULL;
348 	int hash_inx;
349 
350 	hash_inx = job_id % 10;
351 	xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
352 	(void) mkdir(hash_dir, 0700);
353 	xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_id);
354 	(void) mkdir(job_dir, 0700);
355 
356 	xstrfmtcat(client_nids_file, "%s/client_nids", job_dir);
357 	(void) unlink(client_nids_file);
358 	xfree(client_nids_file);
359 
360 	xstrfmtcat(exec_host_file, "%s/exec_host", job_dir);
361 	(void) unlink(exec_host_file);
362 	xfree(exec_host_file);
363 
364 	xstrfmtcat(path_file, "%s/pathfile", job_dir);
365 	(void) unlink(path_file);
366 	xfree(path_file);
367 
368 	if (!job_ptr || (job_ptr->batch_flag == 0)) {
369 		xstrfmtcat(script_file, "%s/script", job_dir);
370 		(void) unlink(script_file);
371 		xfree(script_file);
372 	}
373 
374 	(void) unlink(job_dir);
375 	xfree(job_dir);
376 	xfree(hash_dir);
377 }
378 
379 /* Validate that our configuration is valid for this plugin type */
_test_config(void)380 static void _test_config(void)
381 {
382 	if (!bb_state.bb_config.get_sys_state) {
383 		debug("%s: %s: GetSysState is NULL", plugin_type, __func__);
384 		bb_state.bb_config.get_sys_state =
385 			xstrdup("/opt/cray/dw_wlm/default/bin/dw_wlm_cli");
386 	}
387 	if (!bb_state.bb_config.get_sys_status) {
388 		debug("%s: %s: GetSysStatus is NULL", plugin_type, __func__);
389 		bb_state.bb_config.get_sys_status =
390 			xstrdup("/opt/cray/dws/default/bin/dwstat");
391 	}
392 }
393 
394 /* Allocate resources to a job and begin setup/stage-in */
_alloc_job_bb(job_record_t * job_ptr,bb_job_t * bb_job,bool job_ready)395 static int _alloc_job_bb(job_record_t *job_ptr, bb_job_t *bb_job,
396 			 bool job_ready)
397 {
398 	int rc = SLURM_SUCCESS;
399 
400 	if (bb_state.bb_config.debug_flag) {
401 		info("%s: %s: start job allocate %pJ",
402 		     plugin_type, __func__, job_ptr);
403 	}
404 
405 	if (bb_job->buf_cnt &&
406 	    (_create_bufs(job_ptr, bb_job, job_ready) > 0))
407 		return EAGAIN;
408 
409 	if (bb_job->state < BB_STATE_STAGING_IN) {
410 		bb_job->state = BB_STATE_STAGING_IN;
411 		rc = _queue_stage_in(job_ptr, bb_job);
412 		if (rc != SLURM_SUCCESS) {
413 			bb_job->state = BB_STATE_TEARDOWN;
414 			_queue_teardown(job_ptr->job_id, job_ptr->user_id,
415 					true);
416 		}
417 	}
418 
419 	return rc;
420 }
421 
422 /* Perform periodic background activities */
_bb_agent(void * args)423 static void *_bb_agent(void *args)
424 {
425 	/* Locks: write job */
426 	slurmctld_lock_t job_write_lock = {
427 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
428 
429 	while (!bb_state.term_flag) {
430 		bb_sleep(&bb_state, AGENT_INTERVAL);
431 		if (!bb_state.term_flag) {
432 			_load_state(false);	/* Has own locking */
433 			lock_slurmctld(job_write_lock);
434 			slurm_mutex_lock(&bb_state.bb_mutex);
435 			_timeout_bb_rec();
436 			slurm_mutex_unlock(&bb_state.bb_mutex);
437 			unlock_slurmctld(job_write_lock);
438 		}
439 		_save_bb_state();	/* Has own locks excluding file write */
440 	}
441 
442 	return NULL;
443 }
444 
445 /* Given a request size and a pool name (or NULL name for default pool),
446  * return the required buffer size (rounded up by granularity) */
_set_granularity(uint64_t orig_size,char * bb_pool)447 static uint64_t _set_granularity(uint64_t orig_size, char *bb_pool)
448 {
449 	burst_buffer_pool_t *pool_ptr;
450 	uint64_t new_size;
451 	int i;
452 
453 	if (!bb_pool || !xstrcmp(bb_pool, bb_state.bb_config.default_pool)) {
454 		new_size = bb_granularity(orig_size,
455 					  bb_state.bb_config.granularity);
456 		return new_size;
457 	}
458 
459 	for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
460 	     i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
461 		if (!xstrcmp(bb_pool, pool_ptr->name)) {
462 			new_size = bb_granularity(orig_size,
463 						  pool_ptr->granularity);
464 			return new_size;
465 		}
466 	}
467 	debug("Could not find pool %s", bb_pool);
468 	return orig_size;
469 }
470 
471 /* Return the burst buffer size specification of a job
472  * RET size data structure or NULL of none found
473  * NOTE: delete return value using _del_bb_size() */
_get_bb_job(job_record_t * job_ptr)474 static bb_job_t *_get_bb_job(job_record_t *job_ptr)
475 {
476 	char *bb_specs, *bb_hurry, *bb_name, *bb_type, *bb_access, *bb_pool;
477 	char *end_ptr = NULL, *save_ptr = NULL, *sub_tok, *tok;
478 	bool have_bb = false;
479 	uint64_t tmp_cnt;
480 	int inx;
481 	bb_job_t *bb_job;
482 
483 	if ((job_ptr->burst_buffer == NULL) ||
484 	    (job_ptr->burst_buffer[0] == '\0'))
485 		return NULL;
486 
487 	if ((bb_job = bb_job_find(&bb_state, job_ptr->job_id)))
488 		return bb_job;	/* Cached data */
489 
490 	bb_job = bb_job_alloc(&bb_state, job_ptr->job_id);
491 	bb_job->account = xstrdup(job_ptr->account);
492 	if (job_ptr->part_ptr)
493 		bb_job->partition = xstrdup(job_ptr->part_ptr->name);
494 	if (job_ptr->qos_ptr)
495 		bb_job->qos = xstrdup(job_ptr->qos_ptr->name);
496 	bb_job->state = BB_STATE_PENDING;
497 	bb_job->user_id = job_ptr->user_id;
498 	bb_specs = xstrdup(job_ptr->burst_buffer);
499 	tok = strtok_r(bb_specs, "\n", &save_ptr);
500 	while (tok) {
501 		uint32_t bb_flag = 0;
502 		if (tok[0] != '#') {
503 			tok = strtok_r(NULL, "\n", &save_ptr);
504 			continue;
505 		}
506 		if ((tok[1] == 'B') && (tok[2] == 'B'))
507 			bb_flag = BB_FLAG_BB_OP;
508 		else if ((tok[1] == 'D') && (tok[2] == 'W'))
509 			bb_flag = BB_FLAG_DW_OP;
510 
511 		/*
512 		 * Effective Slurm v18.08 and CLE6.0UP06 the create_persistent
513 		 * and destroy_persistent functions are directly supported by
514 		 * dw_wlm_cli. Support "#BB" format for backward compatibility.
515 		 */
516 		if (bb_flag != 0) {
517 			tok += 3;
518 			while (isspace(tok[0]))
519 				tok++;
520 		}
521 
522 		/*
523 		 * Is % symbol replacement required? Only done on "#DW" / "#BB"
524 		 * lines.
525 		 */
526 		if (bb_flag && strchr(tok, (int) '%'))
527 			bb_job->need_symbol_replacement = true;
528 
529 		if (bb_flag == BB_FLAG_BB_OP) {
530 			if (!xstrncmp(tok, "create_persistent", 17)) {
531 				have_bb = true;
532 				bb_access = NULL;
533 				bb_name = NULL;
534 				bb_pool = NULL;
535 				bb_type = NULL;
536 				if ((sub_tok = strstr(tok, "access_mode="))) {
537 					bb_access = xstrdup(sub_tok + 12);
538 					sub_tok = strchr(bb_access, ' ');
539 					if (sub_tok)
540 						sub_tok[0] = '\0';
541 				} else if ((sub_tok = strstr(tok, "access="))) {
542 					bb_access = xstrdup(sub_tok + 7);
543 					sub_tok = strchr(bb_access, ' ');
544 					if (sub_tok)
545 						sub_tok[0] = '\0';
546 				}
547 				if ((sub_tok = strstr(tok, "capacity="))) {
548 					tmp_cnt = bb_get_size_num(sub_tok+9, 1);
549 				} else {
550 					tmp_cnt = 0;
551 				}
552 				if ((sub_tok = strstr(tok, "name="))) {
553 					bb_name = xstrdup(sub_tok + 5);
554 					sub_tok = strchr(bb_name, ' ');
555 					if (sub_tok)
556 						sub_tok[0] = '\0';
557 				}
558 				if ((sub_tok = strstr(tok, "pool="))) {
559 					bb_pool = xstrdup(sub_tok + 5);
560 					sub_tok = strchr(bb_pool, ' ');
561 					if (sub_tok)
562 						sub_tok[0] = '\0';
563 				} else {
564 					bb_pool = xstrdup(
565 						bb_state.bb_config.default_pool);
566 				}
567 				if ((sub_tok = strstr(tok, "type="))) {
568 					bb_type = xstrdup(sub_tok + 5);
569 					sub_tok = strchr(bb_type, ' ');
570 					if (sub_tok)
571 						sub_tok[0] = '\0';
572 				}
573 				inx = bb_job->buf_cnt++;
574 				bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
575 							   sizeof(bb_buf_t) *
576 							   bb_job->buf_cnt);
577 				bb_job->buf_ptr[inx].access = bb_access;
578 				bb_job->buf_ptr[inx].create = true;
579 				bb_job->buf_ptr[inx].flags = bb_flag;
580 				//bb_job->buf_ptr[inx].hurry = false;
581 				bb_job->buf_ptr[inx].name = bb_name;
582 				bb_job->buf_ptr[inx].pool = bb_pool;
583 				tmp_cnt = _set_granularity(tmp_cnt, bb_pool);
584 				bb_job->buf_ptr[inx].size = tmp_cnt;
585 				bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
586 				bb_job->buf_ptr[inx].type = bb_type;
587 				//bb_job->buf_ptr[inx].use = false;
588 				bb_job->persist_add += tmp_cnt;
589 			} else if (!xstrncmp(tok, "destroy_persistent", 18)) {
590 				have_bb = true;
591 				bb_name = NULL;
592 				if ((sub_tok = strstr(tok, "name="))) {
593 					bb_name = xstrdup(sub_tok + 5);
594 					sub_tok = strchr(bb_name, ' ');
595 					if (sub_tok)
596 						sub_tok[0] = '\0';
597 				}
598 				/* if ((sub_tok = strstr(tok, "type="))) { */
599 				/* 	bb_type = xstrdup(sub_tok + 5); */
600 				/* 	sub_tok = strchr(bb_type, ' '); */
601 				/* 	if (sub_tok) */
602 				/* 		sub_tok[0] = '\0'; */
603 				/* } */
604 				bb_hurry = strstr(tok, "hurry");
605 				inx = bb_job->buf_cnt++;
606 				bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
607 							   sizeof(bb_buf_t) *
608 							   bb_job->buf_cnt);
609 				//bb_job->buf_ptr[inx].access = NULL;
610 				//bb_job->buf_ptr[inx].create = false;
611 				bb_job->buf_ptr[inx].destroy = true;
612 				bb_job->buf_ptr[inx].flags = bb_flag;
613 				bb_job->buf_ptr[inx].hurry = (bb_hurry != NULL);
614 				bb_job->buf_ptr[inx].name = bb_name;
615 				//bb_job->buf_ptr[inx].pool = NULL;
616 				//bb_job->buf_ptr[inx].size = 0;
617 				bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
618 				//bb_job->buf_ptr[inx].type = NULL;
619 				//bb_job->buf_ptr[inx].use = false;
620 			} else {
621 				/* Ignore other (future) options */
622 			}
623 		}
624 		if (bb_flag == BB_FLAG_DW_OP) {
625 			if (!xstrncmp(tok, "jobdw", 5)) {
626 				have_bb = true;
627 				if ((sub_tok = strstr(tok, "capacity="))) {
628 					tmp_cnt = bb_get_size_num(sub_tok+9, 1);
629 				} else {
630 					tmp_cnt = 0;
631 				}
632 				if ((sub_tok = strstr(tok, "pool="))) {
633 					xfree(bb_job->job_pool);
634 					bb_job->job_pool = xstrdup(sub_tok + 5);
635 					sub_tok = strchr(bb_job->job_pool, ' ');
636 					if (sub_tok)
637 						sub_tok[0] = '\0';
638 				} else {
639 					bb_job->job_pool = xstrdup(
640 						bb_state.bb_config.default_pool);
641 				}
642 				tmp_cnt = _set_granularity(tmp_cnt,
643 							   bb_job->job_pool);
644 				bb_job->req_size += tmp_cnt;
645 				bb_job->total_size += tmp_cnt;
646 				bb_job->use_job_buf = true;
647 			} else if (!xstrncmp(tok, "persistentdw", 12)) {
648 				/* Persistent buffer use */
649 				have_bb = true;
650 				bb_name = NULL;
651 				if ((sub_tok = strstr(tok, "name="))) {
652 					bb_name = xstrdup(sub_tok + 5);
653 					sub_tok = strchr(bb_name, ' ');
654 					if (sub_tok)
655 						sub_tok[0] = '\0';
656 				}
657 				inx = bb_job->buf_cnt++;
658 				bb_job->buf_ptr = xrealloc(bb_job->buf_ptr,
659 							   sizeof(bb_buf_t) *
660 							   bb_job->buf_cnt);
661 				//bb_job->buf_ptr[inx].access = NULL;
662 				//bb_job->buf_ptr[inx].create = false;
663 				//bb_job->buf_ptr[inx].destroy = false;
664 				//bb_job->buf_ptr[inx].hurry = false;
665 				bb_job->buf_ptr[inx].name = bb_name;
666 				//bb_job->buf_ptr[inx].size = 0;
667 				bb_job->buf_ptr[inx].state = BB_STATE_PENDING;
668 				//bb_job->buf_ptr[inx].type = NULL;
669 				bb_job->buf_ptr[inx].use = true;
670 			} else if (!xstrncmp(tok, "swap", 4)) {
671 				have_bb = true;
672 				tok += 4;
673 				while (isspace(tok[0]))
674 					tok++;
675 				bb_job->swap_size = strtol(tok, &end_ptr, 10);
676 				if (job_ptr->details &&
677 				    job_ptr->details->max_nodes) {
678 					bb_job->swap_nodes =
679 						job_ptr->details->max_nodes;
680 				} else if (job_ptr->details) {
681 					bb_job->swap_nodes =
682 						job_ptr->details->min_nodes;
683 				} else {
684 					bb_job->swap_nodes = 1;
685 				}
686 				tmp_cnt = (uint64_t) bb_job->swap_size *
687 					  bb_job->swap_nodes;
688 				if ((sub_tok = strstr(tok, "pool="))) {
689 					xfree(bb_job->job_pool);
690 					bb_job->job_pool = xstrdup(sub_tok + 5);
691 					sub_tok = strchr(bb_job->job_pool, ' ');
692 					if (sub_tok)
693 						sub_tok[0] = '\0';
694 				} else if (!bb_job->job_pool) {
695 					bb_job->job_pool = xstrdup(
696 						bb_state.bb_config.default_pool);
697 				}
698 				tmp_cnt = _set_granularity(tmp_cnt,
699 							   bb_job->job_pool);
700 				bb_job->req_size += tmp_cnt;
701 				bb_job->total_size += tmp_cnt;
702 				bb_job->use_job_buf = true;
703 			} else {
704 				/* Ignore stage-in, stage-out, etc. */
705 			}
706 		}
707 		tok = strtok_r(NULL, "\n", &save_ptr);
708 	}
709 	xfree(bb_specs);
710 
711 	if (!have_bb) {
712 		xfree(job_ptr->state_desc);
713 		job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
714 		xstrfmtcat(job_ptr->state_desc,
715 			   "%s: Invalid burst buffer spec (%s)",
716 			   plugin_type, job_ptr->burst_buffer);
717 		job_ptr->priority = 0;
718 		info("Invalid burst buffer spec for %pJ (%s)",
719 		     job_ptr, job_ptr->burst_buffer);
720 		bb_job_del(&bb_state, job_ptr->job_id);
721 		return NULL;
722 	}
723 	if (!bb_job->job_pool)
724 		bb_job->job_pool = xstrdup(bb_state.bb_config.default_pool);
725 	if (bb_state.bb_config.debug_flag)
726 		bb_job_log(&bb_state, bb_job);
727 	return bb_job;
728 }
729 
730 /* At slurmctld start up time, for every currently active burst buffer,
731  * update that user's limit. Also log every recovered buffer */
_apply_limits(void)732 static void _apply_limits(void)
733 {
734 	bool emulate_cray = false;
735 	bb_alloc_t *bb_alloc;
736 	int i;
737 
738 	if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
739 		emulate_cray = true;
740 
741 	for (i = 0; i < BB_HASH_SIZE; i++) {
742 		bb_alloc = bb_state.bb_ahash[i];
743 		while (bb_alloc) {
744 			info("Recovered buffer Name:%s User:%u Pool:%s Size:%"PRIu64,
745 			     bb_alloc->name, bb_alloc->user_id,
746 			     bb_alloc->pool, bb_alloc->size);
747 			_set_assoc_mgr_ptrs(bb_alloc);
748 			bb_limit_add(bb_alloc->user_id, bb_alloc->size,
749 				     bb_alloc->pool, &bb_state, emulate_cray);
750 			bb_alloc = bb_alloc->next;
751 		}
752 	}
753 }
754 
755 /* Write current burst buffer state to a file so that we can preserve account,
756  * partition, and QOS information of persistent burst buffers as there is no
757  * place to store that information within the DataWarp data structures */
_save_bb_state(void)758 static void _save_bb_state(void)
759 {
760 	static time_t last_save_time = 0;
761 	static int high_buffer_size = 16 * 1024;
762 	time_t save_time = time(NULL);
763 	bb_alloc_t *bb_alloc;
764 	uint32_t rec_count = 0;
765 	Buf buffer;
766 	char *old_file = NULL, *new_file = NULL, *reg_file = NULL;
767 	int i, count_offset, offset, state_fd;
768 	int error_code = 0;
769 	uint16_t protocol_version = SLURM_PROTOCOL_VERSION;
770 
771 	if ((bb_state.last_update_time <= last_save_time) &&
772 	    !bb_state.term_flag)
773 		return;
774 
775 	/* Build buffer with name/account/partition/qos information for all
776 	 * named burst buffers so we can preserve limits across restarts */
777 	buffer = init_buf(high_buffer_size);
778 	pack16(protocol_version, buffer);
779 	count_offset = get_buf_offset(buffer);
780 	pack32(rec_count, buffer);
781 	if (bb_state.bb_ahash) {
782 		slurm_mutex_lock(&bb_state.bb_mutex);
783 		for (i = 0; i < BB_HASH_SIZE; i++) {
784 			bb_alloc = bb_state.bb_ahash[i];
785 			while (bb_alloc) {
786 				if (bb_alloc->name) {
787 					packstr(bb_alloc->account,	buffer);
788 					pack_time(bb_alloc->create_time,buffer);
789 					pack32(bb_alloc->id,		buffer);
790 					packstr(bb_alloc->name,		buffer);
791 					packstr(bb_alloc->partition,	buffer);
792 					packstr(bb_alloc->pool,		buffer);
793 					packstr(bb_alloc->qos,		buffer);
794 					pack32(bb_alloc->user_id,	buffer);
795 					if (bb_state.bb_config.flags &
796 					    BB_FLAG_EMULATE_CRAY)
797 						pack64(bb_alloc->size,	buffer);
798 					rec_count++;
799 				}
800 				bb_alloc = bb_alloc->next;
801 			}
802 		}
803 		save_time = time(NULL);
804 		slurm_mutex_unlock(&bb_state.bb_mutex);
805 		offset = get_buf_offset(buffer);
806 		set_buf_offset(buffer, count_offset);
807 		pack32(rec_count, buffer);
808 		set_buf_offset(buffer, offset);
809 	}
810 
811 	xstrfmtcat(old_file, "%s/%s", slurmctld_conf.state_save_location,
812 		   "burst_buffer_cray_state.old");
813 	xstrfmtcat(reg_file, "%s/%s", slurmctld_conf.state_save_location,
814 		   "burst_buffer_cray_state");
815 	xstrfmtcat(new_file, "%s/%s", slurmctld_conf.state_save_location,
816 		   "burst_buffer_cray_state.new");
817 
818 	state_fd = creat(new_file, 0600);
819 	if (state_fd < 0) {
820 		error("%s: %s: Can't save state, error creating file %s, %m",
821 		      plugin_type, __func__, new_file);
822 		error_code = errno;
823 	} else {
824 		int pos = 0, nwrite = get_buf_offset(buffer), amount, rc;
825 		char *data = (char *)get_buf_data(buffer);
826 		high_buffer_size = MAX(nwrite, high_buffer_size);
827 		while (nwrite > 0) {
828 			amount = write(state_fd, &data[pos], nwrite);
829 			if ((amount < 0) && (errno != EINTR)) {
830 				error("Error writing file %s, %m", new_file);
831 				break;
832 			}
833 			nwrite -= amount;
834 			pos    += amount;
835 		}
836 
837 		rc = fsync_and_close(state_fd, "burst_buffer_cray");
838 		if (rc && !error_code)
839 			error_code = rc;
840 	}
841 	if (error_code)
842 		(void) unlink(new_file);
843 	else {			/* file shuffle */
844 		last_save_time = save_time;
845 		(void) unlink(old_file);
846 		if (link(reg_file, old_file)) {
847 			debug4("unable to create link for %s -> %s: %m",
848 			       reg_file, old_file);
849 		}
850 		(void) unlink(reg_file);
851 		if (link(new_file, reg_file)) {
852 			debug4("unable to create link for %s -> %s: %m",
853 			       new_file, reg_file);
854 		}
855 		(void) unlink(new_file);
856 	}
857 	xfree(old_file);
858 	xfree(reg_file);
859 	xfree(new_file);
860 	free_buf(buffer);
861 }
862 
863 /* Open the partition state save file, or backup if necessary.
864  * state_file IN - the name of the state save file used
865  * RET the file description to read from or error code
866  */
_open_part_state_file(char ** state_file)867 static int _open_part_state_file(char **state_file)
868 {
869 	int state_fd;
870 	struct stat stat_buf;
871 
872 	*state_file = xstrdup(slurmctld_conf.state_save_location);
873 	xstrcat(*state_file, "/burst_buffer_cray_state");
874 	state_fd = open(*state_file, O_RDONLY);
875 	if (state_fd < 0) {
876 		error("Could not open burst buffer state file %s: %m",
877 		      *state_file);
878 	} else if (fstat(state_fd, &stat_buf) < 0) {
879 		error("Could not stat burst buffer state file %s: %m",
880 		      *state_file);
881 		(void) close(state_fd);
882 	} else if (stat_buf.st_size < 4) {
883 		error("Burst buffer state file %s too small", *state_file);
884 		(void) close(state_fd);
885 	} else	/* Success */
886 		return state_fd;
887 
888 	error("NOTE: Trying backup burst buffer state save file. "
889 	      "Information may be lost!");
890 	xstrcat(*state_file, ".old");
891 	state_fd = open(*state_file, O_RDONLY);
892 	return state_fd;
893 }
894 
895 /* Return true if the burst buffer name is that of a job (i.e. numeric) and
896  * and that job is complete. Otherwise return false. */
_is_complete_job(char * name)897 static bool _is_complete_job(char *name)
898 {
899 	char *end_ptr = NULL;
900 	uint32_t job_id = 0;
901 	job_record_t *job_ptr;
902 
903 	if (name && (name[0] >='0') && (name[0] <='9')) {
904 		job_id = strtol(name, &end_ptr, 10);
905 		job_ptr = find_job_record(job_id);
906 		if (!job_ptr || IS_JOB_COMPLETED(job_ptr))
907 			return true;
908 	}
909 	return false;
910 }
911 
912 /* Recover saved burst buffer state and use it to preserve account, partition,
913  * and QOS information for persistent burst buffers. */
_recover_bb_state(void)914 static void _recover_bb_state(void)
915 {
916 	char *state_file = NULL, *data = NULL;
917 	int data_allocated, data_read = 0;
918 	uint16_t protocol_version = NO_VAL16;
919 	uint32_t data_size = 0, rec_count = 0, name_len = 0;
920 	uint32_t id = 0, user_id = 0;
921 	uint64_t size = 0;
922 	int i, state_fd;
923 	char *account = NULL, *name = NULL;
924 	char *partition = NULL, *pool = NULL, *qos = NULL;
925 	char *end_ptr = NULL;
926 	time_t create_time = 0;
927 	bb_alloc_t *bb_alloc;
928 	Buf buffer;
929 
930 	state_fd = _open_part_state_file(&state_file);
931 	if (state_fd < 0) {
932 		info("No burst buffer state file (%s) to recover",
933 		     state_file);
934 		xfree(state_file);
935 		return;
936 	}
937 	data_allocated = BUF_SIZE;
938 	data = xmalloc(data_allocated);
939 	while (1) {
940 		data_read = read(state_fd, &data[data_size], BUF_SIZE);
941 		if (data_read < 0) {
942 			if  (errno == EINTR)
943 				continue;
944 			else {
945 				error("Read error on %s: %m", state_file);
946 				break;
947 			}
948 		} else if (data_read == 0)     /* eof */
949 			break;
950 		data_size      += data_read;
951 		data_allocated += data_read;
952 		xrealloc(data, data_allocated);
953 	}
954 	close(state_fd);
955 	xfree(state_file);
956 
957 	buffer = create_buf(data, data_size);
958 	safe_unpack16(&protocol_version, buffer);
959 	if (protocol_version == NO_VAL16) {
960 		if (!ignore_state_errors)
961 			fatal("Can not recover burst_buffer/datawarp state, data version incompatible, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
962 		error("**********************************************************************");
963 		error("Can not recover burst_buffer/datawarp state, data version incompatible");
964 		error("**********************************************************************");
965 		return;
966 	}
967 
968 	safe_unpack32(&rec_count, buffer);
969 	for (i = 0; i < rec_count; i++) {
970 		if (protocol_version >= SLURM_MIN_PROTOCOL_VERSION) {
971 			safe_unpackstr_xmalloc(&account,   &name_len, buffer);
972 			safe_unpack_time(&create_time, buffer);
973 			safe_unpack32(&id, buffer);
974 			safe_unpackstr_xmalloc(&name,      &name_len, buffer);
975 			safe_unpackstr_xmalloc(&partition, &name_len, buffer);
976 			safe_unpackstr_xmalloc(&pool,      &name_len, buffer);
977 			safe_unpackstr_xmalloc(&qos,       &name_len, buffer);
978 			safe_unpack32(&user_id, buffer);
979 			if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
980 				safe_unpack64(&size, buffer);
981 		}
982 
983 		if ((bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) &&
984 		    _is_complete_job(name)) {
985 			info("%s: %s, Ignoring burst buffer state for completed job %s",
986 			     plugin_type, __func__, name);
987 			bb_alloc = NULL;
988 		} else if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
989 			bb_alloc = bb_alloc_name_rec(&bb_state, name, user_id);
990 			bb_alloc->id = id;
991 			last_persistent_id = MAX(last_persistent_id, id);
992 			if (name && (name[0] >='0') && (name[0] <='9')) {
993 				bb_alloc->job_id = strtol(name, &end_ptr, 10);
994 				bb_alloc->array_job_id = bb_alloc->job_id;
995 				bb_alloc->array_task_id = NO_VAL;
996 			}
997 			bb_alloc->seen_time = time(NULL);
998 			bb_alloc->size = size;
999 		} else {
1000 			bb_alloc = bb_find_name_rec(name, user_id, &bb_state);
1001 		}
1002 		if (bb_alloc) {
1003 			if (bb_state.bb_config.debug_flag) {
1004 				info("Recovered burst buffer %s from user %u",
1005 				     bb_alloc->name, bb_alloc->user_id);
1006 			}
1007 			xfree(bb_alloc->account);
1008 			bb_alloc->account = account;
1009 			account = NULL;
1010 			bb_alloc->create_time = create_time;
1011 			xfree(bb_alloc->partition);
1012 			bb_alloc->partition = partition;
1013 			partition = NULL;
1014 			xfree(bb_alloc->pool);
1015 			bb_alloc->pool = pool;
1016 			pool = NULL;
1017 			xfree(bb_alloc->qos);
1018 			bb_alloc->qos = qos;
1019 			qos = NULL;
1020 		}
1021 		xfree(account);
1022 		xfree(name);
1023 		xfree(partition);
1024 		xfree(pool);
1025 		xfree(qos);
1026 	}
1027 
1028 	info("Recovered state of %d burst buffers", rec_count);
1029 	free_buf(buffer);
1030 	return;
1031 
1032 unpack_error:
1033 	if (!ignore_state_errors)
1034 		fatal("Incomplete burst buffer data checkpoint file, start with '-i' to ignore this. Warning: using -i will lose the data that can't be recovered.");
1035 	error("Incomplete burst buffer data checkpoint file");
1036 	xfree(account);
1037 	xfree(name);
1038 	xfree(partition);
1039 	xfree(qos);
1040 	free_buf(buffer);
1041 	return;
1042 }
1043 
1044 /* We just found an unexpected session, set default account, QOS, & partition.
1045  * Copy the information from any currently existing session for the same user.
1046  * If none found, use his default account and QOS.
1047  * NOTE: assoc_mgr_locks need to be locked with
1048  * assoc_mgr_lock_t assoc_locks = { READ_LOCK, NO_LOCK, READ_LOCK, NO_LOCK,
1049  *				    NO_LOCK, NO_LOCK, NO_LOCK };
1050  * before calling this.
1051  */
_pick_alloc_account(bb_alloc_t * bb_alloc)1052 static void _pick_alloc_account(bb_alloc_t *bb_alloc)
1053 {
1054 	slurmdb_assoc_rec_t assoc_rec;
1055 	slurmdb_qos_rec_t   qos_rec;
1056 	bb_alloc_t *bb_ptr = NULL;
1057 
1058 	bb_ptr = bb_state.bb_ahash[bb_alloc->user_id % BB_HASH_SIZE];
1059 	while (bb_ptr) {
1060 		if ((bb_ptr          != bb_alloc) &&
1061 		    (bb_ptr->user_id == bb_alloc->user_id)) {
1062 			xfree(bb_alloc->account);
1063 			bb_alloc->account   = xstrdup(bb_ptr->account);
1064 			bb_alloc->assoc_ptr = bb_ptr->assoc_ptr;
1065 			xfree(bb_alloc->partition);
1066 			bb_alloc->partition = xstrdup(bb_ptr->partition);
1067 			xfree(bb_alloc->qos);
1068 			bb_alloc->qos       = xstrdup(bb_ptr->qos);
1069 			bb_alloc->qos_ptr = bb_ptr->qos_ptr;
1070 			xfree(bb_alloc->assocs);
1071 			bb_alloc->assocs    = xstrdup(bb_ptr->assocs);
1072 			return;
1073 		}
1074 		bb_ptr = bb_ptr->next;
1075 	}
1076 
1077 	/* Set default for this user */
1078 	bb_alloc->partition = xstrdup(default_part_name);
1079 	memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
1080 	memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
1081 	assoc_rec.partition = default_part_name;
1082 	assoc_rec.uid = bb_alloc->user_id;
1083 
1084 	if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
1085 				    accounting_enforce,
1086 				    &bb_alloc->assoc_ptr,
1087 				    true) == SLURM_SUCCESS) {
1088 		xfree(bb_alloc->account);
1089 		bb_alloc->account   = xstrdup(assoc_rec.acct);
1090 		xfree(bb_alloc->assocs);
1091 		if (bb_alloc->assoc_ptr)
1092 			bb_alloc->assocs =
1093 				xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
1094 
1095 		assoc_mgr_get_default_qos_info(bb_alloc->assoc_ptr, &qos_rec);
1096 		if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec,
1097 					  accounting_enforce,
1098 					  &bb_alloc->qos_ptr,
1099 					  true) == SLURM_SUCCESS) {
1100 			xfree(bb_alloc->qos);
1101 			if (bb_alloc->qos_ptr)
1102 				bb_alloc->qos =
1103 					xstrdup(bb_alloc->qos_ptr->name);
1104 		}
1105 	}
1106 }
1107 
1108 /* For a given user/partition/account, set it's assoc_ptr */
_set_assoc_mgr_ptrs(bb_alloc_t * bb_alloc)1109 static void _set_assoc_mgr_ptrs(bb_alloc_t *bb_alloc)
1110 {
1111 	/* read locks on assoc */
1112 	assoc_mgr_lock_t assoc_locks =
1113 		{ .assoc = READ_LOCK, .qos = READ_LOCK, .user = READ_LOCK };
1114 	slurmdb_assoc_rec_t assoc_rec;
1115 	slurmdb_qos_rec_t qos_rec;
1116 
1117 	memset(&assoc_rec, 0, sizeof(slurmdb_assoc_rec_t));
1118 	assoc_rec.acct      = bb_alloc->account;
1119 	assoc_rec.partition = bb_alloc->partition;
1120 	assoc_rec.uid       = bb_alloc->user_id;
1121 	assoc_mgr_lock(&assoc_locks);
1122 	if (assoc_mgr_fill_in_assoc(acct_db_conn, &assoc_rec,
1123 				    accounting_enforce,
1124 				    &bb_alloc->assoc_ptr,
1125 				    true) == SLURM_SUCCESS) {
1126 		xfree(bb_alloc->assocs);
1127 		if (bb_alloc->assoc_ptr) {
1128 			bb_alloc->assocs =
1129 				xstrdup_printf(",%u,", bb_alloc->assoc_ptr->id);
1130 		}
1131 	}
1132 
1133 	memset(&qos_rec, 0, sizeof(slurmdb_qos_rec_t));
1134 	qos_rec.name = bb_alloc->qos;
1135 	if (assoc_mgr_fill_in_qos(acct_db_conn, &qos_rec, accounting_enforce,
1136 				  &bb_alloc->qos_ptr, true) != SLURM_SUCCESS)
1137 		verbose("%s: %s: Invalid QOS name: %s",
1138 			plugin_type, __func__, bb_alloc->qos);
1139 
1140 	assoc_mgr_unlock(&assoc_locks);
1141 }
1142 
1143 /*
1144  * Determine the current actual burst buffer state.
1145  */
_load_state(bool init_config)1146 static void _load_state(bool init_config)
1147 {
1148 	static bool first_run = true;
1149 	burst_buffer_pool_t *pool_ptr;
1150 	bb_configs_t *configs;
1151 	bb_instances_t *instances;
1152 	bb_pools_t *pools;
1153 	bb_sessions_t *sessions;
1154 	bb_alloc_t *bb_alloc;
1155 	job_record_t *job_ptr;
1156 	int num_configs = 0, num_instances = 0, num_pools = 0, num_sessions = 0;
1157 	int i, j, pools_inx;
1158 	char *end_ptr = NULL;
1159 	time_t now = time(NULL);
1160 	uint32_t timeout;
1161 	assoc_mgr_lock_t assoc_locks = { .assoc = READ_LOCK,
1162 					 .qos = READ_LOCK,
1163 					 .user = READ_LOCK };
1164 	bool found_pool;
1165 	bitstr_t *pools_bitmap;
1166 
1167 	slurm_mutex_lock(&bb_state.bb_mutex);
1168 	timeout = bb_state.bb_config.other_timeout * 1000;
1169 	slurm_mutex_unlock(&bb_state.bb_mutex);
1170 
1171 	/*
1172 	 * Load the pools information
1173 	 */
1174 	pools = _bb_get_pools(&num_pools, &bb_state, timeout);
1175 	if (pools == NULL) {
1176 		error("%s: %s: failed to find DataWarp entries, what now?",
1177 		      plugin_type, __func__);
1178 		return;
1179 	}
1180 
1181 	pools_bitmap = bit_alloc(bb_state.bb_config.pool_cnt + num_pools);
1182 	slurm_mutex_lock(&bb_state.bb_mutex);
1183 	if (!bb_state.bb_config.default_pool && (num_pools > 0)) {
1184 		info("%s: %s: Setting DefaultPool to %s",
1185 		     plugin_type, __func__, pools[0].id);
1186 		bb_state.bb_config.default_pool = xstrdup(pools[0].id);
1187 	}
1188 
1189 	for (i = 0; i < num_pools; i++) {
1190 		/* ID: "bytes" */
1191 		if (xstrcmp(pools[i].id,
1192 			    bb_state.bb_config.default_pool) == 0) {
1193 			bb_state.bb_config.granularity = pools[i].granularity;
1194 			bb_state.total_space = pools[i].quantity *
1195 					       pools[i].granularity;
1196 			bb_state.unfree_space = pools[i].quantity -
1197 						pools[i].free;
1198 			bb_state.unfree_space *= pools[i].granularity;
1199 			continue;
1200 		}
1201 
1202 		found_pool = false;
1203 		pool_ptr = bb_state.bb_config.pool_ptr;
1204 		for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
1205 			if (!xstrcmp(pool_ptr->name, pools[i].id)) {
1206 				found_pool = true;
1207 				break;
1208 			}
1209 		}
1210 		if (!found_pool) {
1211 			if (!first_run) {
1212 				info("%s: %s: Newly reported pool %s",
1213 				     plugin_type, __func__, pools[i].id);
1214 			}
1215 			bb_state.bb_config.pool_ptr
1216 				= xrealloc(bb_state.bb_config.pool_ptr,
1217 					   sizeof(burst_buffer_pool_t) *
1218 					   (bb_state.bb_config.pool_cnt + 1));
1219 			pool_ptr = bb_state.bb_config.pool_ptr +
1220 				   bb_state.bb_config.pool_cnt;
1221 			pool_ptr->name = xstrdup(pools[i].id);
1222 			bb_state.bb_config.pool_cnt++;
1223 		}
1224 
1225 		pools_inx = pool_ptr - bb_state.bb_config.pool_ptr;
1226 		bit_set(pools_bitmap, pools_inx);
1227 		pool_ptr->total_space = pools[i].quantity *
1228 					pools[i].granularity;
1229 		pool_ptr->granularity = pools[i].granularity;
1230 		pool_ptr->unfree_space = pools[i].quantity - pools[i].free;
1231 		pool_ptr->unfree_space *= pools[i].granularity;
1232 	}
1233 
1234 	pool_ptr = bb_state.bb_config.pool_ptr;
1235 	for (j = 0; j < bb_state.bb_config.pool_cnt; j++, pool_ptr++) {
1236 		if (bit_test(pools_bitmap, j) || (pool_ptr->total_space == 0))
1237 			continue;
1238 		error("%s: %s: Pool %s no longer reported by system, setting size to zero",
1239 		      plugin_type, __func__,  pool_ptr->name);
1240 		pool_ptr->total_space  = 0;
1241 		pool_ptr->used_space   = 0;
1242 		pool_ptr->unfree_space = 0;
1243 	}
1244 	first_run = false;
1245 	slurm_mutex_unlock(&bb_state.bb_mutex);
1246 	FREE_NULL_BITMAP(pools_bitmap);
1247 	_bb_free_pools(pools, num_pools);
1248 
1249 	/*
1250 	 * Load the instances information
1251 	 */
1252 	instances = _bb_get_instances(&num_instances, &bb_state, timeout);
1253 	if (instances == NULL) {
1254 		if (bb_state.bb_config.debug_flag)
1255 			debug("%s: %s: No DataWarp instances found",
1256 			      plugin_type, __func__);
1257 		num_instances = 0;	/* Redundant, but fixes CLANG bug */
1258 	}
1259 	sessions = _bb_get_sessions(&num_sessions, &bb_state, timeout);
1260 	assoc_mgr_lock(&assoc_locks);
1261 	slurm_mutex_lock(&bb_state.bb_mutex);
1262 	bb_state.last_load_time = time(NULL);
1263 	for (i = 0; i < num_sessions; i++) {
1264 		if (!init_config) {
1265 			bb_alloc = bb_find_name_rec(sessions[i].token,
1266 						    sessions[i].user_id,
1267 						    &bb_state);
1268 			if (bb_alloc) {
1269 				bb_alloc->seen_time = bb_state.last_load_time;
1270 				continue;
1271 			}
1272 			if (difftime(now, sessions[i].created) <
1273 			    bb_state.bb_config.other_timeout) {
1274 				/* Newly created in other thread. Give that
1275 				 * thread a chance to add the entry */
1276 				continue;
1277 			}
1278 			error("%s: %s: Unexpected burst buffer found: %s",
1279 			      plugin_type, __func__, sessions[i].token);
1280 		}
1281 
1282 		bb_alloc = bb_alloc_name_rec(&bb_state, sessions[i].token,
1283 					     sessions[i].user_id);
1284 		bb_alloc->create_time = sessions[i].created;
1285 		bb_alloc->id = sessions[i].id;
1286 		if ((sessions[i].token != NULL)   &&
1287 		    (sessions[i].token[0] >= '0') &&
1288 		    (sessions[i].token[0] <= '9')) {
1289 			bb_alloc->job_id =
1290 				strtol(sessions[i].token, &end_ptr, 10);
1291 			job_ptr = find_job_record(bb_alloc->job_id);
1292 			if (job_ptr) {
1293 				bb_alloc->array_job_id = job_ptr->array_job_id;
1294 				bb_alloc->array_task_id =job_ptr->array_task_id;
1295 			} else {
1296 				bb_alloc->array_task_id = NO_VAL;
1297 			}
1298 		}
1299 		for (j = 0; j < num_instances; j++) {
1300 			if (sessions[i].id != instances[j].session)
1301 				continue;
1302 			bb_alloc->size += instances[j].bytes;
1303 		}
1304 		bb_alloc->seen_time = bb_state.last_load_time;
1305 
1306 		if (!init_config) {	/* Newly found buffer */
1307 			_pick_alloc_account(bb_alloc);
1308 			bb_limit_add(bb_alloc->user_id, bb_alloc->size,
1309 				     bb_alloc->pool, &bb_state, false);
1310 		}
1311 		if (bb_alloc->job_id == 0)
1312 			bb_post_persist_create(NULL, bb_alloc, &bb_state);
1313 	}
1314 	slurm_mutex_unlock(&bb_state.bb_mutex);
1315 	assoc_mgr_unlock(&assoc_locks);
1316 	_bb_free_sessions(sessions, num_sessions);
1317 	_bb_free_instances(instances, num_instances);
1318 
1319 	if (!init_config)
1320 		return;
1321 
1322 	/*
1323 	 * Load the configurations information
1324 	 * NOTE: This information is currently unused
1325 	 */
1326 	configs = _bb_get_configs(&num_configs, &bb_state, timeout);
1327 	if (configs == NULL) {
1328 		info("%s: %s: No DataWarp configurations found",
1329 		     plugin_type, __func__);
1330 		num_configs = 0;
1331 	}
1332 	_bb_free_configs(configs, num_configs);
1333 
1334 	_recover_bb_state();
1335 	_apply_limits();
1336 	bb_state.last_update_time = time(NULL);
1337 
1338 	return;
1339 }
1340 
1341 /* Write an string representing the NIDs of a job's nodes to an arbitrary
1342  * file location
1343  * RET 0 or Slurm error code
1344  */
_write_nid_file(char * file_name,char * node_list,job_record_t * job_ptr)1345 static int _write_nid_file(char *file_name, char *node_list,
1346 			   job_record_t *job_ptr)
1347 {
1348 #if defined(HAVE_NATIVE_CRAY)
1349 	char *tmp, *sep, *buf = NULL;
1350 	int i, j, rc;
1351 
1352 	xassert(file_name);
1353 	tmp = xstrdup(node_list);
1354 	/* Remove any trailing "]" */
1355 	sep = strrchr(tmp, ']');
1356 	if (sep)
1357 		sep[0] = '\0';
1358 	/* Skip over "nid[" or "nid" */
1359 	sep = strchr(tmp, '[');
1360 	if (sep) {
1361 		sep++;
1362 	} else {
1363 		sep = tmp;
1364 		for (i = 0; !isdigit(sep[0]) && sep[0]; i++)
1365 			sep++;
1366 	}
1367 	/* Copy numeric portion */
1368 	buf = xmalloc(strlen(sep) + 1);
1369 	for (i = 0, j = 0; sep[i]; i++) {
1370 		/* Skip leading zeros */
1371 		if ((sep[i] == '0') && isdigit(sep[i+1]))
1372 			continue;
1373 		/* Copy significant digits and separator */
1374 		while (sep[i]) {
1375 			if (sep[i] == ',') {
1376 				buf[j++] = '\n';
1377 				break;
1378 			}
1379 			buf[j++] = sep[i];
1380 			if (sep[i] == '-')
1381 				break;
1382 			i++;
1383 		}
1384 		if (!sep[i])
1385 			break;
1386 	}
1387 	xfree(tmp);
1388 
1389 	if (buf[0]) {
1390 		rc = _write_file(file_name, buf);
1391 	} else {
1392 		error("%s: %s: %pJ has node list without numeric component (%s)",
1393 		      plugin_type, __func__, job_ptr, node_list);
1394 		rc = EINVAL;
1395 	}
1396 	xfree(buf);
1397 	return rc;
1398 #else
1399 	char *tok, *buf = NULL;
1400 	int rc;
1401 
1402 	xassert(file_name);
1403 	if (node_list && node_list[0]) {
1404 		hostlist_t hl = hostlist_create(node_list);
1405 		while ((tok = hostlist_shift(hl))) {
1406 			xstrfmtcat(buf, "%s\n", tok);
1407 			free(tok);
1408 		}
1409 		hostlist_destroy(hl);
1410 		rc = _write_file(file_name, buf);
1411 		xfree(buf);
1412 	} else {
1413 		error("%s: %s: %pJ lacks a node list",
1414 		      plugin_type, __func__, job_ptr);
1415 		rc = EINVAL;
1416 	}
1417 	return rc;
1418 #endif
1419 }
1420 
1421 /* Write an arbitrary string to an arbitrary file name */
_write_file(char * file_name,char * buf)1422 static int _write_file(char *file_name, char *buf)
1423 {
1424 	int amount, fd, nwrite, pos;
1425 
1426 	(void) unlink(file_name);
1427 	fd = creat(file_name, 0600);
1428 	if (fd < 0) {
1429 		error("Error creating file %s, %m", file_name);
1430 		return errno;
1431 	}
1432 
1433 	if (!buf) {
1434 		error("%s: %s: buf is NULL", plugin_type, __func__);
1435 		return SLURM_ERROR;
1436 	}
1437 
1438 	nwrite = strlen(buf);
1439 	pos = 0;
1440 	while (nwrite > 0) {
1441 		amount = write(fd, &buf[pos], nwrite);
1442 		if ((amount < 0) && (errno != EINTR)) {
1443 			error("Error writing file %s, %m", file_name);
1444 			close(fd);
1445 			return ESLURM_WRITING_TO_FILE;
1446 		}
1447 		nwrite -= amount;
1448 		pos    += amount;
1449 	}
1450 
1451 	(void) close(fd);
1452 	return SLURM_SUCCESS;
1453 }
1454 
_queue_stage_in(job_record_t * job_ptr,bb_job_t * bb_job)1455 static int _queue_stage_in(job_record_t *job_ptr, bb_job_t *bb_job)
1456 {
1457 	char *hash_dir = NULL, *job_dir = NULL, *job_pool;
1458 	char *client_nodes_file_nid = NULL;
1459 	char **setup_argv, **data_in_argv;
1460 	stage_args_t *stage_args;
1461 	int hash_inx = job_ptr->job_id % 10;
1462 	int rc = SLURM_SUCCESS;
1463 	pthread_t tid;
1464 
1465 	xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
1466 	(void) mkdir(hash_dir, 0700);
1467 	xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
1468 	if (job_ptr->sched_nodes) {
1469 		xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
1470 		if (_write_nid_file(client_nodes_file_nid,
1471 				    job_ptr->sched_nodes, job_ptr))
1472 			xfree(client_nodes_file_nid);
1473 	}
1474 	setup_argv = xcalloc(20, sizeof(char *));	/* NULL terminated */
1475 	setup_argv[0] = xstrdup("dw_wlm_cli");
1476 	setup_argv[1] = xstrdup("--function");
1477 	setup_argv[2] = xstrdup("setup");
1478 	setup_argv[3] = xstrdup("--token");
1479 	xstrfmtcat(setup_argv[4], "%u", job_ptr->job_id);
1480 	setup_argv[5] = xstrdup("--caller");
1481 	setup_argv[6] = xstrdup("SLURM");
1482 	setup_argv[7] = xstrdup("--user");
1483 	xstrfmtcat(setup_argv[8], "%u", job_ptr->user_id);
1484 	setup_argv[9] = xstrdup("--groupid");
1485 	xstrfmtcat(setup_argv[10], "%u", job_ptr->group_id);
1486 	setup_argv[11] = xstrdup("--capacity");
1487 	if (bb_job->job_pool)
1488 		job_pool = bb_job->job_pool;
1489 	else
1490 		job_pool = bb_state.bb_config.default_pool;
1491 	xstrfmtcat(setup_argv[12], "%s:%s",
1492 		   job_pool, bb_get_size_str(bb_job->total_size));
1493 	setup_argv[13] = xstrdup("--job");
1494 	setup_argv[14] = bb_handle_job_script(job_ptr, bb_job);
1495 	if (client_nodes_file_nid) {
1496 #if defined(HAVE_NATIVE_CRAY)
1497 		setup_argv[15] = xstrdup("--nidlistfile");
1498 #else
1499 		setup_argv[15] = xstrdup("--nodehostnamefile");
1500 #endif
1501 		setup_argv[16] = xstrdup(client_nodes_file_nid);
1502 	}
1503 	bb_limit_add(job_ptr->user_id, bb_job->total_size, job_pool, &bb_state,
1504 		     true);
1505 
1506 	data_in_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
1507 	data_in_argv[0] = xstrdup("dw_wlm_cli");
1508 	data_in_argv[1] = xstrdup("--function");
1509 	data_in_argv[2] = xstrdup("data_in");
1510 	data_in_argv[3] = xstrdup("--token");
1511 	xstrfmtcat(data_in_argv[4], "%u", job_ptr->job_id);
1512 	data_in_argv[5] = xstrdup("--job");
1513 	data_in_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1514 
1515 	stage_args = xmalloc(sizeof(stage_args_t));
1516 	stage_args->bb_size = bb_job->total_size;
1517 	stage_args->job_id  = job_ptr->job_id;
1518 	stage_args->pool    = xstrdup(job_pool);
1519 	stage_args->user_id = job_ptr->user_id;
1520 	stage_args->args1   = setup_argv;
1521 	stage_args->args2   = data_in_argv;
1522 
1523 	slurm_thread_create(&tid, _start_stage_in, stage_args);
1524 
1525 	xfree(hash_dir);
1526 	xfree(job_dir);
1527 	xfree(client_nodes_file_nid);
1528 	return rc;
1529 }
1530 
_update_system_comment(job_record_t * job_ptr,char * operation,char * resp_msg,bool update_database)1531 static void _update_system_comment(job_record_t *job_ptr, char *operation,
1532 				   char *resp_msg, bool update_database)
1533 {
1534 	char *sep = NULL;
1535 
1536 	if (job_ptr->system_comment &&
1537 	    (strlen(job_ptr->system_comment) >= 1024)) {
1538 		/* Avoid filling comment with repeated BB failures */
1539 		return;
1540 	}
1541 
1542 	if (job_ptr->system_comment)
1543 		xstrftimecat(sep, "\n%x %X");
1544 	else
1545 		xstrftimecat(sep, "%x %X");
1546 	xstrfmtcat(job_ptr->system_comment, "%s %s: %s: %s",
1547 		   sep, plugin_type, operation, resp_msg);
1548 	xfree(sep);
1549 
1550 	if (update_database) {
1551 		slurmdb_job_cond_t job_cond;
1552 		slurmdb_job_rec_t job_rec;
1553 		slurmdb_selected_step_t selected_step;
1554 		List ret_list;
1555 
1556 		memset(&job_cond, 0, sizeof(slurmdb_job_cond_t));
1557 		memset(&job_rec, 0, sizeof(slurmdb_job_rec_t));
1558 		memset(&selected_step, 0, sizeof(slurmdb_selected_step_t));
1559 
1560 		selected_step.array_task_id = NO_VAL;
1561 		selected_step.jobid = job_ptr->job_id;
1562 		selected_step.het_job_offset = NO_VAL;
1563 		selected_step.stepid = NO_VAL;
1564 		job_cond.step_list = list_create(NULL);
1565 		list_append(job_cond.step_list, &selected_step);
1566 
1567 		job_cond.flags = JOBCOND_FLAG_NO_WAIT |
1568 			JOBCOND_FLAG_DBD_UID |
1569 			JOBCOND_FLAG_NO_DEFAULT_USAGE;
1570 
1571 		job_cond.cluster_list = list_create(NULL);
1572 		list_append(job_cond.cluster_list, slurmctld_conf.cluster_name);
1573 
1574 		job_cond.usage_start = job_ptr->details->submit_time;
1575 
1576 		job_rec.system_comment = job_ptr->system_comment;
1577 
1578 		ret_list = acct_storage_g_modify_job(
1579 			acct_db_conn, slurmctld_conf.slurm_user_id,
1580 			&job_cond, &job_rec);
1581 
1582 		FREE_NULL_LIST(job_cond.cluster_list);
1583 		FREE_NULL_LIST(job_cond.step_list);
1584 		FREE_NULL_LIST(ret_list);
1585 	}
1586 }
1587 
_start_stage_in(void * x)1588 static void *_start_stage_in(void *x)
1589 {
1590 	stage_args_t *stage_args = (stage_args_t *) x;
1591 	char **setup_argv, **size_argv, **data_in_argv;
1592 	char *resp_msg = NULL, *resp_msg2 = NULL, *op = NULL;
1593 	uint64_t real_size = 0;
1594 	int rc = SLURM_SUCCESS, status = 0, timeout;
1595 	slurmctld_lock_t job_write_lock =
1596 		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
1597 	job_record_t *job_ptr;
1598 	bb_alloc_t *bb_alloc = NULL;
1599 	bb_job_t *bb_job;
1600 	bool get_real_size = false;
1601 	DEF_TIMERS;
1602 	track_script_rec_add(stage_args->job_id, 0, pthread_self());
1603 
1604 	setup_argv   = stage_args->args1;
1605 	data_in_argv = stage_args->args2;
1606 
1607 	timeout = bb_state.bb_config.other_timeout * 1000;
1608 	op = "setup";
1609 	START_TIMER;
1610 	resp_msg = run_command("setup",
1611 			       bb_state.bb_config.get_sys_state,
1612 			       setup_argv, timeout, pthread_self(),
1613 			       &status);
1614 	END_TIMER;
1615 	info("%s: %s: setup for job JobId=%u ran for %s",
1616 	     plugin_type, __func__, stage_args->job_id, TIME_STR);
1617 
1618 	if (track_script_broadcast(pthread_self(), status)) {
1619 		/* I was killed by slurmtrack, bail out right now */
1620 		info("%s: %s: setup for JobId=%u terminated by slurmctld",
1621 		     plugin_type, __func__, stage_args->job_id);
1622 		free_command_argv(setup_argv);
1623 		free_command_argv(data_in_argv);
1624 		xfree(resp_msg);
1625 		xfree(stage_args->pool);
1626 		xfree(stage_args);
1627 		track_script_remove(pthread_self());
1628 		return NULL;
1629 	}
1630 	track_script_reset_cpid(pthread_self(), 0);
1631 
1632 	_log_script_argv(setup_argv, resp_msg);
1633 	lock_slurmctld(job_write_lock);
1634 	slurm_mutex_lock(&bb_state.bb_mutex);
1635 	/*
1636 	 * The buffer's actual size may be larger than requested by the user.
1637 	 * Remove limit here and restore limit based upon actual size below
1638 	 * (assuming buffer allocation succeeded, or just leave it out).
1639 	 */
1640 	bb_limit_rem(stage_args->user_id, stage_args->bb_size, stage_args->pool,
1641 		     &bb_state);
1642 
1643 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1644 		trigger_burst_buffer();
1645 		error("%s: %s: setup for JobId=%u status:%u response:%s",
1646 		      plugin_type, __func__, stage_args->job_id, status,
1647 		      resp_msg);
1648 		rc = SLURM_ERROR;
1649 		job_ptr = find_job_record(stage_args->job_id);
1650 		if (job_ptr)
1651 			_update_system_comment(job_ptr, "setup", resp_msg, 0);
1652 	} else {
1653 		job_ptr = find_job_record(stage_args->job_id);
1654 		bb_job = bb_job_find(&bb_state, stage_args->job_id);
1655 		if (!job_ptr) {
1656 			error("%s: %s: unable to find job record for JobId=%u",
1657 			      plugin_type, __func__, stage_args->job_id);
1658 			rc = SLURM_ERROR;
1659 		} else if (!bb_job) {
1660 			error("%s: %s: unable to find bb_job record for %pJ",
1661 			      plugin_type, __func__, job_ptr);
1662 		} else {
1663 			bb_job->state = BB_STATE_STAGING_IN;
1664 			bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1665 			if (!bb_alloc && bb_job->total_size) {
1666 				/* Not found (from restart race condtion) and
1667 				 * job buffer has non-zero size */
1668 				bb_alloc = bb_alloc_job(&bb_state, job_ptr,
1669 							bb_job);
1670 				bb_limit_add(stage_args->user_id,
1671 					     bb_job->total_size,
1672 					     stage_args->pool, &bb_state,
1673 					     true);
1674 				bb_alloc->create_time = time(NULL);
1675 			}
1676 		}
1677 	}
1678 	slurm_mutex_unlock(&bb_state.bb_mutex);
1679 	unlock_slurmctld(job_write_lock);
1680 
1681 	if (rc == SLURM_SUCCESS) {
1682 		timeout = bb_state.bb_config.stage_in_timeout * 1000;
1683 		xfree(resp_msg);
1684 
1685 		op = "dws_data_in";
1686 		START_TIMER;
1687 		resp_msg = run_command("dws_data_in",
1688 				       bb_state.bb_config.get_sys_state,
1689 				       data_in_argv, timeout, pthread_self(),
1690 				       &status);
1691 		END_TIMER;
1692 		info("%s: %s: dws_data_in for JobId=%u ran for %s",
1693 		     plugin_type, __func__, stage_args->job_id, TIME_STR);
1694 		if (track_script_broadcast(pthread_self(), status)) {
1695 			/* I was killed by slurmtrack, bail out right now */
1696 			info("%s: %s: dws_data_in for JobId=%u terminated by slurmctld",
1697 			     plugin_type, __func__, stage_args->job_id);
1698 			free_command_argv(setup_argv);
1699 			free_command_argv(data_in_argv);
1700 			xfree(resp_msg);
1701 			xfree(stage_args->pool);
1702 			xfree(stage_args);
1703 			/*
1704 			 * Don't need to free track_script_rec here,
1705 			 * it is handled elsewhere since it still being tracked.
1706 			 */
1707 			return NULL;
1708 		}
1709 		track_script_reset_cpid(pthread_self(), 0);
1710 
1711 		_log_script_argv(data_in_argv, resp_msg);
1712 		if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
1713 		    !strstr(resp_msg, "No matching session")) {
1714 			trigger_burst_buffer();
1715 			error("%s: %s: dws_data_in for JobId=%u status:%u response:%s",
1716 			      plugin_type, __func__, stage_args->job_id, status,
1717 			      resp_msg);
1718 			rc = SLURM_ERROR;
1719 			lock_slurmctld(job_write_lock);
1720 			job_ptr = find_job_record(stage_args->job_id);
1721 			if (job_ptr)
1722 				_update_system_comment(job_ptr, "data_in",
1723 						       resp_msg, 0);
1724 			unlock_slurmctld(job_write_lock);
1725 		}
1726 	}
1727 
1728 	slurm_mutex_lock(&bb_state.bb_mutex);
1729 	bb_job = bb_job_find(&bb_state, stage_args->job_id);
1730 	if (bb_job && bb_job->req_size)
1731 		get_real_size = true;
1732 	slurm_mutex_unlock(&bb_state.bb_mutex);
1733 
1734 	/* Round up job buffer size based upon DW "equalize_fragments"
1735 	 * configuration parameter */
1736 	if (get_real_size) {
1737 		size_argv = xcalloc(10, sizeof(char *)); /* NULL terminated */
1738 		size_argv[0] = xstrdup("dw_wlm_cli");
1739 		size_argv[1] = xstrdup("--function");
1740 		size_argv[2] = xstrdup("real_size");
1741 		size_argv[3] = xstrdup("--token");
1742 		xstrfmtcat(size_argv[4], "%u", stage_args->job_id);
1743 		START_TIMER;
1744 		resp_msg2 = run_command("real_size",
1745 				        bb_state.bb_config.get_sys_state,
1746 				        size_argv, timeout, pthread_self(),
1747 					&status);
1748 		END_TIMER;
1749 		if ((DELTA_TIMER > 200000) ||	/* 0.2 secs */
1750 		    bb_state.bb_config.debug_flag)
1751 			info("%s: %s: real_size ran for %s",
1752 			     plugin_type, __func__, TIME_STR);
1753 
1754 		if (track_script_broadcast(pthread_self(), status)) {
1755 			/* I was killed by slurmtrack, bail out right now */
1756 			info("%s: %s: real_size for JobId=%u terminated by slurmctld",
1757 			     plugin_type, __func__, stage_args->job_id);
1758 			free_command_argv(setup_argv);
1759 			free_command_argv(data_in_argv);
1760 			xfree(resp_msg);
1761 			xfree(resp_msg2);
1762 			free_command_argv(size_argv);
1763 			xfree(stage_args->pool);
1764 			xfree(stage_args);
1765 			/*
1766 			 * Don't need to free track_script_rec here,
1767 			 * it is handled elsewhere since it still being tracked.
1768 			 */
1769 			return NULL;
1770 		}
1771 		track_script_reset_cpid(pthread_self(), 0);
1772 
1773 		/* Use resp_msg2 to preserve resp_msg for error message below */
1774 		_log_script_argv(size_argv, resp_msg2);
1775 
1776 		if (WIFEXITED(status) && (WEXITSTATUS(status) != 0) &&
1777 		    resp_msg2 &&
1778 		    (strncmp(resp_msg2, "invalid function", 16) == 0)) {
1779 			debug("%s: %s: Old dw_wlm_cli does not support real_size function",
1780 			      plugin_type, __func__);
1781 		} else if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1782 			error("%s: %s: real_size for JobId=%u status:%u response:%s",
1783 			      plugin_type, __func__, stage_args->job_id,
1784 			      status, resp_msg2);
1785 		} else if (resp_msg2 && resp_msg2[0]) {
1786 			json_object *j;
1787 			struct bb_total_size *ent;
1788 			j = json_tokener_parse(resp_msg2);
1789 			if (j == NULL) {
1790 				error("%s: %s: json parser failed on \"%s\"",
1791 				      plugin_type, __func__, resp_msg2);
1792 			} else {
1793 				ent = _json_parse_real_size(j);
1794 				json_object_put(j);	/* Frees json memory */
1795 				if (ent && (ent->units == BB_UNITS_BYTES))
1796 					real_size = ent->capacity;
1797 				xfree(ent);
1798 			}
1799 		}
1800 		xfree(resp_msg2);
1801 		free_command_argv(size_argv);
1802 	}
1803 
1804 	lock_slurmctld(job_write_lock);
1805 	job_ptr = find_job_record(stage_args->job_id);
1806 	if (!job_ptr) {
1807 		error("%s: %s: unable to find job record for JobId=%u",
1808 		      plugin_type, __func__, stage_args->job_id);
1809 	} else if (rc == SLURM_SUCCESS) {
1810 		slurm_mutex_lock(&bb_state.bb_mutex);
1811 		bb_job = bb_job_find(&bb_state, stage_args->job_id);
1812 		if (bb_job)
1813 			bb_job->state = BB_STATE_STAGED_IN;
1814 		if (bb_job && bb_job->total_size) {
1815 			if (real_size > bb_job->req_size) {
1816 				info("%s: %s: %pJ total_size increased from %"PRIu64" to %"PRIu64,
1817 				     plugin_type, __func__, job_ptr,
1818 				     bb_job->req_size, real_size);
1819 				bb_job->total_size = real_size;
1820 			}
1821 			bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1822 			if (bb_alloc) {
1823 				bb_alloc->state = BB_STATE_STAGED_IN;
1824 				bb_alloc->state_time = time(NULL);
1825 				if (bb_state.bb_config.debug_flag) {
1826 					info("%s: %s: Setup/stage-in complete for %pJ",
1827 					     plugin_type, __func__, job_ptr);
1828 				}
1829 				queue_job_scheduler();
1830 				bb_state.last_update_time = time(NULL);
1831 			} else {
1832 				error("%s: %s: unable to find bb_alloc record for %pJ",
1833 				      plugin_type, __func__, job_ptr);
1834 			}
1835 		}
1836 		slurm_mutex_unlock(&bb_state.bb_mutex);
1837 	} else {
1838 		xfree(job_ptr->state_desc);
1839 		job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
1840 		xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
1841 			   plugin_type, op, resp_msg);
1842 		job_ptr->priority = 0;	/* Hold job */
1843 		bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
1844 		if (bb_alloc) {
1845 			bb_alloc->state_time = time(NULL);
1846 			bb_state.last_update_time = time(NULL);
1847 			if (bb_state.bb_config.flags &
1848 			    BB_FLAG_TEARDOWN_FAILURE) {
1849 				bb_alloc->state = BB_STATE_TEARDOWN;
1850 				_queue_teardown(job_ptr->job_id,
1851 						job_ptr->user_id, true);
1852 			} else {
1853 				bb_alloc->state = BB_STATE_ALLOCATED;
1854 			}
1855 		} else {
1856 			_queue_teardown(job_ptr->job_id, job_ptr->user_id,true);
1857 		}
1858 	}
1859 	unlock_slurmctld(job_write_lock);
1860 
1861 	xfree(resp_msg);
1862 	free_command_argv(setup_argv);
1863 	free_command_argv(data_in_argv);
1864 	xfree(stage_args->pool);
1865 	xfree(stage_args);
1866 
1867 	track_script_remove(pthread_self());
1868 
1869 	return NULL;
1870 }
1871 
_queue_stage_out(job_record_t * job_ptr,bb_job_t * bb_job)1872 static int _queue_stage_out(job_record_t *job_ptr, bb_job_t *bb_job)
1873 {
1874 	char *hash_dir = NULL, *job_dir = NULL;
1875 	char **post_run_argv, **data_out_argv;
1876 	stage_args_t *stage_args;
1877 	int hash_inx = bb_job->job_id % 10, rc = SLURM_SUCCESS;
1878 	pthread_t tid;
1879 
1880 	xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
1881 	xstrfmtcat(job_dir, "%s/job.%u", hash_dir, bb_job->job_id);
1882 
1883 	data_out_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
1884 	data_out_argv[0] = xstrdup("dw_wlm_cli");
1885 	data_out_argv[1] = xstrdup("--function");
1886 	data_out_argv[2] = xstrdup("data_out");
1887 	data_out_argv[3] = xstrdup("--token");
1888 	xstrfmtcat(data_out_argv[4], "%u", bb_job->job_id);
1889 	data_out_argv[5] = xstrdup("--job");
1890 	data_out_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1891 
1892 	post_run_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
1893 	post_run_argv[0] = xstrdup("dw_wlm_cli");
1894 	post_run_argv[1] = xstrdup("--function");
1895 	post_run_argv[2] = xstrdup("post_run");
1896 	post_run_argv[3] = xstrdup("--token");
1897 	xstrfmtcat(post_run_argv[4], "%u", bb_job->job_id);
1898 	post_run_argv[5] = xstrdup("--job");
1899 	post_run_argv[6] = bb_handle_job_script(job_ptr, bb_job);
1900 
1901 	stage_args = xmalloc(sizeof(stage_args_t));
1902 	stage_args->args1   = data_out_argv;
1903 	stage_args->args2   = post_run_argv;
1904 	stage_args->job_id  = bb_job->job_id;
1905 	stage_args->user_id = bb_job->user_id;
1906 
1907 	slurm_thread_create(&tid, _start_stage_out, stage_args);
1908 
1909 	xfree(hash_dir);
1910 	xfree(job_dir);
1911 	return rc;
1912 }
1913 
_start_stage_out(void * x)1914 static void *_start_stage_out(void *x)
1915 {
1916 	stage_args_t *stage_args = (stage_args_t *)x;
1917 	char **post_run_argv, **data_out_argv, *resp_msg = NULL, *op = NULL;
1918 	int rc = SLURM_SUCCESS, status = 0, timeout;
1919 	slurmctld_lock_t job_write_lock =
1920 		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
1921 	job_record_t *job_ptr;
1922 	bb_alloc_t *bb_alloc = NULL;
1923 	bb_job_t *bb_job = NULL;
1924 	DEF_TIMERS
1925 	track_script_rec_add(stage_args->job_id, 0, pthread_self());
1926 
1927 	data_out_argv = stage_args->args1;
1928 	post_run_argv = stage_args->args2;
1929 
1930 	timeout = bb_state.bb_config.other_timeout * 1000;
1931 	op = "dws_post_run";
1932 	START_TIMER;
1933 	resp_msg = run_command("dws_post_run",
1934 			       bb_state.bb_config.get_sys_state,
1935 			       post_run_argv, timeout, pthread_self(),
1936 			       &status);
1937 	END_TIMER;
1938 	if ((DELTA_TIMER > 500000) ||	/* 0.5 secs */
1939 	    bb_state.bb_config.debug_flag) {
1940 		info("%s: %s: dws_post_run for JobId=%u ran for %s",
1941 		     plugin_type, __func__, stage_args->job_id, TIME_STR);
1942 	}
1943 
1944 	if (track_script_broadcast(pthread_self(), status)) {
1945 		/* I was killed by slurmtrack, bail out right now */
1946 		info("%s: %s: dws_post_run for JobId=%u terminated by slurmctld",
1947 		     plugin_type, __func__, stage_args->job_id);
1948 		free_command_argv(post_run_argv);
1949 		free_command_argv(data_out_argv);
1950 		xfree(resp_msg);
1951 		xfree(stage_args->pool);
1952 		xfree(stage_args);
1953 		track_script_remove(pthread_self());
1954 		return NULL;
1955 	}
1956 	track_script_reset_cpid(pthread_self(), 0);
1957 
1958 	_log_script_argv(post_run_argv, resp_msg);
1959 	lock_slurmctld(job_write_lock);
1960 	job_ptr = find_job_record(stage_args->job_id);
1961 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
1962 		trigger_burst_buffer();
1963 		error("%s: %s: dws_post_run for JobId=%u status:%u response:%s",
1964 		      plugin_type, __func__, stage_args->job_id, status,
1965 		      resp_msg);
1966 		rc = SLURM_ERROR;
1967 		if (job_ptr) {
1968 			job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
1969 			xfree(job_ptr->state_desc);
1970 			xstrfmtcat(job_ptr->state_desc, "%s: post_run: %s",
1971 				   plugin_type, resp_msg);
1972 			_update_system_comment(job_ptr, "post_run",
1973 					       resp_msg, 1);
1974 		}
1975 	}
1976 	if (!job_ptr) {
1977 		error("%s: %s: unable to find job record for JobId=%u",
1978 		      plugin_type, __func__, stage_args->job_id);
1979 	} else {
1980 		slurm_mutex_lock(&bb_state.bb_mutex);
1981 		bb_job = _get_bb_job(job_ptr);
1982 		if (bb_job)
1983 			bb_job->state = BB_STATE_STAGING_OUT;
1984 		slurm_mutex_unlock(&bb_state.bb_mutex);
1985 	}
1986 	unlock_slurmctld(job_write_lock);
1987 
1988 	if (rc == SLURM_SUCCESS) {
1989 		timeout = bb_state.bb_config.stage_out_timeout * 1000;
1990 		op = "dws_data_out";
1991 		START_TIMER;
1992 		xfree(resp_msg);
1993 		resp_msg = run_command("dws_data_out",
1994 				       bb_state.bb_config.get_sys_state,
1995 				       data_out_argv, timeout, pthread_self(),
1996 				       &status);
1997 		END_TIMER;
1998 		if ((DELTA_TIMER > 1000000) ||	/* 10 secs */
1999 		    bb_state.bb_config.debug_flag) {
2000 			info("%s: %s: dws_data_out for JobId=%u ran for %s",
2001 			     plugin_type, __func__, stage_args->job_id,
2002 			     TIME_STR);
2003 		}
2004 
2005 		if (track_script_broadcast(pthread_self(), status)) {
2006 			/* I was killed by slurmtrack, bail out right now */
2007 			info("%s: %s: dws_data_out for JobId=%u terminated by slurmctld",
2008 			     plugin_type, __func__, stage_args->job_id);
2009 			free_command_argv(post_run_argv);
2010 			free_command_argv(data_out_argv);
2011 			xfree(resp_msg);
2012 			xfree(stage_args->pool);
2013 			xfree(stage_args);
2014 			track_script_remove(pthread_self());
2015 			return NULL;
2016 		}
2017 		track_script_reset_cpid(pthread_self(), 0);
2018 
2019 		_log_script_argv(data_out_argv, resp_msg);
2020 		if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
2021 		    !strstr(resp_msg, "No matching session")) {
2022 			trigger_burst_buffer();
2023 			error("%s: %s: dws_data_out for JobId=%u status:%u response:%s",
2024 			      plugin_type, __func__, stage_args->job_id,
2025 			      status, resp_msg);
2026 			rc = SLURM_ERROR;
2027 			lock_slurmctld(job_write_lock);
2028 			job_ptr = find_job_record(stage_args->job_id);
2029 			if (job_ptr) {
2030 				job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2031 				xfree(job_ptr->state_desc);
2032 				xstrfmtcat(job_ptr->state_desc,
2033 					   "%s: stage-out: %s",
2034 					   plugin_type, resp_msg);
2035 				_update_system_comment(job_ptr, "data_out",
2036 						       resp_msg, 1);
2037 			}
2038 			unlock_slurmctld(job_write_lock);
2039 		}
2040 	}
2041 
2042 	lock_slurmctld(job_write_lock);
2043 	job_ptr = find_job_record(stage_args->job_id);
2044 	if (!job_ptr) {
2045 		error("%s: %s: unable to find job record for JobId=%u",
2046 		      plugin_type, __func__, stage_args->job_id);
2047 	} else {
2048 		if (rc != SLURM_SUCCESS) {
2049 			job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2050 			xfree(job_ptr->state_desc);
2051 			xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
2052 				   plugin_type, op, resp_msg);
2053 		} else {
2054 			job_ptr->job_state &= (~JOB_STAGE_OUT);
2055 			xfree(job_ptr->state_desc);
2056 			last_job_update = time(NULL);
2057 		}
2058 		slurm_mutex_lock(&bb_state.bb_mutex);
2059 		bb_job = _get_bb_job(job_ptr);
2060 		if ((rc == SLURM_SUCCESS) && bb_job)
2061 			bb_job->state = BB_STATE_TEARDOWN;
2062 		bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
2063 		if (bb_alloc) {
2064 			if (rc == SLURM_SUCCESS) {
2065 				if (bb_state.bb_config.debug_flag) {
2066 					info("%s: %s: Stage-out/post-run complete for %pJ",
2067 					     plugin_type, __func__, job_ptr);
2068 				}
2069 				/* bb_alloc->state = BB_STATE_STAGED_OUT; */
2070 				bb_alloc->state = BB_STATE_TEARDOWN;
2071 				bb_alloc->state_time = time(NULL);
2072 			} else {
2073 				if (bb_state.bb_config.flags &
2074 				    BB_FLAG_TEARDOWN_FAILURE) {
2075 					bb_alloc->state = BB_STATE_TEARDOWN;
2076 					_queue_teardown(stage_args->job_id,
2077 							stage_args->user_id,
2078 							false);
2079 				} else
2080 					bb_alloc->state = BB_STATE_STAGED_IN;
2081 				if (bb_state.bb_config.debug_flag) {
2082 					info("%s: %s: Stage-out failed for %pJ",
2083 					     plugin_type, __func__, job_ptr);
2084 				}
2085 			}
2086 			bb_state.last_update_time = time(NULL);
2087 		} else if (bb_job && bb_job->total_size) {
2088 			error("%s: %s: unable to find bb record for %pJ",
2089 			      plugin_type, __func__, job_ptr);
2090 		}
2091 		if (rc == SLURM_SUCCESS) {
2092 			_queue_teardown(stage_args->job_id, stage_args->user_id,
2093 					false);
2094 		}
2095 		slurm_mutex_unlock(&bb_state.bb_mutex);
2096 	}
2097 	unlock_slurmctld(job_write_lock);
2098 
2099 	xfree(resp_msg);
2100 	free_command_argv(post_run_argv);
2101 	free_command_argv(data_out_argv);
2102 	xfree(stage_args);
2103 
2104 	track_script_remove(pthread_self());
2105 
2106 	return NULL;
2107 }
2108 
_queue_teardown(uint32_t job_id,uint32_t user_id,bool hurry)2109 static void _queue_teardown(uint32_t job_id, uint32_t user_id, bool hurry)
2110 {
2111 	struct stat buf;
2112 	char *hash_dir = NULL, *job_script = NULL;
2113 	char **teardown_argv;
2114 	stage_args_t *teardown_args;
2115 	int fd, hash_inx = job_id % 10;
2116 	pthread_t tid;
2117 
2118 	xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
2119 	xstrfmtcat(job_script, "%s/job.%u/script", hash_dir, job_id);
2120 	if (stat(job_script, &buf) == -1) {
2121 		xfree(job_script);
2122 		xstrfmtcat(job_script, "%s/burst_buffer_script",
2123 			   state_save_loc);
2124 		if (stat(job_script, &buf) == -1) {
2125 			fd = creat(job_script, 0755);
2126 			if (fd >= 0) {
2127 				int len;
2128 				char *dummy_script = "#!/bin/bash\nexit 0\n";
2129 				len = strlen(dummy_script) + 1;
2130 				if (write(fd, dummy_script, len) != len) {
2131 					verbose("%s: %s: write(%s): %m",
2132 						plugin_type, __func__,
2133 						job_script);
2134 				}
2135 				close(fd);
2136 			}
2137 		}
2138 	}
2139 
2140 	teardown_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
2141 	teardown_argv[0] = xstrdup("dw_wlm_cli");
2142 	teardown_argv[1] = xstrdup("--function");
2143 	teardown_argv[2] = xstrdup("teardown");
2144 	teardown_argv[3] = xstrdup("--token");
2145 	xstrfmtcat(teardown_argv[4], "%u", job_id);
2146 	teardown_argv[5] = xstrdup("--job");
2147 	teardown_argv[6] = xstrdup(job_script);
2148 	if (hurry)
2149 		teardown_argv[7] = xstrdup("--hurry");
2150 
2151 	teardown_args = xmalloc(sizeof(stage_args_t));
2152 	teardown_args->job_id  = job_id;
2153 	teardown_args->user_id = user_id;
2154 	teardown_args->args1   = teardown_argv;
2155 
2156 	slurm_thread_create(&tid, _start_teardown, teardown_args);
2157 
2158 	xfree(hash_dir);
2159 	xfree(job_script);
2160 }
2161 
_start_teardown(void * x)2162 static void *_start_teardown(void *x)
2163 {
2164 	static uint32_t previous_job_id = 0;
2165 	stage_args_t *teardown_args = (stage_args_t *)x;
2166 	char **teardown_argv, *resp_msg = NULL;
2167 	int status = 0, timeout;
2168 	job_record_t *job_ptr;
2169 	bb_alloc_t *bb_alloc = NULL;
2170 	bb_job_t *bb_job = NULL;
2171 	/* Locks: write job */
2172 	slurmctld_lock_t job_write_lock = {
2173 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
2174 	DEF_TIMERS;
2175 	bool hurry;
2176 	track_script_rec_add(teardown_args->job_id, 0, pthread_self());
2177 
2178 	teardown_argv = teardown_args->args1;
2179 
2180 	if (previous_job_id == teardown_args->job_id)
2181 		sleep(5);
2182 	previous_job_id = teardown_args->job_id;
2183 
2184 	START_TIMER;
2185 	timeout = bb_state.bb_config.other_timeout * 1000;
2186 	resp_msg = run_command("teardown",
2187 			       bb_state.bb_config.get_sys_state,
2188 			       teardown_argv, timeout, pthread_self(),
2189 			       &status);
2190 	END_TIMER;
2191 	info("%s: %s: teardown for JobId=%u ran for %s",
2192 	     plugin_type, __func__, teardown_args->job_id, TIME_STR);
2193 
2194 	if (track_script_broadcast(pthread_self(), status)) {
2195 		/* I was killed by slurmtrack, bail out right now */
2196 		info("%s: %s: teardown for JobId=%u terminated by slurmctld",
2197 		     plugin_type, __func__, teardown_args->job_id);
2198 		xfree(resp_msg);
2199 		free_command_argv(teardown_argv);
2200 		xfree(teardown_args);
2201 		track_script_remove(pthread_self());
2202 		return NULL;
2203 	}
2204 	/* track_script_reset_cpid(pthread_self(), 0); */
2205 
2206 	_log_script_argv(teardown_argv, resp_msg);
2207 
2208 	/*
2209 	 * "Teardown" is run at every termination of every job that _might_
2210 	 * have a burst buffer, so an error of "token not found" should be
2211 	 * fairly common and not indicative of a problem.
2212 	 */
2213 	if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
2214 	    (!resp_msg ||
2215 	     (!strstr(resp_msg, "No matching session") &&
2216 	      !strstr(resp_msg, "token not found")))) {
2217 		lock_slurmctld(job_write_lock);
2218 		slurm_mutex_lock(&bb_state.bb_mutex);
2219 		job_ptr = find_job_record(teardown_args->job_id);
2220 		if (job_ptr &&
2221 		    (bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))) {
2222 			bb_alloc->state = BB_STATE_TEARDOWN_FAIL;
2223 		}
2224 		slurm_mutex_unlock(&bb_state.bb_mutex);
2225 		unlock_slurmctld(job_write_lock);
2226 
2227 		trigger_burst_buffer();
2228 		error("%s: %s: teardown for JobId=%u status:%u response:%s",
2229 		      plugin_type, __func__, teardown_args->job_id, status,
2230 		      resp_msg);
2231 
2232 
2233 		lock_slurmctld(job_write_lock);
2234 		job_ptr = find_job_record(teardown_args->job_id);
2235 		if (job_ptr) {
2236 			job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
2237 			xfree(job_ptr->state_desc);
2238 			xstrfmtcat(job_ptr->state_desc, "%s: teardown: %s",
2239 				   plugin_type, resp_msg);
2240 			_update_system_comment(job_ptr, "teardown",
2241 					       resp_msg, 0);
2242 		}
2243 		unlock_slurmctld(job_write_lock);
2244 
2245 
2246 		if (!xstrcmp(teardown_argv[7], "--hurry"))
2247 			hurry = true;
2248 		else
2249 			hurry = false;
2250 		_queue_teardown(teardown_args->job_id, teardown_args->user_id,
2251 				hurry);
2252 	} else {
2253 		lock_slurmctld(job_write_lock);
2254 		slurm_mutex_lock(&bb_state.bb_mutex);
2255 		job_ptr = find_job_record(teardown_args->job_id);
2256 		_purge_bb_files(teardown_args->job_id, job_ptr);
2257 		if (job_ptr) {
2258 			if ((bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr))){
2259 				bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2260 					     bb_alloc->pool, &bb_state);
2261 				(void) bb_free_alloc_rec(&bb_state, bb_alloc);
2262 			}
2263 			if ((bb_job = _get_bb_job(job_ptr)))
2264 				bb_job->state = BB_STATE_COMPLETE;
2265 			job_ptr->job_state &= (~JOB_STAGE_OUT);
2266 			if (!IS_JOB_PENDING(job_ptr) &&	/* No email if requeue */
2267 			    (job_ptr->mail_type & MAIL_JOB_STAGE_OUT)) {
2268 				/*
2269 				 * NOTE: If a job uses multiple burst buffer
2270 				 * plugins, the message will be sent after the
2271 				 * teardown completes in the first plugin
2272 				 */
2273 				mail_job_info(job_ptr, MAIL_JOB_STAGE_OUT);
2274 				job_ptr->mail_type &= (~MAIL_JOB_STAGE_OUT);
2275 			}
2276 		} else {
2277 			/*
2278 			 * This will happen when slurmctld restarts and needs
2279 			 * to clear vestigial buffers
2280 			 */
2281 			char buf_name[32];
2282 			snprintf(buf_name, sizeof(buf_name), "%u",
2283 				 teardown_args->job_id);
2284 			bb_alloc = bb_find_name_rec(buf_name,
2285 						    teardown_args->user_id,
2286 						    &bb_state);
2287 			if (bb_alloc) {
2288 				bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2289 					     bb_alloc->pool, &bb_state);
2290 				(void) bb_free_alloc_rec(&bb_state, bb_alloc);
2291 			}
2292 
2293 		}
2294 		slurm_mutex_unlock(&bb_state.bb_mutex);
2295 		unlock_slurmctld(job_write_lock);
2296 	}
2297 
2298 	xfree(resp_msg);
2299 	free_command_argv(teardown_argv);
2300 	xfree(teardown_args);
2301 
2302 	track_script_remove(pthread_self());
2303 
2304 	return NULL;
2305 }
2306 
2307 /* Reduced burst buffer space in advanced reservation for resources already
2308  * allocated to jobs. What's left is space reserved for future jobs */
_rm_active_job_bb(char * resv_name,char ** pool_name,int64_t * resv_space,int ds_len)2309 static void _rm_active_job_bb(char *resv_name, char **pool_name,
2310 			      int64_t *resv_space, int ds_len)
2311 {
2312 	ListIterator job_iterator;
2313 	job_record_t *job_ptr;
2314 	bb_job_t *bb_job;
2315 	int i;
2316 
2317 	job_iterator = list_iterator_create(job_list);
2318 	while ((job_ptr = list_next(job_iterator))) {
2319 		if ((job_ptr->burst_buffer == NULL) ||
2320 		    (job_ptr->burst_buffer[0] == '\0') ||
2321 		    (xstrcmp(job_ptr->resv_name, resv_name) == 0))
2322 			continue;
2323 		bb_job = bb_job_find(&bb_state,job_ptr->job_id);
2324 		if (!bb_job || (bb_job->state <= BB_STATE_PENDING) ||
2325 		    (bb_job->state >= BB_STATE_COMPLETE))
2326 			continue;
2327 		for (i = 0; i < ds_len; i++) {
2328 			if (xstrcmp(bb_job->job_pool, pool_name[i]))
2329 				continue;
2330 			if (resv_space[i] >= bb_job->total_size)
2331 				resv_space[i] -= bb_job->total_size;
2332 			else
2333 				resv_space[i] = 0;
2334 			break;
2335 		}
2336 	}
2337 	list_iterator_destroy(job_iterator);
2338 }
2339 
2340 /* Test if a job can be allocated a burst buffer.
2341  * This may preempt currently active stage-in for higher priority jobs.
2342  *
2343  * RET 0: Job can be started now
2344  *     1: Job exceeds configured limits, continue testing with next job
2345  *     2: Job needs more resources than currently available can not start,
2346  *        skip all remaining jobs
2347  */
_test_size_limit(job_record_t * job_ptr,bb_job_t * bb_job)2348 static int _test_size_limit(job_record_t *job_ptr, bb_job_t *bb_job)
2349 {
2350 	int64_t *add_space = NULL, *avail_space = NULL, *granularity = NULL;
2351 	int64_t *preempt_space = NULL, *resv_space = NULL, *total_space = NULL;
2352 	uint64_t unfree_space;
2353 	burst_buffer_info_msg_t *resv_bb = NULL;
2354 	struct preempt_bb_recs *preempt_ptr = NULL;
2355 	char **pool_name, *my_pool;
2356 	int ds_len;
2357 	burst_buffer_pool_t *pool_ptr;
2358 	bb_buf_t *buf_ptr;
2359 	bb_alloc_t *bb_ptr = NULL;
2360 	int i, j, k, rc = 0;
2361 	bool avail_ok, do_preempt, preempt_ok;
2362 	time_t now = time(NULL);
2363 	List preempt_list = NULL;
2364 	ListIterator preempt_iter;
2365 
2366 	xassert(bb_job);
2367 
2368 	/* Initialize data structure */
2369 	ds_len = bb_state.bb_config.pool_cnt + 1;
2370 	add_space = xcalloc(ds_len, sizeof(int64_t));
2371 	avail_space = xcalloc(ds_len, sizeof(int64_t));
2372 	granularity = xcalloc(ds_len, sizeof(int64_t));
2373 	pool_name = xcalloc(ds_len, sizeof(char *));
2374 	preempt_space = xcalloc(ds_len, sizeof(int64_t));
2375 	resv_space = xcalloc(ds_len, sizeof(int64_t));
2376 	total_space = xcalloc(ds_len, sizeof(int64_t));
2377 	for (i = 0, pool_ptr = bb_state.bb_config.pool_ptr;
2378 	     i < bb_state.bb_config.pool_cnt; i++, pool_ptr++) {
2379 		unfree_space = MAX(pool_ptr->used_space,
2380 				   pool_ptr->unfree_space);
2381 		if (pool_ptr->total_space >= unfree_space)
2382 			avail_space[i] = pool_ptr->total_space - unfree_space;
2383 		granularity[i] = pool_ptr->granularity;
2384 		pool_name[i] = pool_ptr->name;
2385 		total_space[i] = pool_ptr->total_space;
2386 	}
2387 	unfree_space = MAX(bb_state.used_space, bb_state.unfree_space);
2388 	if (bb_state.total_space - unfree_space)
2389 		avail_space[i] = bb_state.total_space - unfree_space;
2390 	granularity[i] = bb_state.bb_config.granularity;
2391 	pool_name[i] = bb_state.bb_config.default_pool;
2392 	total_space[i] = bb_state.total_space;
2393 
2394 	/* Determine job size requirements by pool */
2395 	if (bb_job->total_size) {
2396 		for (j = 0; j < ds_len; j++) {
2397 			if (!xstrcmp(bb_job->job_pool, pool_name[j])) {
2398 				add_space[j] += bb_granularity(
2399 							bb_job->total_size,
2400 							granularity[j]);
2401 				break;
2402 			}
2403 		}
2404 	}
2405 	for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
2406 	     i++, buf_ptr++) {
2407 		if (!buf_ptr->create || (buf_ptr->state >= BB_STATE_ALLOCATING))
2408 			continue;
2409 		for (j = 0; j < ds_len; j++) {
2410 			if (!xstrcmp(buf_ptr->pool, pool_name[j])) {
2411 				add_space[j] += bb_granularity(buf_ptr->size,
2412 							       granularity[j]);
2413 				break;
2414 			}
2415 		}
2416 	}
2417 
2418 	/* Account for reserved resources. Reduce reservation size for
2419 	 * resources already claimed from the reservation. Assume node reboot
2420 	 * required since we have not selected the compute nodes yet. */
2421 	resv_bb = job_test_bb_resv(job_ptr, now, true);
2422 	if (resv_bb) {
2423 		burst_buffer_info_t *resv_bb_ptr;
2424 		for (i = 0, resv_bb_ptr = resv_bb->burst_buffer_array;
2425 		     i < resv_bb->record_count; i++, resv_bb_ptr++) {
2426 			if (xstrcmp(resv_bb_ptr->name, bb_state.name))
2427 				continue;
2428 			for (j = 0, pool_ptr = resv_bb_ptr->pool_ptr;
2429 			     j < resv_bb_ptr->pool_cnt; j++, pool_ptr++) {
2430 				if (pool_ptr->name) {
2431 					my_pool = pool_ptr->name;
2432 				} else {
2433 					my_pool =
2434 						bb_state.bb_config.default_pool;
2435 				}
2436 				unfree_space = MAX(pool_ptr->used_space,
2437 						   pool_ptr->unfree_space);
2438 				for (k = 0; k < ds_len; k++) {
2439 					if (xstrcmp(my_pool, pool_name[k]))
2440 						continue;
2441 					resv_space[k] += bb_granularity(
2442 							unfree_space,
2443 							granularity[k]);
2444 					break;
2445 				}
2446 			}
2447 			if (resv_bb_ptr->used_space) {
2448 				/* Pool not specified, use default */
2449 				my_pool = bb_state.bb_config.default_pool;
2450 				for (k = 0; k < ds_len; k++) {
2451 					if (xstrcmp(my_pool, pool_name[k]))
2452 						continue;
2453 					resv_space[k] += bb_granularity(
2454 							resv_bb_ptr->used_space,
2455 							granularity[k]);
2456 					break;
2457 				}
2458 			}
2459 #if 1
2460 			/* Is any of this reserved space already taken? */
2461 			_rm_active_job_bb(job_ptr->resv_name,
2462 					  pool_name, resv_space, ds_len);
2463 #endif
2464 		}
2465 	}
2466 
2467 #if _DEBUG
2468 	info("TEST_SIZE_LIMIT for %pJ", job_ptr);
2469 	for (j = 0; j < ds_len; j++) {
2470 		info("POOL:%s ADD:%"PRIu64" AVAIL:%"PRIu64
2471 		     " GRANULARITY:%"PRIu64" RESV:%"PRIu64" TOTAL:%"PRIu64,
2472 		     pool_name[j], add_space[j], avail_space[j], granularity[j],
2473 		     resv_space[j], total_space[j]);
2474 	}
2475 #endif
2476 
2477 	/* Determine if resources currently are available for the job */
2478 	avail_ok = true;
2479 	for (j = 0; j < ds_len; j++) {
2480 		if (add_space[j] > total_space[j]) {
2481 			rc = 1;
2482 			goto fini;
2483 		}
2484 		if ((add_space[j] + resv_space[j]) > avail_space[j])
2485 			avail_ok = false;
2486 	}
2487 	if (avail_ok) {
2488 		rc = 0;
2489 		goto fini;
2490 	}
2491 
2492 	/* Identify candidate burst buffers to revoke for higher priority job */
2493 	preempt_list = list_create(bb_job_queue_del);
2494 	for (i = 0; i < BB_HASH_SIZE; i++) {
2495 		bb_ptr = bb_state.bb_ahash[i];
2496 		while (bb_ptr) {
2497 			if ((bb_ptr->job_id != 0) &&
2498 			    ((bb_ptr->name == NULL) ||
2499 			     ((bb_ptr->name[0] >= '0') &&
2500 			      (bb_ptr->name[0] <= '9'))) &&
2501 			    (bb_ptr->use_time > now) &&
2502 			    (bb_ptr->use_time > job_ptr->start_time)) {
2503 				if (!bb_ptr->pool) {
2504 					bb_ptr->name = xstrdup(
2505 						bb_state.bb_config.default_pool);
2506 				}
2507 				preempt_ptr = xmalloc(sizeof(
2508 						struct preempt_bb_recs));
2509 				preempt_ptr->bb_ptr = bb_ptr;
2510 				preempt_ptr->job_id = bb_ptr->job_id;
2511 				preempt_ptr->pool = bb_ptr->name;
2512 				preempt_ptr->size = bb_ptr->size;
2513 				preempt_ptr->use_time = bb_ptr->use_time;
2514 				preempt_ptr->user_id = bb_ptr->user_id;
2515 				list_push(preempt_list, preempt_ptr);
2516 
2517 				for (j = 0; j < ds_len; j++) {
2518 					if (xstrcmp(bb_ptr->name, pool_name[j]))
2519 						continue;
2520 					preempt_ptr->size = bb_granularity(
2521 								bb_ptr->size,
2522 								granularity[j]);
2523 					preempt_space[j] += preempt_ptr->size;
2524 					break;
2525 				}
2526 			}
2527 			bb_ptr = bb_ptr->next;
2528 		}
2529 	}
2530 
2531 #if _DEBUG
2532 	for (j = 0; j < ds_len; j++) {
2533 		info("POOL:%s ADD:%"PRIu64" AVAIL:%"PRIu64
2534 		     " GRANULARITY:%"PRIu64" PREEMPT:%"PRIu64
2535 		     " RESV:%"PRIu64" TOTAL:%"PRIu64,
2536 		     pool_name[j], add_space[j], avail_space[j], granularity[j],
2537 		     preempt_space[j], resv_space[j], total_space[j]);
2538 	}
2539 #endif
2540 
2541 	/* Determine if sufficient resources available after preemption */
2542 	rc = 2;
2543 	preempt_ok = true;
2544 	for (j = 0; j < ds_len; j++) {
2545 		if ((add_space[j] + resv_space[j]) >
2546 		    (avail_space[j] + preempt_space[j])) {
2547 			preempt_ok = false;
2548 			break;
2549 		}
2550 	}
2551 	if (!preempt_ok)
2552 		goto fini;
2553 
2554 	/* Now preempt/teardown the most appropriate buffers */
2555 	list_sort(preempt_list, bb_preempt_queue_sort);
2556 	preempt_iter = list_iterator_create(preempt_list);
2557 	while ((preempt_ptr = list_next(preempt_iter))) {
2558 		do_preempt = false;
2559 		for (j = 0; j < ds_len; j++) {
2560 			if (xstrcmp(preempt_ptr->pool, pool_name[j]))
2561 				continue;
2562 			if ((add_space[j] + resv_space[j]) > avail_space[j]) {
2563 				avail_space[j] += preempt_ptr->size;
2564 				preempt_space[j] -= preempt_ptr->size;
2565 				do_preempt = true;
2566 			}
2567 			break;
2568 		}
2569 		if (do_preempt) {
2570 			preempt_ptr->bb_ptr->cancelled = true;
2571 			preempt_ptr->bb_ptr->end_time = 0;
2572 			preempt_ptr->bb_ptr->state = BB_STATE_TEARDOWN;
2573 			preempt_ptr->bb_ptr->state_time = time(NULL);
2574 			_queue_teardown(preempt_ptr->job_id,
2575 					preempt_ptr->user_id, true);
2576 			if (bb_state.bb_config.debug_flag) {
2577 				info("%s: %s: Preempting stage-in of JobId=%u for %pJ",
2578 				     plugin_type, __func__,
2579 				     preempt_ptr->job_id, job_ptr);
2580 			}
2581 		}
2582 
2583 	}
2584 	list_iterator_destroy(preempt_iter);
2585 
2586 fini:	xfree(add_space);
2587 	xfree(avail_space);
2588 	xfree(granularity);
2589 	xfree(pool_name);
2590 	xfree(preempt_space);
2591 	xfree(resv_space);
2592 	xfree(total_space);
2593 	if (resv_bb)
2594 		slurm_free_burst_buffer_info_msg(resv_bb);
2595 	FREE_NULL_LIST(preempt_list);
2596 	return rc;
2597 }
2598 
2599 /* Handle timeout of burst buffer events:
2600  * 1. Purge per-job burst buffer records when the stage-out has completed and
2601  *    the job has been purged from Slurm
2602  * 2. Test for StageInTimeout events
2603  * 3. Test for StageOutTimeout events
2604  */
_timeout_bb_rec(void)2605 static void _timeout_bb_rec(void)
2606 {
2607 	bb_alloc_t **bb_pptr, *bb_alloc = NULL;
2608 	job_record_t *job_ptr;
2609 	int i;
2610 
2611 	if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY)
2612 		return;
2613 
2614 	for (i = 0; i < BB_HASH_SIZE; i++) {
2615 		bb_pptr = &bb_state.bb_ahash[i];
2616 		bb_alloc = bb_state.bb_ahash[i];
2617 		while (bb_alloc) {
2618 			if (((bb_alloc->seen_time + TIME_SLOP) <
2619 			     bb_state.last_load_time) &&
2620 			    (bb_alloc->state == BB_STATE_TEARDOWN)) {
2621 				/*
2622 				 * Teardown likely complete, but bb_alloc state
2623 				 * not yet updated; skip the record
2624 				 */
2625 			} else if ((bb_alloc->seen_time + TIME_SLOP) <
2626 				   bb_state.last_load_time) {
2627 				assoc_mgr_lock_t assoc_locks =
2628 					{ .assoc = READ_LOCK,
2629 					  .qos = READ_LOCK };
2630 				/*
2631 				 * assoc_mgr needs locking to call
2632 				 * bb_post_persist_delete
2633 				 */
2634 				if (bb_alloc->job_id == 0) {
2635 					info("%s: %s: Persistent burst buffer %s purged",
2636 					     plugin_type, __func__,
2637 					     bb_alloc->name);
2638 				} else if (bb_state.bb_config.debug_flag) {
2639 					info("%s: %s: burst buffer for JobId=%u purged",
2640 					     plugin_type,
2641 					     __func__, bb_alloc->job_id);
2642 				}
2643 				bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
2644 					     bb_alloc->pool, &bb_state);
2645 
2646 				assoc_mgr_lock(&assoc_locks);
2647 				bb_post_persist_delete(bb_alloc, &bb_state);
2648 				assoc_mgr_unlock(&assoc_locks);
2649 
2650 				*bb_pptr = bb_alloc->next;
2651 				bb_free_alloc_buf(bb_alloc);
2652 				break;
2653 			} else if (bb_alloc->state == BB_STATE_COMPLETE) {
2654 				job_ptr = find_job_record(bb_alloc->job_id);
2655 				if (!job_ptr || IS_JOB_PENDING(job_ptr)) {
2656 					/* Job purged or BB preempted */
2657 					*bb_pptr = bb_alloc->next;
2658 					bb_free_alloc_buf(bb_alloc);
2659 					break;
2660 				}
2661 			}
2662 			bb_pptr = &bb_alloc->next;
2663 			bb_alloc = bb_alloc->next;
2664 		}
2665 	}
2666 }
2667 
2668 /* Perform basic burst_buffer option validation */
_parse_bb_opts(job_desc_msg_t * job_desc,uint64_t * bb_size,uid_t submit_uid)2669 static int _parse_bb_opts(job_desc_msg_t *job_desc, uint64_t *bb_size,
2670 			  uid_t submit_uid)
2671 {
2672 	char *bb_script, *save_ptr = NULL;
2673 	char *bb_name = NULL, *bb_pool, *capacity;
2674 	char *end_ptr = NULL, *sub_tok, *tok;
2675 	uint64_t tmp_cnt, swap_cnt = 0;
2676 	int rc = SLURM_SUCCESS;
2677 	bool enable_persist = false, have_bb = false, have_stage_out = false;
2678 
2679 	xassert(bb_size);
2680 	*bb_size = 0;
2681 
2682 	if (validate_operator(submit_uid) ||
2683 	    (bb_state.bb_config.flags & BB_FLAG_ENABLE_PERSISTENT))
2684 		enable_persist = true;
2685 
2686 	if (job_desc->script)
2687 		rc = _xlate_batch(job_desc);
2688 	else
2689 		rc = _xlate_interactive(job_desc);
2690 	if ((rc != SLURM_SUCCESS) || (!job_desc->burst_buffer))
2691 		return rc;
2692 
2693 	bb_script = xstrdup(job_desc->burst_buffer);
2694 	tok = strtok_r(bb_script, "\n", &save_ptr);
2695 	while (tok) {
2696 		uint32_t bb_flag = 0;
2697 		tmp_cnt = 0;
2698 		if (tok[0] != '#')
2699 			break;	/* Quit at first non-comment */
2700 
2701 		if ((tok[1] == 'B') && (tok[2] == 'B'))
2702 			bb_flag = BB_FLAG_BB_OP;
2703 		else if ((tok[1] == 'D') && (tok[2] == 'W'))
2704 			bb_flag = BB_FLAG_DW_OP;
2705 
2706 		/*
2707 		 * Effective Slurm v18.08 and CLE6.0UP06 the create_persistent
2708 		 * and destroy_persistent functions are directly supported by
2709 		 * dw_wlm_cli. Support "#BB" format for backward compatibility.
2710 		 */
2711 		if (bb_flag == BB_FLAG_BB_OP) {
2712 			tok += 3;
2713 			while (isspace(tok[0]))
2714 				tok++;
2715 			if (!xstrncmp(tok, "create_persistent", 17) &&
2716 			    !enable_persist) {
2717 				info("%s: %s: User %d disabled from creating persistent burst buffer",
2718 				     plugin_type, __func__, submit_uid);
2719 				rc = ESLURM_BURST_BUFFER_PERMISSION;
2720 				break;
2721 			} else if (!xstrncmp(tok, "create_persistent", 17)) {
2722 				have_bb = true;
2723 				bb_name = NULL;
2724 				bb_pool = NULL;
2725 				if ((sub_tok = strstr(tok, "capacity="))) {
2726 					tmp_cnt = bb_get_size_num(sub_tok+9, 1);
2727 				}
2728 				if (tmp_cnt == 0)
2729 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2730 				if ((sub_tok = strstr(tok, "name="))) {
2731 					bb_name = xstrdup(sub_tok + 5);
2732 					if ((sub_tok = strchr(bb_name, ' ')))
2733 						sub_tok[0] = '\0';
2734 				} else {
2735 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2736 				}
2737 				if (!bb_name ||
2738 				    ((bb_name[0] >= '0') &&
2739 				     (bb_name[0] <= '9')))
2740 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2741 				xfree(bb_name);
2742 				if ((sub_tok = strstr(tok, "pool="))) {
2743 					bb_pool = xstrdup(sub_tok + 5);
2744 					if ((sub_tok = strchr(bb_pool, ' ')))
2745 						sub_tok[0] = '\0';
2746 				}
2747 				if (!bb_valid_pool_test(&bb_state, bb_pool))
2748 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2749 				*bb_size += _set_granularity(tmp_cnt, bb_pool);
2750 				xfree(bb_pool);
2751 				if (rc != SLURM_SUCCESS)
2752 					break;
2753 			} else if (!xstrncmp(tok, "destroy_persistent", 18) &&
2754 				   !enable_persist) {
2755 				info("%s: %s: User %d disabled from destroying persistent burst buffer",
2756 				     plugin_type, __func__, submit_uid);
2757 				rc = ESLURM_BURST_BUFFER_PERMISSION;
2758 				break;
2759 			} else if (!xstrncmp(tok, "destroy_persistent", 18)) {
2760 				have_bb = true;
2761 				if (!(sub_tok = strstr(tok, "name="))) {
2762 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2763 					break;
2764 				}
2765 			} else {
2766 				/* Ignore other (future) options */
2767 			}
2768 		}
2769 		if (bb_flag == BB_FLAG_DW_OP) {
2770 			tok += 3;
2771 			while (isspace(tok[0]))
2772 				tok++;
2773 			if (!xstrncmp(tok, "jobdw", 5) &&
2774 			    (capacity = strstr(tok, "capacity="))) {
2775 				bb_pool = NULL;
2776 				have_bb = true;
2777 				tmp_cnt = bb_get_size_num(capacity + 9, 1);
2778 				if (tmp_cnt == 0) {
2779 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2780 					break;
2781 				}
2782 				if ((sub_tok = strstr(tok, "pool="))) {
2783 					bb_pool = xstrdup(sub_tok + 5);
2784 					if ((sub_tok = strchr(bb_pool, ' ')))
2785 						sub_tok[0] = '\0';
2786 				}
2787 				if (!bb_valid_pool_test(&bb_state, bb_pool))
2788 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2789 				*bb_size += _set_granularity(tmp_cnt, bb_pool);
2790 				xfree(bb_pool);
2791 			} else if (!xstrncmp(tok, "persistentdw", 12)) {
2792 				have_bb = true;
2793 			} else if (!xstrncmp(tok, "swap", 4)) {
2794 				bb_pool = NULL;
2795 				have_bb = true;
2796 				tok += 4;
2797 				while (isspace(tok[0]) && (tok[0] != '\0'))
2798 					tok++;
2799 				swap_cnt += strtol(tok, &end_ptr, 10);
2800 				if ((job_desc->max_nodes == 0) ||
2801 				    (job_desc->max_nodes == NO_VAL)) {
2802 					info("%s: %s: user %u submitted job with swap space specification, but no max node count specification",
2803 					     plugin_type, __func__,
2804 					     job_desc->user_id);
2805 					if (job_desc->min_nodes == NO_VAL)
2806 						job_desc->min_nodes = 1;
2807 					job_desc->max_nodes =
2808 						job_desc->min_nodes;
2809 				}
2810 				tmp_cnt = swap_cnt * job_desc->max_nodes;
2811 				if ((sub_tok = strstr(tok, "pool="))) {
2812 					bb_pool = xstrdup(sub_tok + 5);
2813 					if ((sub_tok = strchr(bb_pool, ' ')))
2814 						sub_tok[0] = '\0';
2815 				}
2816 				if (!bb_valid_pool_test(&bb_state, bb_pool))
2817 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2818 				*bb_size += _set_granularity(tmp_cnt, bb_pool);
2819 				xfree(bb_pool);
2820 			} else if (!xstrncmp(tok, "stage_out", 9)) {
2821 				have_stage_out = true;
2822 			} else if (!xstrncmp(tok, "create_persistent", 17) ||
2823 				   !xstrncmp(tok, "destroy_persistent", 18)) {
2824 				/*
2825 				 * Disable support until Slurm v18.08 to prevent
2826 				 * user directed persistent burst buffer changes
2827 				 * outside of Slurm control.
2828 				 */
2829 				rc = ESLURM_BURST_BUFFER_PERMISSION;
2830 				break;
2831 
2832 			}
2833 		}
2834 		tok = strtok_r(NULL, "\n", &save_ptr);
2835 	}
2836 	xfree(bb_script);
2837 
2838 	if (!have_bb)
2839 		rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2840 
2841 	if (!have_stage_out) {
2842 		/* prevent sending stage out email */
2843 		job_desc->mail_type &= (~MAIL_JOB_STAGE_OUT);
2844 	}
2845 
2846 	return rc;
2847 }
2848 
2849 /* Copy a batch job's burst_buffer options into a separate buffer.
2850  * merge continued lines into a single line */
_xlate_batch(job_desc_msg_t * job_desc)2851 static int _xlate_batch(job_desc_msg_t *job_desc)
2852 {
2853 	char *script, *save_ptr = NULL, *tok;
2854 	int line_type, prev_type = LINE_OTHER;
2855 	bool is_cont = false, has_space = false;
2856 	int len, rc = SLURM_SUCCESS;
2857 
2858 	/*
2859 	 * Any command line --bb options get added to the script
2860 	 */
2861 	if (job_desc->burst_buffer) {
2862 		rc = _xlate_interactive(job_desc);
2863 		if (rc != SLURM_SUCCESS)
2864 			return rc;
2865 		_add_bb_to_script(&job_desc->script, job_desc->burst_buffer);
2866 		xfree(job_desc->burst_buffer);
2867 	}
2868 
2869 	script = xstrdup(job_desc->script);
2870 	tok = strtok_r(script, "\n", &save_ptr);
2871 	while (tok) {
2872 		if (tok[0] != '#')
2873 			break;	/* Quit at first non-comment */
2874 
2875 		if ((tok[1] == 'B') && (tok[2] == 'B'))
2876 			line_type = LINE_BB;
2877 		else if ((tok[1] == 'D') && (tok[2] == 'W'))
2878 			line_type = LINE_DW;
2879 		else
2880 			line_type = LINE_OTHER;
2881 
2882 		if (line_type == LINE_OTHER) {
2883 			is_cont = false;
2884 		} else {
2885 			if (is_cont) {
2886 				if (line_type != prev_type) {
2887 					/*
2888 					 * Mixing "#DW" with "#BB" on same
2889 					 * (continued) line, error
2890 					 */
2891 					rc =ESLURM_INVALID_BURST_BUFFER_REQUEST;
2892 					break;
2893 				}
2894 				tok += 3; 	/* Skip "#DW" or "#BB" */
2895 				while (has_space && isspace(tok[0]))
2896 					tok++;	/* Skip duplicate spaces */
2897 			} else if (job_desc->burst_buffer) {
2898 				xstrcat(job_desc->burst_buffer, "\n");
2899 			}
2900 			prev_type = line_type;
2901 
2902 			len = strlen(tok);
2903 			if (tok[len - 1] == '\\') {
2904 				has_space = isspace(tok[len - 2]);
2905 				tok[strlen(tok) - 1] = '\0';
2906 				is_cont = true;
2907 			} else {
2908 				is_cont = false;
2909 			}
2910 			xstrcat(job_desc->burst_buffer, tok);
2911 		}
2912 		tok = strtok_r(NULL, "\n", &save_ptr);
2913 	}
2914 	xfree(script);
2915 	if (rc != SLURM_SUCCESS)
2916 		xfree(job_desc->burst_buffer);
2917 	return rc;
2918 }
2919 
2920 /* Parse simple interactive burst_buffer options into an format identical to
2921  * burst_buffer options in a batch script file */
_xlate_interactive(job_desc_msg_t * job_desc)2922 static int _xlate_interactive(job_desc_msg_t *job_desc)
2923 {
2924 	char *access = NULL, *bb_copy = NULL, *capacity = NULL, *pool = NULL;
2925 	char *swap = NULL, *type = NULL;
2926 	char *end_ptr = NULL, *sep, *tok;
2927 	uint64_t buf_size = 0, swap_cnt = 0;
2928 	int i, rc = SLURM_SUCCESS, tok_len;
2929 
2930 	if (!job_desc->burst_buffer || (job_desc->burst_buffer[0] == '#'))
2931 		return rc;
2932 
2933 	if (strstr(job_desc->burst_buffer, "create_persistent") ||
2934 	    strstr(job_desc->burst_buffer, "destroy_persistent")) {
2935 		/* Create or destroy of persistent burst buffers NOT supported
2936 		 * via --bb option. Use --bbf or a batch script instead. */
2937 		return ESLURM_INVALID_BURST_BUFFER_REQUEST;
2938 	}
2939 
2940 	bb_copy = xstrdup(job_desc->burst_buffer);
2941 	if ((tok = strstr(bb_copy, "access="))) {
2942 		access = xstrdup(tok + 7);
2943 		sep = strchr(access, ',');
2944 		if (sep)
2945 			sep[0] = '\0';
2946 		sep = strchr(access, ' ');
2947 		if (sep)
2948 			sep[0] = '\0';
2949 		tok_len = strlen(access) + 7;
2950 		memset(tok, ' ', tok_len);
2951 	}
2952 	if ((access == NULL) &&		/* Not set above with "access=" */
2953 	    (tok = strstr(bb_copy, "access_mode="))) {
2954 		access = xstrdup(tok + 12);
2955 		sep = strchr(access, ',');
2956 		if (sep)
2957 			sep[0] = '\0';
2958 		sep = strchr(access, ' ');
2959 		if (sep)
2960 			sep[0] = '\0';
2961 		tok_len = strlen(access) + 12;
2962 		memset(tok, ' ', tok_len);
2963 	}
2964 
2965 	if ((tok = strstr(bb_copy, "capacity="))) {
2966 		buf_size = bb_get_size_num(tok + 9, 1);
2967 		if (buf_size == 0) {
2968 			rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2969 			goto fini;
2970 		}
2971 		capacity = xstrdup(tok + 9);
2972 		sep = strchr(capacity, ',');
2973 		if (sep)
2974 			sep[0] = '\0';
2975 		sep = strchr(capacity, ' ');
2976 		if (sep)
2977 			sep[0] = '\0';
2978 		tok_len = strlen(capacity) + 9;
2979 		memset(tok, ' ', tok_len);
2980 	}
2981 
2982 
2983 	if ((tok = strstr(bb_copy, "pool="))) {
2984 		pool = xstrdup(tok + 5);
2985 		sep = strchr(pool, ',');
2986 		if (sep)
2987 			sep[0] = '\0';
2988 		sep = strchr(pool, ' ');
2989 		if (sep)
2990 			sep[0] = '\0';
2991 		tok_len = strlen(pool) + 5;
2992 		memset(tok, ' ', tok_len);
2993 	}
2994 
2995 	if ((tok = strstr(bb_copy, "swap="))) {
2996 		swap_cnt = strtol(tok + 5, &end_ptr, 10);
2997 		if (swap_cnt == 0) {
2998 			rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
2999 			goto fini;
3000 		}
3001 		swap = xstrdup(tok + 5);
3002 		sep = strchr(swap, ',');
3003 		if (sep)
3004 			sep[0] = '\0';
3005 		sep = strchr(swap, ' ');
3006 		if (sep)
3007 			sep[0] = '\0';
3008 		tok_len = strlen(swap) + 5;
3009 		memset(tok, ' ', tok_len);
3010 	}
3011 
3012 	if ((tok = strstr(bb_copy, "type="))) {
3013 		type = xstrdup(tok + 5);
3014 		sep = strchr(type, ',');
3015 		if (sep)
3016 			sep[0] = '\0';
3017 		sep = strchr(type, ' ');
3018 		if (sep)
3019 			sep[0] = '\0';
3020 		tok_len = strlen(type) + 5;
3021 		memset(tok, ' ', tok_len);
3022 	}
3023 
3024 	if (rc == SLURM_SUCCESS) {
3025 		/* Look for vestigial content. Treating this as an error would
3026 		 * prevent backward compatibility. Just log it for now. */
3027 		for (i = 0; bb_copy[i]; i++) {
3028 			if (isspace(bb_copy[i]))
3029 				continue;
3030 			verbose("%s: %s: Unrecognized --bb content: %s",
3031 				plugin_type, __func__, bb_copy + i);
3032 //			rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
3033 //			goto fini;
3034 		}
3035 	}
3036 
3037 	if (rc == SLURM_SUCCESS)
3038 		xfree(job_desc->burst_buffer);
3039 	if ((rc == SLURM_SUCCESS) && (swap_cnt || buf_size)) {
3040 		if (swap_cnt) {
3041 			xstrfmtcat(job_desc->burst_buffer,
3042 				   "#DW swap %"PRIu64"GiB", swap_cnt);
3043 			if (pool) {
3044 				xstrfmtcat(job_desc->burst_buffer,
3045 					   " pool=%s", pool);
3046 			}
3047 		}
3048 		if (buf_size) {
3049 			if (job_desc->burst_buffer)
3050 				xstrfmtcat(job_desc->burst_buffer, "\n");
3051 			xstrfmtcat(job_desc->burst_buffer,
3052 				   "#DW jobdw capacity=%s",
3053 				   bb_get_size_str(buf_size));
3054 			if (access) {
3055 				xstrfmtcat(job_desc->burst_buffer,
3056 					   " access_mode=%s", access);
3057 			}
3058 			if (pool) {
3059 				xstrfmtcat(job_desc->burst_buffer,
3060 					   " pool=%s", pool);
3061 			}
3062 			if (type) {
3063 				xstrfmtcat(job_desc->burst_buffer,
3064 					   " type=%s", type);
3065 			}
3066 		}
3067 	}
3068 
3069 fini:	xfree(access);
3070 	xfree(bb_copy);
3071 	xfree(capacity);
3072 	xfree(pool);
3073 	xfree(swap);
3074 	xfree(type);
3075 	return rc;
3076 }
3077 
/*
 * Merge the burst buffer options in "burst_buffer_file" into the script at
 * *script_body. The script is replaced by a newly allocated string.
 *
 * - NULL or empty burst_buffer_file: nothing is done.
 * - No script yet: the script becomes a copy of burst_buffer_file.
 * - Otherwise the options, with a trailing newline added if missing, are
 *   placed:
 *   - before the script, when it does not start with '#';
 *   - right after its first line, when it starts with '#' (e.g. a "#!"
 *     interpreter line);
 *   - after an added newline, when the script is a single '#' line with no
 *     newline.
 */
static void  _add_bb_to_script(char **script_body,
			       const char *burst_buffer_file)
{
	char *old_script = *script_body;
	char *bb_lines, *merged, *eol, kept_char;

	if ((burst_buffer_file == NULL) || !burst_buffer_file[0])
		return;	/* No burst buffer file or empty file */

	if (old_script == NULL) {
		*script_body = xstrdup(burst_buffer_file);
		return;
	}

	/* The inserted options must end with a newline */
	bb_lines = xstrdup(burst_buffer_file);
	if (bb_lines[strlen(bb_lines) - 1] != '\n')
		xstrcat(bb_lines, "\n");

	if (old_script[0] != '#') {
		/* Options go in front of the script */
		merged = xstrdup(bb_lines);
		xstrcat(merged, old_script);
	} else if ((eol = strchr(old_script, '\n')) != NULL) {
		/* Copy the first line (with its newline), temporarily
		 * terminating the script right after it */
		kept_char = eol[1];
		eol[1] = '\0';
		merged = xstrdup(old_script);
		eol[1] = kept_char;
		/* ... then the options, then the rest of the script */
		xstrcat(merged, bb_lines);
		xstrcat(merged, eol + 1);
	} else {
		/* One comment line without a newline: options follow it */
		merged = xstrdup(old_script);
		xstrcat(merged, "\n");
		xstrcat(merged, bb_lines);
	}

	xfree(*script_body);
	*script_body = merged;
	xfree(bb_lines);
}
3132 
3133 /* For interactive jobs, build a script containing the relevant DataWarp
3134  * commands, as needed by the Cray API */
_build_bb_script(job_record_t * job_ptr,char * script_file)3135 static int _build_bb_script(job_record_t *job_ptr, char *script_file)
3136 {
3137 	char *out_buf = NULL;
3138 	int rc;
3139 
3140 	xstrcat(out_buf, "#!/bin/bash\n");
3141 	xstrcat(out_buf, job_ptr->burst_buffer);
3142 	rc = _write_file(script_file, out_buf);
3143 	xfree(out_buf);
3144 
3145 	return rc;
3146 }
3147 
3148 /*
3149  * init() is called when the plugin is loaded, before any other functions
3150  * are called.  Read and validate configuration file here. Spawn thread to
3151  * periodically read Datawarp state.
3152  */
init(void)3153 extern int init(void)
3154 {
3155 	slurm_mutex_init(&bb_state.bb_mutex);
3156 	slurm_mutex_lock(&bb_state.bb_mutex);
3157 	bb_load_config(&bb_state, (char *)plugin_type); /* Removes "const" */
3158 	_test_config();
3159 	if (bb_state.bb_config.debug_flag)
3160 		info("%s: %s", plugin_type,  __func__);
3161 	if (!state_save_loc)
3162 		state_save_loc = slurm_get_state_save_location();
3163 	bb_alloc_cache(&bb_state);
3164 	run_command_init();
3165 	slurm_thread_create(&bb_state.bb_thread, _bb_agent, NULL);
3166 	slurm_mutex_unlock(&bb_state.bb_mutex);
3167 
3168 	return SLURM_SUCCESS;
3169 }
3170 
3171 /*
3172  * fini() is called when the plugin is unloaded. Free all memory and shutdown
3173  * threads.
3174  */
fini(void)3175 extern int fini(void)
3176 {
3177 	int pc, last_pc = 0;
3178 
3179 	run_command_shutdown();
3180 	while ((pc = run_command_count()) > 0) {
3181 		if ((last_pc != 0) && (last_pc != pc)) {
3182 			info("%s: waiting for %d running processes",
3183 			     plugin_type, pc);
3184 		}
3185 		last_pc = pc;
3186 		usleep(100000);
3187 	}
3188 
3189 	slurm_mutex_lock(&bb_state.bb_mutex);
3190 	if (bb_state.bb_config.debug_flag)
3191 		info("%s: %s", plugin_type,  __func__);
3192 
3193 	slurm_mutex_lock(&bb_state.term_mutex);
3194 	bb_state.term_flag = true;
3195 	slurm_cond_signal(&bb_state.term_cond);
3196 	slurm_mutex_unlock(&bb_state.term_mutex);
3197 
3198 	if (bb_state.bb_thread) {
3199 		slurm_mutex_unlock(&bb_state.bb_mutex);
3200 		pthread_join(bb_state.bb_thread, NULL);
3201 		slurm_mutex_lock(&bb_state.bb_mutex);
3202 		bb_state.bb_thread = 0;
3203 	}
3204 	bb_clear_config(&bb_state.bb_config, true);
3205 	bb_clear_cache(&bb_state);
3206 	xfree(state_save_loc);
3207 	slurm_mutex_unlock(&bb_state.bb_mutex);
3208 
3209 	return SLURM_SUCCESS;
3210 }
3211 
3212 /* Identify and purge any vestigial buffers (i.e. we have a job buffer, but
3213  * the matching job is either gone or completed OR we have a job buffer and a
3214  * pending job, but don't know the status of stage-in) */
_purge_vestigial_bufs(void)3215 static void _purge_vestigial_bufs(void)
3216 {
3217 	bb_alloc_t *bb_alloc = NULL;
3218 	time_t defer_time = time(NULL) + 60;
3219 	int i;
3220 
3221 	for (i = 0; i < BB_HASH_SIZE; i++) {
3222 		bb_alloc = bb_state.bb_ahash[i];
3223 		while (bb_alloc) {
3224 			job_record_t *job_ptr = NULL;
3225 			if (bb_alloc->job_id)
3226 				job_ptr = find_job_record(bb_alloc->job_id);
3227 			if (bb_alloc->job_id == 0) {
3228 				/* Persistent buffer, do not purge */
3229 			} else if (!job_ptr) {
3230 				info("%s: Purging vestigial buffer for JobId=%u",
3231 				     plugin_type, bb_alloc->job_id);
3232 				_queue_teardown(bb_alloc->job_id,
3233 						bb_alloc->user_id, false);
3234 			} else if (!IS_JOB_STARTED(job_ptr)) {
3235 				/* We do not know the state of file staging,
3236 				 * so teardown the buffer and defer the job
3237 				 * for at least 60 seconds (for the teardown) */
3238 				debug("%s: Purging buffer for pending JobId=%u",
3239 				      plugin_type, bb_alloc->job_id);
3240 				_queue_teardown(bb_alloc->job_id,
3241 						bb_alloc->user_id, true);
3242 				if (job_ptr->details &&
3243 				    (job_ptr->details->begin_time <defer_time)){
3244 					job_ptr->details->begin_time =
3245 						defer_time;
3246 				}
3247 			}
3248 			bb_alloc = bb_alloc->next;
3249 		}
3250 	}
3251 }
3252 
3253 /*
3254  * Return the total burst buffer size in MB
3255  */
bb_p_get_system_size(void)3256 extern uint64_t bb_p_get_system_size(void)
3257 {
3258 	uint64_t size = 0;
3259 
3260 	slurm_mutex_lock(&bb_state.bb_mutex);
3261 	size = bb_state.total_space / (1024 * 1024);	/* bytes to MB */
3262 	slurm_mutex_unlock(&bb_state.bb_mutex);
3263 	return size;
3264 }
3265 
3266 /*
3267  * Load the current burst buffer state (e.g. how much space is available now).
3268  * Run at the beginning of each scheduling cycle in order to recognize external
3269  * changes to the burst buffer state (e.g. capacity is added, removed, fails,
3270  * etc.)
3271  *
3272  * init_config IN - true if called as part of slurmctld initialization
3273  * Returns a Slurm errno.
3274  */
bb_p_load_state(bool init_config)3275 extern int bb_p_load_state(bool init_config)
3276 {
3277 	if (!init_config)
3278 		return SLURM_SUCCESS;
3279 
3280 	/* In practice the Cray APIs are too slow to run inline on each
3281 	 * scheduling cycle. Do so on a periodic basis from _bb_agent(). */
3282 	if (bb_state.bb_config.debug_flag)
3283 		debug("%s: %s", plugin_type,  __func__);
3284 	_load_state(init_config);	/* Has own locking */
3285 	slurm_mutex_lock(&bb_state.bb_mutex);
3286 	bb_set_tres_pos(&bb_state);
3287 	_purge_vestigial_bufs();
3288 	slurm_mutex_unlock(&bb_state.bb_mutex);
3289 
3290 	_save_bb_state();	/* Has own locks excluding file write */
3291 
3292 	return SLURM_SUCCESS;
3293 }
3294 
3295 /*
3296  * Return string containing current burst buffer status
3297  * argc IN - count of status command arguments
3298  * argv IN - status command arguments
3299  * RET status string, release memory using xfree()
3300  */
bb_p_get_status(uint32_t argc,char ** argv)3301 extern char *bb_p_get_status(uint32_t argc, char **argv)
3302 {
3303 	char *status_resp, **script_argv;
3304 	int i, status = 0;
3305 
3306 	script_argv = xcalloc((argc + 2), sizeof(char *));
3307 	script_argv[0] = "dwstat";
3308 	for (i = 0; i < argc; i++)
3309 		script_argv[i + 1] = argv[i];
3310 	status_resp = run_command("dwstat", bb_state.bb_config.get_sys_status,
3311 				  script_argv, 2000, 0, &status);
3312 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
3313 		xfree(status_resp);
3314 		status_resp = xstrdup("Error running dwstat\n");
3315 	}
3316 	xfree(script_argv);
3317 
3318 	return status_resp;
3319 }
3320 
3321 /*
3322  * Note configuration may have changed. Handle changes in BurstBufferParameters.
3323  *
3324  * Returns a Slurm errno.
3325  */
bb_p_reconfig(void)3326 extern int bb_p_reconfig(void)
3327 {
3328 	char *old_default_pool;
3329 	int i;
3330 
3331 	slurm_mutex_lock(&bb_state.bb_mutex);
3332 	if (bb_state.bb_config.debug_flag)
3333 		info("%s: %s", plugin_type,  __func__);
3334 	old_default_pool = bb_state.bb_config.default_pool;
3335 	bb_state.bb_config.default_pool = NULL;
3336 	bb_load_config(&bb_state, (char *)plugin_type); /* Remove "const" */
3337 	if (!bb_state.bb_config.default_pool)
3338 		bb_state.bb_config.default_pool = old_default_pool;
3339 	else
3340 		xfree(old_default_pool);
3341 	_test_config();
3342 	slurm_mutex_unlock(&bb_state.bb_mutex);
3343 
3344 	/* reconfig is the place we make sure the pointers are correct */
3345 	for (i = 0; i < BB_HASH_SIZE; i++) {
3346 		bb_alloc_t *bb_alloc = bb_state.bb_ahash[i];
3347 		while (bb_alloc) {
3348 			_set_assoc_mgr_ptrs(bb_alloc);
3349 			bb_alloc = bb_alloc->next;
3350 		}
3351 	}
3352 
3353 	return SLURM_SUCCESS;
3354 }
3355 
3356 /*
3357  * Pack current burst buffer state information for network transmission to
3358  * user (e.g. "scontrol show burst")
3359  *
3360  * Returns a Slurm errno.
3361  */
bb_p_state_pack(uid_t uid,Buf buffer,uint16_t protocol_version)3362 extern int bb_p_state_pack(uid_t uid, Buf buffer, uint16_t protocol_version)
3363 {
3364 	uint32_t rec_count = 0;
3365 
3366 	slurm_mutex_lock(&bb_state.bb_mutex);
3367 	packstr(bb_state.name, buffer);
3368 	bb_pack_state(&bb_state, buffer, protocol_version);
3369 
3370 	if (((bb_state.bb_config.flags & BB_FLAG_PRIVATE_DATA) == 0) ||
3371 	    validate_operator(uid))
3372 		uid = 0;	/* User can see all data */
3373 	rec_count = bb_pack_bufs(uid, &bb_state, buffer, protocol_version);
3374 	(void) bb_pack_usage(uid, &bb_state, buffer, protocol_version);
3375 	if (bb_state.bb_config.debug_flag) {
3376 		debug("%s: %s: record_count:%u",
3377 		      plugin_type, __func__, rec_count);
3378 	}
3379 	slurm_mutex_unlock(&bb_state.bb_mutex);
3380 
3381 	return SLURM_SUCCESS;
3382 }
3383 
3384 /*
3385  * Preliminary validation of a job submit request with respect to burst buffer
3386  * options. Performed after setting default account + qos, but prior to
3387  * establishing job ID or creating script file.
3388  *
3389  * Returns a Slurm errno.
3390  */
bb_p_job_validate(job_desc_msg_t * job_desc,uid_t submit_uid)3391 extern int bb_p_job_validate(job_desc_msg_t *job_desc, uid_t submit_uid)
3392 {
3393 	uint64_t bb_size = 0;
3394 	int i, rc;
3395 
3396 	xassert(job_desc);
3397 	xassert(job_desc->tres_req_cnt);
3398 
3399 	rc = _parse_bb_opts(job_desc, &bb_size, submit_uid);
3400 	if (rc != SLURM_SUCCESS)
3401 		return rc;
3402 
3403 	if ((job_desc->burst_buffer == NULL) ||
3404 	    (job_desc->burst_buffer[0] == '\0'))
3405 		return rc;
3406 
3407 	if (bb_state.bb_config.debug_flag) {
3408 		info("%s: %s: job_user_id:%u, submit_uid:%d",
3409 		     plugin_type, __func__, job_desc->user_id, submit_uid);
3410 		info("%s: %s: burst_buffer:%s",
3411 		     plugin_type,__func__, job_desc->burst_buffer);
3412 	}
3413 
3414 	if (job_desc->user_id == 0) {
3415 		info("%s: %s: User root can not allocate burst buffers",
3416 		     plugin_type, __func__);
3417 		return ESLURM_BURST_BUFFER_PERMISSION;
3418 	}
3419 
3420 	slurm_mutex_lock(&bb_state.bb_mutex);
3421 	if (bb_state.bb_config.allow_users) {
3422 		bool found_user = false;
3423 		for (i = 0; bb_state.bb_config.allow_users[i]; i++) {
3424 			if (job_desc->user_id ==
3425 			    bb_state.bb_config.allow_users[i]) {
3426 				found_user = true;
3427 				break;
3428 			}
3429 		}
3430 		if (!found_user) {
3431 			rc = ESLURM_BURST_BUFFER_PERMISSION;
3432 			goto fini;
3433 		}
3434 	}
3435 
3436 	if (bb_state.bb_config.deny_users) {
3437 		bool found_user = false;
3438 		for (i = 0; bb_state.bb_config.deny_users[i]; i++) {
3439 			if (job_desc->user_id ==
3440 			    bb_state.bb_config.deny_users[i]) {
3441 				found_user = true;
3442 				break;
3443 			}
3444 		}
3445 		if (found_user) {
3446 			rc = ESLURM_BURST_BUFFER_PERMISSION;
3447 			goto fini;
3448 		}
3449 	}
3450 
3451 	if (bb_state.tres_pos > 0) {
3452 		job_desc->tres_req_cnt[bb_state.tres_pos] =
3453 			bb_size / (1024 * 1024);
3454 	}
3455 
3456 fini:	slurm_mutex_unlock(&bb_state.bb_mutex);
3457 
3458 	return rc;
3459 }
3460 
3461 /* Add key=value pairs from "resp_msg" to the job's environment */
_update_job_env(job_record_t * job_ptr,char * file_path)3462 static void _update_job_env(job_record_t *job_ptr, char *file_path)
3463 {
3464 	struct stat stat_buf;
3465 	char *data_buf = NULL, *start, *sep;
3466 	int path_fd, i, inx = 0, env_cnt = 0;
3467 	ssize_t read_size;
3468 
3469 	/* Read the DataWarp generated environment variable file */
3470 	path_fd = open(file_path, 0);
3471 	if (path_fd == -1) {
3472 		error("%s: %s: open error on file %s: %m",
3473 		      plugin_type, __func__, file_path);
3474 		return;
3475 	}
3476 	fd_set_close_on_exec(path_fd);
3477 	if (fstat(path_fd, &stat_buf) == -1) {
3478 		error("%s: %s: stat error on file %s: %m",
3479 		      plugin_type, __func__, file_path);
3480 		stat_buf.st_size = 2048;
3481 	} else if (stat_buf.st_size == 0)
3482 		goto fini;
3483 	data_buf = xmalloc_nz(stat_buf.st_size + 1);
3484 	while (inx < stat_buf.st_size) {
3485 		read_size = read(path_fd, data_buf + inx, stat_buf.st_size);
3486 		if (read_size < 0)
3487 			data_buf[inx] = '\0';
3488 		else
3489 			data_buf[inx + read_size] = '\0';
3490 		if (read_size > 0) {
3491 			inx += read_size;
3492 		} else if (read_size == 0) {	/* EOF */
3493 			break;
3494 		} else if (read_size < 0) {	/* error */
3495 			if ((errno == EAGAIN) || (errno == EINTR))
3496 				continue;
3497 			error("%s: %s: read error on file %s: %m",
3498 			      plugin_type, __func__, file_path);
3499 			break;
3500 		}
3501 	}
3502 	if (bb_state.bb_config.debug_flag)
3503 		info("%s: %s: %s", plugin_type, __func__, data_buf);
3504 
3505 	/* Get count of environment variables in the file */
3506 	env_cnt = 0;
3507 	if (data_buf) {
3508 		for (i = 0; data_buf[i]; i++) {
3509 			if (data_buf[i] == '=')
3510 				env_cnt++;
3511 		}
3512 	}
3513 
3514 	/* Add to supplemental environment variables (in job record) */
3515 	if (env_cnt) {
3516 		job_ptr->details->env_sup =
3517 			xrealloc(job_ptr->details->env_sup,
3518 				 sizeof(char *) *
3519 				 (job_ptr->details->env_cnt + env_cnt));
3520 		start = data_buf;
3521 		for (i = 0; (i < env_cnt) && start[0]; i++) {
3522 			sep = strchr(start, '\n');
3523 			if (sep)
3524 				sep[0] = '\0';
3525 			job_ptr->details->env_sup[job_ptr->details->env_cnt++] =
3526 				xstrdup(start);
3527 			if (sep)
3528 				start = sep + 1;
3529 			else
3530 				break;
3531 		}
3532 	}
3533 
3534 fini:	xfree(data_buf);
3535 	close(path_fd);
3536 }
3537 
3538 /* Return true if #DW options (excludes #BB options) */
_have_dw_cmd_opts(bb_job_t * bb_job)3539 static bool _have_dw_cmd_opts(bb_job_t *bb_job)
3540 {
3541 	int i;
3542 	bb_buf_t *bb_buf;
3543 
3544 	xassert(bb_job);
3545 	if (bb_job->total_size)
3546 		return true;
3547 
3548 	for (i = 0, bb_buf = bb_job->buf_ptr; i < bb_job->buf_cnt;
3549 	     i++, bb_buf++) {
3550 		if (bb_buf->use)
3551 			return true;
3552 	}
3553 
3554 	return false;
3555 }
3556 
3557 /*
3558  * Secondary validation of a job submit request with respect to burst buffer
3559  * options. Performed after establishing job ID and creating script file.
3560  *
3561  * NOTE: We run several DW APIs at job submit time so that we can notify the
3562  * user immediately if there is some error, although that can be a relatively
3563  * slow operation. We have a timeout of 3 seconds on the DW APIs here and log
3564  * any times over 0.2 seconds.
3565  *
3566  * NOTE: We do this work inline so the user can be notified immediately if
3567  * there is some problem with their script.
3568  *
3569  * Returns a Slurm errno.
3570  */
/*
 * Validate a job's burst buffer request by running the DataWarp
 * "job_process" function against the job's script.
 * job_ptr IN - job to validate
 * err_msg OUT - if not NULL and job_process fails, replaced with an
 *               xmalloc'd message containing the dw_wlm_cli response
 * RET SLURM_SUCCESS or a Slurm error code. When the returned code is not
 *     SLURM_SUCCESS, the job's burst buffer record is deleted.
 */
extern int bb_p_job_validate2(job_record_t *job_ptr, char **err_msg)
{
	char *hash_dir = NULL, *job_dir = NULL, *script_file = NULL;
	char *task_script_file = NULL;
	char *resp_msg = NULL, **script_argv;
	char *dw_cli_path;
	int fd = -1, hash_inx, rc = SLURM_SUCCESS, status = 0;
	bb_job_t *bb_job;
	uint32_t timeout;
	bool using_master_script = false, debug_flag;
	DEF_TIMERS;

	if ((job_ptr->burst_buffer == NULL) ||
	    (job_ptr->burst_buffer[0] == '\0')) {
		/* No burst buffer: a zero node job is rejected */
		if (job_ptr->details->min_nodes == 0)
			rc = ESLURM_INVALID_NODE_COUNT;
		return rc;
	}

	/* Initialization */
	slurm_mutex_lock(&bb_state.bb_mutex);
	if (bb_state.last_load_time == 0) {
		/* Assume request is valid for now, can't test it anyway */
		info("%s: %s: Burst buffer down, skip tests for %pJ",
		      plugin_type, __func__, job_ptr);
		slurm_mutex_unlock(&bb_state.bb_mutex);
		return rc;
	}
	bb_job = _get_bb_job(job_ptr);
	if (bb_job == NULL) {
		slurm_mutex_unlock(&bb_state.bb_mutex);
		if (job_ptr->details->min_nodes == 0)
			rc = ESLURM_INVALID_NODE_COUNT;
		return rc;
	}
	if ((job_ptr->details->min_nodes == 0) && bb_job->use_job_buf) {
		slurm_mutex_unlock(&bb_state.bb_mutex);
		return ESLURM_INVALID_BURST_BUFFER_REQUEST;
	}

	/* Skip job_process unless _have_dw_cmd_opts() says it is needed */
	if (!_have_dw_cmd_opts(bb_job)) {
		slurm_mutex_unlock(&bb_state.bb_mutex);
		return rc;
	}

	if (bb_state.bb_config.debug_flag)
		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);

	/*
	 * bb_state is protected by bb_mutex: copy every configuration value
	 * used below while the lock is held and use only the copies after
	 * it is released.
	 */
	timeout = bb_state.bb_config.validate_timeout * 1000;
	debug_flag = bb_state.bb_config.debug_flag;
	dw_cli_path = xstrdup(bb_state.bb_config.get_sys_state);
	slurm_mutex_unlock(&bb_state.bb_mutex);

	/* Standard file location for job arrays */
	if ((job_ptr->array_task_id != NO_VAL) &&
	    (job_ptr->array_job_id != job_ptr->job_id)) {
		/* Array task: look for the script under the array job ID */
		hash_inx = job_ptr->array_job_id % 10;
		xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
		(void) mkdir(hash_dir, 0700);
		xstrfmtcat(job_dir, "%s/job.%u", hash_dir,
			   job_ptr->array_job_id);
		(void) mkdir(job_dir, 0700);
		xstrfmtcat(script_file, "%s/script", job_dir);
		fd = open(script_file, O_RDONLY);
		if (fd >= 0) {	/* found the script */
			close(fd);
			using_master_script = true;
		} else {
			xfree(hash_dir);
		}
	} else {
		hash_inx = job_ptr->job_id % 10;
		xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
		(void) mkdir(hash_dir, 0700);
		xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
		(void) mkdir(job_dir, 0700);
		xstrfmtcat(script_file, "%s/script", job_dir);
		/* Non-batch jobs: write the script via _build_bb_script() */
		if (job_ptr->batch_flag == 0)
			rc = _build_bb_script(job_ptr, script_file);
	}

	/* Run "job_process" function, validates user script */
	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
	script_argv[0] = xstrdup("dw_wlm_cli");
	script_argv[1] = xstrdup("--function");
	script_argv[2] = xstrdup("job_process");
	script_argv[3] = xstrdup("--job");
	xstrfmtcat(script_argv[4], "%s", script_file);
	START_TIMER;
	resp_msg = run_command("job_process", dw_cli_path,
			       script_argv, timeout, 0, &status);
	END_TIMER;
	if ((DELTA_TIMER > 200000) ||	/* 0.2 secs */
	    debug_flag)
		info("%s: %s: job_process ran for %s",
		     plugin_type, __func__, TIME_STR);
	_log_script_argv(script_argv, resp_msg);
	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
		error("%s: %s: job_process for %pJ status:%u response:%s",
		      plugin_type, __func__, job_ptr, status, resp_msg);
		if (err_msg) {
			xfree(*err_msg);
			xstrfmtcat(*err_msg, "%s: %s", plugin_type, resp_msg);
		}
		rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
	}
	xfree(resp_msg);
	free_command_argv(script_argv);

	/* Clean-up */
	xfree(hash_dir);
	xfree(job_dir);
	xfree(dw_cli_path);
	if (rc != SLURM_SUCCESS) {
		slurm_mutex_lock(&bb_state.bb_mutex);
		bb_job_del(&bb_state, job_ptr->job_id);
		slurm_mutex_unlock(&bb_state.bb_mutex);
	} else if (using_master_script) {
		/* Job array's need to have script file in the "standard"
		 * location for the remaining logic, make hard link */
		hash_inx = job_ptr->job_id % 10;
		xstrfmtcat(hash_dir, "%s/hash.%d", state_save_loc, hash_inx);
		(void) mkdir(hash_dir, 0700);
		xstrfmtcat(job_dir, "%s/job.%u", hash_dir, job_ptr->job_id);
		xfree(hash_dir);
		(void) mkdir(job_dir, 0700);
		xstrfmtcat(task_script_file, "%s/script", job_dir);
		xfree(job_dir);
		/* An already existing link is fine */
		if ((link(script_file, task_script_file) != 0) &&
		    (errno != EEXIST)) {
			error("%s: %s: link(%s,%s): %m",
			      plugin_type, __func__, script_file,
			      task_script_file);
		}
	}
	xfree(task_script_file);
	xfree(script_file);

	return rc;
}
3711 
/*
 * Convert a DataWarp "real_size" JSON object into a newly xmalloc'd
 * bb_total_size record (presumably released by the caller).
 * Recognized members:
 *   "units"    (string)  - only "bytes" is mapped, to BB_UNITS_BYTES
 *   "capacity" (integer) - stored as-is
 * Every other member, or a member of an unexpected type, is ignored.
 */
static struct bb_total_size *_json_parse_real_size(json_object *j)
{
	struct json_object_iter it;
	struct bb_total_size *total = xmalloc(sizeof(struct bb_total_size));

	json_object_object_foreachC(j, it) {
		enum json_type val_type = json_object_get_type(it.val);

		if ((val_type == json_type_string) &&
		    !xstrcmp(it.key, "units")) {
			const char *units = json_object_get_string(it.val);

			if (!xstrcmp(units, "bytes"))
				total->units = BB_UNITS_BYTES;
		} else if ((val_type == json_type_int) &&
			   !xstrcmp(it.key, "capacity")) {
			total->capacity = json_object_get_int64(it.val);
		}
	}

	return total;
}
3745 
3746 /*
3747  * Fill in the tres_cnt (in MB) based off the job record
3748  * NOTE: Based upon job-specific burst buffers, excludes persistent buffers
3749  * IN job_ptr - job record
3750  * IN/OUT tres_cnt - fill in this already allocated array with tres_cnts
3751  * IN locked - if the assoc_mgr tres read locked is locked or not
3752  */
bb_p_job_set_tres_cnt(job_record_t * job_ptr,uint64_t * tres_cnt,bool locked)3753 extern void bb_p_job_set_tres_cnt(job_record_t *job_ptr, uint64_t *tres_cnt,
3754 				  bool locked)
3755 {
3756 	bb_job_t *bb_job;
3757 
3758 	if (!tres_cnt) {
3759 		error("%s: %s: No tres_cnt given when looking at %pJ",
3760 		      plugin_type, __func__, job_ptr);
3761 	}
3762 
3763 	if (bb_state.tres_pos < 0) {
3764 		/* BB not defined in AccountingStorageTRES */
3765 		return;
3766 	}
3767 
3768 	slurm_mutex_lock(&bb_state.bb_mutex);
3769 	if ((bb_job = _get_bb_job(job_ptr))) {
3770 		tres_cnt[bb_state.tres_pos] =
3771 			bb_job->total_size / (1024 * 1024);
3772 	}
3773 	slurm_mutex_unlock(&bb_state.bb_mutex);
3774 }
3775 
3776 /*
3777  * For a given job, return our best guess if when it might be able to start
3778  */
bb_p_job_get_est_start(job_record_t * job_ptr)3779 extern time_t bb_p_job_get_est_start(job_record_t *job_ptr)
3780 {
3781 	time_t est_start = time(NULL);
3782 	bb_job_t *bb_job;
3783 	int rc;
3784 
3785 	if ((job_ptr->burst_buffer == NULL) ||
3786 	    (job_ptr->burst_buffer[0] == '\0'))
3787 		return est_start;
3788 
3789 	if (job_ptr->array_recs &&
3790 	    ((job_ptr->array_task_id == NO_VAL) ||
3791 	     (job_ptr->array_task_id == INFINITE))) {
3792 		est_start += 300;	/* 5 minutes, guess... */
3793 		return est_start;	/* Can't operate on job array struct */
3794 	}
3795 
3796 	slurm_mutex_lock(&bb_state.bb_mutex);
3797 	if (bb_state.last_load_time == 0) {
3798 		est_start += 3600;	/* 1 hour, guess... */
3799 		slurm_mutex_unlock(&bb_state.bb_mutex);
3800 		return est_start;	/* Can't operate on job array struct */
3801 	}
3802 
3803 	if ((bb_job = _get_bb_job(job_ptr)) == NULL) {
3804 		slurm_mutex_unlock(&bb_state.bb_mutex);
3805 		return est_start;
3806 	}
3807 
3808 	if (bb_state.bb_config.debug_flag)
3809 		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
3810 
3811 	if ((bb_job->persist_add == 0) && (bb_job->swap_size == 0) &&
3812 	    (bb_job->total_size == 0)) {
3813 		/* Only deleting or using persistent buffers */
3814 		if (!_test_persistent_use_ready(bb_job, job_ptr))
3815 			est_start += 60 * 60;	/* one hour, guess... */
3816 	} else if (bb_job->state == BB_STATE_PENDING) {
3817 		rc = _test_size_limit(job_ptr, bb_job);
3818 		if (rc == 0) {		/* Could start now */
3819 			;
3820 		} else if (rc == 1) {	/* Exceeds configured limits */
3821 			est_start += 365 * 24 * 60 * 60;
3822 		} else {		/* No space currently available */
3823 			est_start = MAX(est_start, bb_state.next_end_time);
3824 		}
3825 	} else {	/* Allocation or staging in progress */
3826 		est_start++;
3827 	}
3828 	slurm_mutex_unlock(&bb_state.bb_mutex);
3829 
3830 	return est_start;
3831 }
3832 
3833 /*
3834  * Attempt to allocate resources and begin file staging for pending jobs.
3835  */
bb_p_job_try_stage_in(List job_queue)3836 extern int bb_p_job_try_stage_in(List job_queue)
3837 {
3838 	bb_job_queue_rec_t *job_rec;
3839 	List job_candidates;
3840 	ListIterator job_iter;
3841 	job_record_t *job_ptr;
3842 	bb_job_t *bb_job;
3843 	int rc;
3844 
3845 	slurm_mutex_lock(&bb_state.bb_mutex);
3846 	if (bb_state.bb_config.debug_flag)
3847 		info("%s: %s: Mutex locked", plugin_type,  __func__);
3848 
3849 	if (bb_state.last_load_time == 0) {
3850 		slurm_mutex_unlock(&bb_state.bb_mutex);
3851 		return SLURM_SUCCESS;
3852 	}
3853 
3854 	/* Identify candidates to be allocated burst buffers */
3855 	job_candidates = list_create(_job_queue_del);
3856 	job_iter = list_iterator_create(job_queue);
3857 	while ((job_ptr = list_next(job_iter))) {
3858 		if (!IS_JOB_PENDING(job_ptr) ||
3859 		    (job_ptr->start_time == 0) ||
3860 		    (job_ptr->burst_buffer == NULL) ||
3861 		    (job_ptr->burst_buffer[0] == '\0'))
3862 			continue;
3863 		if (job_ptr->array_recs &&
3864 		    ((job_ptr->array_task_id == NO_VAL) ||
3865 		     (job_ptr->array_task_id == INFINITE)))
3866 			continue;	/* Can't operate on job array struct */
3867 		bb_job = _get_bb_job(job_ptr);
3868 		if (bb_job == NULL)
3869 			continue;
3870 		if (bb_job->state == BB_STATE_COMPLETE)
3871 			bb_job->state = BB_STATE_PENDING;     /* job requeued */
3872 		else if (bb_job->state >= BB_STATE_POST_RUN)
3873 			continue;	/* Requeued job still staging out */
3874 		job_rec = xmalloc(sizeof(bb_job_queue_rec_t));
3875 		job_rec->job_ptr = job_ptr;
3876 		job_rec->bb_job = bb_job;
3877 		list_push(job_candidates, job_rec);
3878 	}
3879 	list_iterator_destroy(job_iter);
3880 
3881 	/* Sort in order of expected start time */
3882 	list_sort(job_candidates, bb_job_queue_sort);
3883 
3884 	bb_set_use_time(&bb_state);
3885 	job_iter = list_iterator_create(job_candidates);
3886 	while ((job_rec = list_next(job_iter))) {
3887 		job_ptr = job_rec->job_ptr;
3888 		bb_job = job_rec->bb_job;
3889 		if (bb_job->state >= BB_STATE_STAGING_IN)
3890 			continue;	/* Job was already allocated a buffer */
3891 
3892 		rc = _test_size_limit(job_ptr, bb_job);
3893 		if (rc == 0)		/* Could start now */
3894 			(void) _alloc_job_bb(job_ptr, bb_job, true);
3895 		else if (rc == 1)	/* Exceeds configured limits */
3896 			continue;
3897 		else			/* No space currently available */
3898 			break;
3899 	}
3900 	list_iterator_destroy(job_iter);
3901 	slurm_mutex_unlock(&bb_state.bb_mutex);
3902 	FREE_NULL_LIST(job_candidates);
3903 
3904 	return SLURM_SUCCESS;
3905 }
3906 
3907 /*
3908  * Determine if a job's burst buffer stage-in is complete
3909  * job_ptr IN - Job to test
3910  * test_only IN - If false, then attempt to allocate burst buffer if possible
3911  *
3912  * RET: 0 - stage-in is underway
3913  *      1 - stage-in complete
3914  *     -1 - stage-in not started or burst buffer in some unexpected state
3915  */
bb_p_job_test_stage_in(job_record_t * job_ptr,bool test_only)3916 extern int bb_p_job_test_stage_in(job_record_t *job_ptr, bool test_only)
3917 {
3918 	bb_job_t *bb_job = NULL;
3919 	int rc = 1;
3920 
3921 	if ((job_ptr->burst_buffer == NULL) ||
3922 	    (job_ptr->burst_buffer[0] == '\0'))
3923 		return 1;
3924 
3925 	if (job_ptr->array_recs &&
3926 	    ((job_ptr->array_task_id == NO_VAL) ||
3927 	     (job_ptr->array_task_id == INFINITE)))
3928 		return -1;	/* Can't operate on job array structure */
3929 
3930 	slurm_mutex_lock(&bb_state.bb_mutex);
3931 	if (bb_state.bb_config.debug_flag) {
3932 		info("%s: %s: %pJ test_only:%d",
3933 		     plugin_type, __func__, job_ptr, (int) test_only);
3934 	}
3935 	if (bb_state.last_load_time != 0)
3936 		bb_job = _get_bb_job(job_ptr);
3937 	if (bb_job && (bb_job->state == BB_STATE_COMPLETE))
3938 		bb_job->state = BB_STATE_PENDING;	/* job requeued */
3939 	if (bb_job == NULL) {
3940 		rc = -1;
3941 	} else if (bb_job->state < BB_STATE_STAGING_IN) {
3942 		/* Job buffer not allocated, create now if space available */
3943 		rc = -1;
3944 		if ((test_only == false) &&
3945 		    (_test_size_limit(job_ptr, bb_job) == 0) &&
3946 		    (_alloc_job_bb(job_ptr, bb_job, false) == SLURM_SUCCESS)) {
3947 			rc = 0;	/* Setup/stage-in in progress */
3948 		}
3949 	} else if (bb_job->state == BB_STATE_STAGING_IN) {
3950 		rc = 0;
3951 	} else if (bb_job->state == BB_STATE_STAGED_IN) {
3952 		rc = 1;
3953 	} else {
3954 		rc = -1;	/* Requeued job still staging in */
3955 	}
3956 
3957 	slurm_mutex_unlock(&bb_state.bb_mutex);
3958 
3959 	return rc;
3960 }
3961 
3962 /* Attempt to claim burst buffer resources.
3963  * At this time, bb_g_job_test_stage_in() should have been run successfully AND
3964  * the compute nodes selected for the job.
3965  *
3966  * Returns a Slurm errno.
3967  */
bb_p_job_begin(job_record_t * job_ptr)3968 extern int bb_p_job_begin(job_record_t *job_ptr)
3969 {
3970 	char *client_nodes_file_nid = NULL, *exec_host_file = NULL;
3971 	pre_run_args_t *pre_run_args;
3972 	char **pre_run_argv = NULL, **script_argv = NULL;
3973 	char *job_dir = NULL, *path_file, *resp_msg;
3974 	int arg_inx, hash_inx, rc = SLURM_SUCCESS, status = 0;
3975 	bb_job_t *bb_job;
3976 	uint32_t timeout;
3977 	bool do_pre_run, set_exec_host;
3978 	DEF_TIMERS;
3979 	pthread_t tid;
3980 
3981 	if ((job_ptr->burst_buffer == NULL) ||
3982 	    (job_ptr->burst_buffer[0] == '\0'))
3983 		return SLURM_SUCCESS;
3984 
3985 	if (((!job_ptr->job_resrcs || !job_ptr->job_resrcs->nodes)) &&
3986 	    (job_ptr->details->min_nodes != 0)) {
3987 		error("%s: %s: %pJ lacks node allocation",
3988 		      plugin_type, __func__, job_ptr);
3989 		return SLURM_ERROR;
3990 	}
3991 
3992 	slurm_mutex_lock(&bb_state.bb_mutex);
3993 	if (bb_state.bb_config.debug_flag)
3994 		info("%s: %s: %pJ",
3995 		     plugin_type, __func__, job_ptr);
3996 
3997 	if (bb_state.last_load_time == 0) {
3998 		info("%s: %s: Burst buffer down, can not start %pJ",
3999 		      plugin_type, __func__, job_ptr);
4000 		slurm_mutex_unlock(&bb_state.bb_mutex);
4001 		return SLURM_ERROR;
4002 	}
4003 	bb_job = _get_bb_job(job_ptr);
4004 	if (!bb_job) {
4005 		error("%s: %s: no job record buffer for %pJ",
4006 		      plugin_type, __func__, job_ptr);
4007 		xfree(job_ptr->state_desc);
4008 		job_ptr->state_desc =
4009 			xstrdup("Could not find burst buffer record");
4010 		job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4011 		_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4012 		slurm_mutex_unlock(&bb_state.bb_mutex);
4013 		return SLURM_ERROR;
4014 	}
4015 	do_pre_run = _have_dw_cmd_opts(bb_job);
4016 
4017 	/* Confirm that persistent burst buffers work has been completed */
4018 	if ((_create_bufs(job_ptr, bb_job, true) > 0)) {
4019 		xfree(job_ptr->state_desc);
4020 		job_ptr->state_desc =
4021 			xstrdup("Error managing persistent burst buffers");
4022 		job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4023 		_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4024 		slurm_mutex_unlock(&bb_state.bb_mutex);
4025 		return SLURM_ERROR;
4026 	}
4027 
4028 	hash_inx = job_ptr->job_id % 10;
4029 	xstrfmtcat(job_dir, "%s/hash.%d/job.%u", state_save_loc, hash_inx,
4030 		   job_ptr->job_id);
4031 	xstrfmtcat(client_nodes_file_nid, "%s/client_nids", job_dir);
4032 	if (do_pre_run)
4033 		bb_job->state = BB_STATE_PRE_RUN;
4034 	else
4035 		bb_job->state = BB_STATE_RUNNING;
4036 	if (bb_state.bb_config.flags & BB_FLAG_SET_EXEC_HOST)
4037 		set_exec_host = true;
4038 	else
4039 		set_exec_host = false;
4040 	slurm_mutex_unlock(&bb_state.bb_mutex);
4041 
4042 	if (job_ptr->job_resrcs && job_ptr->job_resrcs->nodes &&
4043 	    _write_nid_file(client_nodes_file_nid, job_ptr->job_resrcs->nodes,
4044 			    job_ptr)) {
4045 		xfree(client_nodes_file_nid);
4046 	}
4047 	if (set_exec_host && !job_ptr->batch_host && job_ptr->alloc_node) {
4048 		xstrfmtcat(exec_host_file, "%s/exec_host", job_dir);
4049 		if (_write_nid_file(exec_host_file, job_ptr->alloc_node,
4050 				    job_ptr)) {
4051 			xfree(exec_host_file);
4052 		}
4053 	}
4054 
4055 	/* Run "paths" function, get DataWarp environment variables */
4056 	if (do_pre_run) {
4057 		/* Setup "paths" operation */
4058 		timeout = bb_state.bb_config.validate_timeout * 1000;
4059 		script_argv = xcalloc(10, sizeof(char *)); /* NULL terminate */
4060 		script_argv[0] = xstrdup("dw_wlm_cli");
4061 		script_argv[1] = xstrdup("--function");
4062 		script_argv[2] = xstrdup("paths");
4063 		script_argv[3] = xstrdup("--job");
4064 		xstrfmtcat(script_argv[4], "%s/script", job_dir);
4065 		script_argv[5] = xstrdup("--token");
4066 		xstrfmtcat(script_argv[6], "%u", job_ptr->job_id);
4067 		script_argv[7] = xstrdup("--pathfile");
4068 		xstrfmtcat(script_argv[8], "%s/path", job_dir);
4069 		path_file = script_argv[8];
4070 		START_TIMER;
4071 		resp_msg = run_command("paths",
4072 				       bb_state.bb_config.get_sys_state,
4073 				       script_argv, timeout, 0,
4074 				       &status);
4075 		END_TIMER;
4076 		if ((DELTA_TIMER > 200000) ||	/* 0.2 secs */
4077 		    bb_state.bb_config.debug_flag)
4078 			info("%s: %s: paths ran for %s",
4079 			     plugin_type, __func__, TIME_STR);
4080 		_log_script_argv(script_argv, resp_msg);
4081 #if 1
4082 		//FIXME: Cray API returning "job_file_valid True" but exit 1 in some cases
4083 		if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
4084 		    (!resp_msg ||
4085 		     strncmp(resp_msg, "job_file_valid True", 19))) {
4086 #else
4087 		if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4088 #endif
4089 			error("%s: %s: paths for %pJ status:%u response:%s",
4090 			      plugin_type, __func__, job_ptr, status, resp_msg);
4091 			xfree(resp_msg);
4092 			rc = ESLURM_INVALID_BURST_BUFFER_REQUEST;
4093 			free_command_argv(script_argv);
4094 			goto fini;
4095 		} else {
4096 			_update_job_env(job_ptr, path_file);
4097 			xfree(resp_msg);
4098 		}
4099 		free_command_argv(script_argv);
4100 
4101 		/* Setup "pre_run" operation */
4102 		pre_run_argv = xcalloc(12, sizeof(char *));
4103 		pre_run_argv[0] = xstrdup("dw_wlm_cli");
4104 		pre_run_argv[1] = xstrdup("--function");
4105 		pre_run_argv[2] = xstrdup("pre_run");
4106 		pre_run_argv[3] = xstrdup("--token");
4107 		xstrfmtcat(pre_run_argv[4], "%u", job_ptr->job_id);
4108 		pre_run_argv[5] = xstrdup("--job");
4109 		xstrfmtcat(pre_run_argv[6], "%s/script", job_dir);
4110 		arg_inx = 7;
4111 		if (client_nodes_file_nid) {
4112 #if defined(HAVE_NATIVE_CRAY)
4113 			pre_run_argv[arg_inx++] = xstrdup("--nidlistfile");
4114 #else
4115 			pre_run_argv[arg_inx++] = xstrdup("--nodehostnamefile");
4116 #endif
4117 			pre_run_argv[arg_inx++] =
4118 				xstrdup(client_nodes_file_nid);
4119 		}
4120 		if (exec_host_file) {
4121 #if defined(HAVE_NATIVE_CRAY)
4122 			pre_run_argv[arg_inx++] =
4123 				xstrdup("--jobexecutionnodefilenids");
4124 #else
4125 			pre_run_argv[arg_inx++] =
4126 				xstrdup("--jobexecutionnodefile");
4127 #endif
4128 			pre_run_argv[arg_inx++] =
4129 				xstrdup(exec_host_file);
4130 		}
4131 		pre_run_args = xmalloc(sizeof(pre_run_args_t));
4132 		pre_run_args->args    = pre_run_argv;
4133 		pre_run_args->job_id  = job_ptr->job_id;
4134 		pre_run_args->timeout = bb_state.bb_config.other_timeout * 1000;
4135 		pre_run_args->user_id = job_ptr->user_id;
4136 		if (job_ptr->details) {	/* Defer launch until completion */
4137 			job_ptr->details->prolog_running++;
4138 			job_ptr->job_state |= JOB_CONFIGURING;
4139 		}
4140 
4141 		slurm_thread_create(&tid, _start_pre_run, pre_run_args);
4142 	}
4143 
4144 fini:
4145 	xfree(client_nodes_file_nid);
4146 	xfree(exec_host_file);
4147 	xfree(job_dir);
4148 	return rc;
4149 }
4150 
/*
 * Kill job from CONFIGURING state after a burst buffer pre_run failure.
 * Must be called without bb_mutex held; the caller (_start_pre_run) holds the
 * slurmctld job write lock.
 * job_ptr IN - job to kill
 * hold_job IN - if true, also set the job's priority to 0
 */
static void _kill_job(job_record_t *job_ptr, bool hold_job)
{
	last_job_update = time(NULL);
	job_ptr->end_time = last_job_update;
	if (hold_job)
		job_ptr->priority = 0;
	build_cg_bitmap(job_ptr);
	job_ptr->exit_code = 1;
	job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
	xfree(job_ptr->state_desc);
	job_ptr->state_desc = xstrdup("Burst buffer pre_run error");

	/* The completion record is logged while the state is JOB_REQUEUE;
	 * afterwards the job is left pending and completing */
	job_ptr->job_state  = JOB_REQUEUE;
	job_completion_logger(job_ptr, true);
	job_ptr->job_state = JOB_PENDING | JOB_COMPLETING;

	deallocate_nodes(job_ptr, false, false, false);
}
4170 
4171 static void *_start_pre_run(void *x)
4172 {
4173 	/* Locks: read job */
4174 	slurmctld_lock_t job_read_lock = {
4175 		NO_LOCK, READ_LOCK, NO_LOCK, NO_LOCK };
4176 	/* Locks: write job */
4177 	slurmctld_lock_t job_write_lock = {
4178 		NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, READ_LOCK };
4179 	pre_run_args_t *pre_run_args = (pre_run_args_t *) x;
4180 	char *resp_msg = NULL;
4181 	bb_job_t *bb_job = NULL;
4182 	int status = 0;
4183 	job_record_t *job_ptr;
4184 	bool run_kill_job = false;
4185 	uint32_t timeout;
4186 	bool hold_job = false, nodes_ready = false;
4187 	DEF_TIMERS;
4188 	track_script_rec_add(pre_run_args->job_id, 0, pthread_self());
4189 
4190 	/* Wait for node boot to complete */
4191 	while (!nodes_ready) {
4192 		lock_slurmctld(job_read_lock);
4193 		job_ptr = find_job_record(pre_run_args->job_id);
4194 		if (!job_ptr || IS_JOB_COMPLETED(job_ptr)) {
4195 			unlock_slurmctld(job_read_lock);
4196 			track_script_remove(pthread_self());
4197 			return NULL;
4198 		}
4199 		if (test_job_nodes_ready(job_ptr))
4200 			nodes_ready = true;
4201 		unlock_slurmctld(job_read_lock);
4202 		if (!nodes_ready)
4203 			sleep(60);
4204 	}
4205 
4206 	timeout = pre_run_args->timeout * 1000;
4207 
4208 	START_TIMER;
4209 	resp_msg = run_command("dws_pre_run",
4210 			       bb_state.bb_config.get_sys_state,
4211 			       pre_run_args->args, timeout, pthread_self(),
4212 			       &status);
4213 	END_TIMER;
4214 
4215 	if (track_script_broadcast(pthread_self(), status)) {
4216 		/* I was killed by slurmtrack, bail out right now */
4217 		info("%s: %s: dws_pre_run for JobId=%u terminated by slurmctld",
4218 		     plugin_type, __func__, pre_run_args->job_id);
4219 		xfree(resp_msg);
4220 		free_command_argv(pre_run_args->args);
4221 		xfree(pre_run_args);
4222 		track_script_remove(pthread_self());
4223 		return NULL;
4224 	}
4225 	/* track_script_reset_cpid(pthread_self(), 0); */
4226 
4227 	lock_slurmctld(job_write_lock);
4228 	slurm_mutex_lock(&bb_state.bb_mutex);
4229 	job_ptr = find_job_record(pre_run_args->job_id);
4230 	if ((DELTA_TIMER > 500000) ||	/* 0.5 secs */
4231 	    bb_state.bb_config.debug_flag) {
4232 		info("%s: %s: dws_pre_run for %pJ ran for %s",
4233 		     plugin_type, __func__, job_ptr, TIME_STR);
4234 	}
4235 	if (job_ptr)
4236 		bb_job = _get_bb_job(job_ptr);
4237 	_log_script_argv(pre_run_args->args, resp_msg);
4238 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4239 		/* Pre-run failure */
4240 		trigger_burst_buffer();
4241 		error("%s: %s: dws_pre_run for %pJ status:%u response:%s",
4242 		      plugin_type, __func__, job_ptr, status, resp_msg);
4243 		if (job_ptr) {
4244 			_update_system_comment(job_ptr, "pre_run", resp_msg, 0);
4245 			if (IS_JOB_RUNNING(job_ptr))
4246 				run_kill_job = true;
4247 			if (bb_job) {
4248 				bb_job->state = BB_STATE_TEARDOWN;
4249 				if (bb_job->retry_cnt++ > MAX_RETRY_CNT)
4250 					hold_job = true;
4251 			}
4252 		}
4253 		_queue_teardown(pre_run_args->job_id, pre_run_args->user_id,
4254 				true);
4255 	} else if (bb_job) {
4256 		/* Pre-run success and the job's BB record exists */
4257 		if (bb_job->state == BB_STATE_ALLOC_REVOKE)
4258 			bb_job->state = BB_STATE_STAGED_IN;
4259 		else
4260 			bb_job->state = BB_STATE_RUNNING;
4261 	}
4262 	if (job_ptr) {
4263 		if (run_kill_job)
4264 			job_ptr->job_state &= ~JOB_CONFIGURING;
4265 		prolog_running_decr(job_ptr);
4266 	}
4267 	slurm_mutex_unlock(&bb_state.bb_mutex);
4268 	if (run_kill_job) {
4269 		/* bb_mutex must be unlocked before calling this */
4270 		_kill_job(job_ptr, hold_job);
4271 	}
4272 	unlock_slurmctld(job_write_lock);
4273 
4274 	xfree(resp_msg);
4275 	free_command_argv(pre_run_args->args);
4276 	xfree(pre_run_args);
4277 
4278 	track_script_remove(pthread_self());
4279 
4280 	return NULL;
4281 }
4282 
4283 /* Revoke allocation, but do not release resources.
4284  * Executed after bb_p_job_begin() if there was an allocation failure.
4285  * Does not release previously allocated resources.
4286  *
4287  * Returns a Slurm errno.
4288  */
4289 extern int bb_p_job_revoke_alloc(job_record_t *job_ptr)
4290 {
4291 	bb_job_t *bb_job = NULL;
4292 	int rc = SLURM_SUCCESS;
4293 
4294 	slurm_mutex_lock(&bb_state.bb_mutex);
4295 	if (job_ptr)
4296 		bb_job = _get_bb_job(job_ptr);
4297 	if (bb_job) {
4298 		if (bb_job->state == BB_STATE_RUNNING)
4299 			bb_job->state = BB_STATE_STAGED_IN;
4300 		else if (bb_job->state == BB_STATE_PRE_RUN)
4301 			bb_job->state = BB_STATE_ALLOC_REVOKE;
4302 	} else {
4303 		rc = SLURM_ERROR;
4304 	}
4305 	slurm_mutex_unlock(&bb_state.bb_mutex);
4306 
4307 	return rc;
4308 }
4309 
4310 /*
4311  * Trigger a job's burst buffer stage-out to begin
4312  *
4313  * Returns a Slurm errno.
4314  */
4315 extern int bb_p_job_start_stage_out(job_record_t *job_ptr)
4316 {
4317 	bb_job_t *bb_job;
4318 
4319 	if ((job_ptr->burst_buffer == NULL) ||
4320 	    (job_ptr->burst_buffer[0] == '\0'))
4321 		return SLURM_SUCCESS;
4322 
4323 	slurm_mutex_lock(&bb_state.bb_mutex);
4324 	if (bb_state.bb_config.debug_flag)
4325 		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4326 
4327 	if (bb_state.last_load_time == 0) {
4328 		info("%s: %s: Burst buffer down, can not stage out %pJ",
4329 		      plugin_type, __func__, job_ptr);
4330 		slurm_mutex_unlock(&bb_state.bb_mutex);
4331 		return SLURM_ERROR;
4332 	}
4333 	bb_job = _get_bb_job(job_ptr);
4334 	if (!bb_job) {
4335 		/* No job buffers. Assuming use of persistent buffers only */
4336 		verbose("%s: %s: %pJ bb job record not found",
4337 			plugin_type, __func__, job_ptr);
4338 	} else if (bb_job->state < BB_STATE_RUNNING) {
4339 		/* Job never started. Just teardown the buffer */
4340 		bb_job->state = BB_STATE_TEARDOWN;
4341 		_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4342 	} else if (bb_job->state < BB_STATE_POST_RUN) {
4343 		bb_job->state = BB_STATE_POST_RUN;
4344 		job_ptr->job_state |= JOB_STAGE_OUT;
4345 		xfree(job_ptr->state_desc);
4346 		xstrfmtcat(job_ptr->state_desc, "%s: Stage-out in progress",
4347 			   plugin_type);
4348 		_queue_stage_out(job_ptr, bb_job);
4349 	}
4350 	slurm_mutex_unlock(&bb_state.bb_mutex);
4351 
4352 	return SLURM_SUCCESS;
4353 }
4354 
4355 /*
4356  * Determine if a job's burst buffer post_run operation is complete
4357  *
4358  * RET: 0 - post_run is underway
4359  *      1 - post_run complete
4360  *     -1 - fatal error
4361  */
4362 extern int bb_p_job_test_post_run(job_record_t *job_ptr)
4363 {
4364 	bb_job_t *bb_job;
4365 	int rc = -1;
4366 
4367 	if ((job_ptr->burst_buffer == NULL) ||
4368 	    (job_ptr->burst_buffer[0] == '\0'))
4369 		return 1;
4370 
4371 	slurm_mutex_lock(&bb_state.bb_mutex);
4372 	if (bb_state.bb_config.debug_flag)
4373 		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4374 
4375 	if (bb_state.last_load_time == 0) {
4376 		info("%s: %s: Burst buffer down, can not post_run %pJ",
4377 		      plugin_type, __func__, job_ptr);
4378 		slurm_mutex_unlock(&bb_state.bb_mutex);
4379 		return -1;
4380 	}
4381 	bb_job = bb_job_find(&bb_state, job_ptr->job_id);
4382 	if (!bb_job) {
4383 		/* No job buffers. Assuming use of persistent buffers only */
4384 		verbose("%s: %s: %pJ bb job record not found",
4385 			plugin_type, __func__, job_ptr);
4386 		rc =  1;
4387 	} else {
4388 		if (bb_job->state < BB_STATE_POST_RUN) {
4389 			rc = -1;
4390 		} else if (bb_job->state > BB_STATE_POST_RUN) {
4391 			rc =  1;
4392 		} else {
4393 			rc =  0;
4394 		}
4395 	}
4396 	slurm_mutex_unlock(&bb_state.bb_mutex);
4397 
4398 	return rc;
4399 }
4400 
4401 /*
4402  * Determine if a job's burst buffer stage-out is complete
4403  *
4404  * RET: 0 - stage-out is underway
4405  *      1 - stage-out complete
4406  *     -1 - fatal error
4407  */
4408 extern int bb_p_job_test_stage_out(job_record_t *job_ptr)
4409 {
4410 	bb_job_t *bb_job;
4411 	int rc = -1;
4412 
4413 	if ((job_ptr->burst_buffer == NULL) ||
4414 	    (job_ptr->burst_buffer[0] == '\0'))
4415 		return 1;
4416 
4417 	slurm_mutex_lock(&bb_state.bb_mutex);
4418 	if (bb_state.bb_config.debug_flag)
4419 		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4420 
4421 	if (bb_state.last_load_time == 0) {
4422 		info("%s: %s: Burst buffer down, can not stage-out %pJ",
4423 		      plugin_type, __func__, job_ptr);
4424 		slurm_mutex_unlock(&bb_state.bb_mutex);
4425 		return -1;
4426 	}
4427 	bb_job = bb_job_find(&bb_state, job_ptr->job_id);
4428 	if (!bb_job) {
4429 		/* No job buffers. Assuming use of persistent buffers only */
4430 		verbose("%s: %s: %pJ bb job record not found",
4431 			plugin_type, __func__, job_ptr);
4432 		rc =  1;
4433 	} else {
4434 		if (bb_job->state == BB_STATE_PENDING) {
4435 			/*
4436 			 * No job BB work not started before job was killed.
4437 			 * Alternately slurmctld daemon restarted after the
4438 			 * job's BB work was completed.
4439 			 */
4440 			rc =  1;
4441 		} else if (bb_job->state < BB_STATE_POST_RUN) {
4442 			rc = -1;
4443 		} else if (bb_job->state > BB_STATE_STAGING_OUT) {
4444 			rc =  1;
4445 		} else {
4446 			rc =  0;
4447 		}
4448 	}
4449 	slurm_mutex_unlock(&bb_state.bb_mutex);
4450 
4451 	return rc;
4452 }
4453 
4454 /*
4455  * Terminate any file staging and completely release burst buffer resources
4456  *
4457  * Returns a Slurm errno.
4458  */
4459 extern int bb_p_job_cancel(job_record_t *job_ptr)
4460 {
4461 	bb_job_t *bb_job;
4462 	bb_alloc_t *bb_alloc;
4463 
4464 	slurm_mutex_lock(&bb_state.bb_mutex);
4465 	if (bb_state.bb_config.debug_flag)
4466 		info("%s: %s: %pJ", plugin_type, __func__, job_ptr);
4467 
4468 	if (bb_state.last_load_time == 0) {
4469 		info("%s: %s: Burst buffer down, can not cancel %pJ",
4470 		      plugin_type, __func__, job_ptr);
4471 		slurm_mutex_unlock(&bb_state.bb_mutex);
4472 		return SLURM_ERROR;
4473 	}
4474 
4475 	bb_job = _get_bb_job(job_ptr);
4476 	if (!bb_job) {
4477 		/* Nothing ever allocated, nothing to clean up */
4478 	} else if (bb_job->state == BB_STATE_PENDING) {
4479 		bb_job->state = BB_STATE_COMPLETE;  /* Nothing to clean up */
4480 	} else {
4481 		/* Note: Persistent burst buffer actions already completed
4482 		 * for the job are not reversed */
4483 		bb_job->state = BB_STATE_TEARDOWN;
4484 		bb_alloc = bb_find_alloc_rec(&bb_state, job_ptr);
4485 		if (bb_alloc) {
4486 			bb_alloc->state = BB_STATE_TEARDOWN;
4487 			bb_alloc->state_time = time(NULL);
4488 			bb_state.last_update_time = time(NULL);
4489 
4490 		}
4491 		_queue_teardown(job_ptr->job_id, job_ptr->user_id, true);
4492 	}
4493 	slurm_mutex_unlock(&bb_state.bb_mutex);
4494 
4495 	return SLURM_SUCCESS;
4496 }
4497 
4498 static void _free_create_args(create_buf_data_t *create_args)
4499 {
4500 	if (create_args) {
4501 		xfree(create_args->access);
4502 		xfree(create_args->job_script);
4503 		xfree(create_args->name);
4504 		xfree(create_args->pool);
4505 		xfree(create_args->type);
4506 		xfree(create_args);
4507 	}
4508 }
4509 
4510 /*
4511  * Create/destroy persistent burst buffers
4512  * job_ptr IN - job to operate upon
4513  * bb_job IN - job's burst buffer data
4514  * job_ready IN - if true, job is ready to run now, if false then do not
4515  *                delete persistent buffers
4516  * Returns count of buffer create/destroy requests which are pending
4517  */
4518 static int _create_bufs(job_record_t *job_ptr, bb_job_t *bb_job,
4519 			bool job_ready)
4520 {
4521 	create_buf_data_t *create_args;
4522 	bb_buf_t *buf_ptr;
4523 	bb_alloc_t *bb_alloc;
4524 	int i, hash_inx, rc = 0;
4525 	pthread_t tid;
4526 
4527 	xassert(bb_job);
4528 	for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4529 	     i++, buf_ptr++) {
4530 		if ((buf_ptr->state == BB_STATE_ALLOCATING) ||
4531 		    (buf_ptr->state == BB_STATE_DELETING)) {
4532 			rc++;
4533 		} else if (buf_ptr->state != BB_STATE_PENDING) {
4534 			;	/* Nothing to do */
4535 		} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4536 			   buf_ptr->create) {	/* Create the buffer */
4537 			bb_alloc = bb_find_name_rec(buf_ptr->name,
4538 						    job_ptr->user_id,
4539 						    &bb_state);
4540 			if (bb_alloc &&
4541 			    (bb_alloc->user_id != job_ptr->user_id)) {
4542 				info("Attempt by %pJ user %u to create duplicate persistent burst buffer named %s and currently owned by user %u",
4543 				      job_ptr, job_ptr->user_id,
4544 				      buf_ptr->name, bb_alloc->user_id);
4545 				job_ptr->priority = 0;
4546 				job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4547 				xfree(job_ptr->state_desc);
4548 				job_ptr->state_desc = xstrdup(
4549 					"Burst buffer create_persistent error");
4550 				buf_ptr->state = BB_STATE_COMPLETE;
4551 				_update_system_comment(job_ptr,
4552 						       "create_persistent",
4553 						       "Duplicate buffer name",
4554 						       0);
4555 				rc++;
4556 				break;
4557 			} else if (bb_alloc) {
4558 				/* Duplicate create likely result of requeue */
4559 				debug("Attempt by %pJ to create duplicate persistent burst buffer named %s",
4560 				      job_ptr, buf_ptr->name);
4561 				buf_ptr->create = false; /* Creation complete */
4562 				if (bb_job->persist_add >= bb_alloc->size) {
4563 					bb_job->persist_add -= bb_alloc->size;
4564 				} else {
4565 					error("%s: %s: Persistent buffer size underflow for %pJ",
4566 					      plugin_type, __func__, job_ptr);
4567 					bb_job->persist_add = 0;
4568 				}
4569 				continue;
4570 			}
4571 			rc++;
4572 			if (!buf_ptr->pool) {
4573 				buf_ptr->pool =
4574 					xstrdup(bb_state.bb_config.default_pool);
4575 			}
4576 			bb_limit_add(job_ptr->user_id, buf_ptr->size,
4577 				     buf_ptr->pool, &bb_state, true);
4578 			bb_job->state = BB_STATE_ALLOCATING;
4579 			buf_ptr->state = BB_STATE_ALLOCATING;
4580 			create_args = xmalloc(sizeof(create_buf_data_t));
4581 			create_args->access = xstrdup(buf_ptr->access);
4582 			create_args->job_id = job_ptr->job_id;
4583 			create_args->name = xstrdup(buf_ptr->name);
4584 			create_args->pool = xstrdup(buf_ptr->pool);
4585 			create_args->size = buf_ptr->size;
4586 			create_args->type = xstrdup(buf_ptr->type);
4587 			create_args->user_id = job_ptr->user_id;
4588 
4589 			slurm_thread_create(&tid, _create_persistent,
4590 					    create_args);
4591 		} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4592 			   buf_ptr->destroy && job_ready) {
4593 			/* Delete the buffer */
4594 			bb_alloc = bb_find_name_rec(buf_ptr->name,
4595 						    job_ptr->user_id,
4596 						    &bb_state);
4597 			if (!bb_alloc) {
4598 				/* Ignore request if named buffer not found */
4599 				info("%s: destroy_persistent: No burst buffer with name '%s' found for %pJ",
4600 				     plugin_type, buf_ptr->name, job_ptr);
4601 				continue;
4602 			}
4603 			rc++;
4604 			if ((bb_alloc->user_id != job_ptr->user_id) &&
4605 			    !validate_super_user(job_ptr->user_id)) {
4606 				info("%s: destroy_persistent: Attempt by user %u %pJ to destroy buffer %s owned by user %u",
4607 				     plugin_type, job_ptr->user_id, job_ptr,
4608 				     buf_ptr->name, bb_alloc->user_id);
4609 				job_ptr->state_reason = FAIL_BURST_BUFFER_OP;
4610 				xstrfmtcat(job_ptr->state_desc,
4611 					   "%s: Delete buffer %s permission "
4612 					   "denied",
4613 					   plugin_type, buf_ptr->name);
4614 				job_ptr->priority = 0;  /* Hold job */
4615 				continue;
4616 			}
4617 
4618 			bb_job->state = BB_STATE_DELETING;
4619 			buf_ptr->state = BB_STATE_DELETING;
4620 			create_args = xmalloc(sizeof(create_buf_data_t));
4621 			create_args->hurry = buf_ptr->hurry;
4622 			create_args->job_id = job_ptr->job_id;
4623 			hash_inx = job_ptr->job_id % 10;
4624 			xstrfmtcat(create_args->job_script,
4625 				   "%s/hash.%d/job.%u/script",
4626 				   state_save_loc, hash_inx, job_ptr->job_id);
4627 			create_args->name = xstrdup(buf_ptr->name);
4628 			create_args->user_id = job_ptr->user_id;
4629 
4630 			slurm_thread_create(&tid, _destroy_persistent,
4631 					    create_args);
4632 		} else if ((buf_ptr->flags == BB_FLAG_BB_OP) &&
4633 			   buf_ptr->destroy) {
4634 			rc++;
4635 		} else if ((buf_ptr->flags != BB_FLAG_BB_OP) &&
4636 			   buf_ptr->use) {
4637 			/*
4638 			 * Persistent buffer not created or destroyed, but used.
4639 			 * Just check for existence
4640 			 */
4641 			bb_alloc = bb_find_name_rec(buf_ptr->name,
4642 						    job_ptr->user_id,
4643 						    &bb_state);
4644 			if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED))
4645 				bb_job->state = BB_STATE_ALLOCATED;
4646 			else
4647 				rc++;
4648 		}
4649 	}
4650 
4651 	return rc;
4652 }
4653 
4654 /* Test for the existence of persistent burst buffers to be used (but not
4655  * created) by this job. Return true of they are all ready */
4656 static bool _test_persistent_use_ready(bb_job_t *bb_job,
4657 				       job_record_t *job_ptr)
4658 {
4659 	int i, not_ready_cnt = 0;
4660 	bb_alloc_t *bb_alloc;
4661 	bb_buf_t *buf_ptr;
4662 
4663 	xassert(bb_job);
4664 	for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
4665 	     i++, buf_ptr++) {
4666 		if (buf_ptr->create || buf_ptr->destroy)
4667 			continue;
4668 		bb_alloc = bb_find_name_rec(buf_ptr->name, job_ptr->user_id,
4669 					    &bb_state);
4670 		if (bb_alloc && (bb_alloc->state == BB_STATE_ALLOCATED)) {
4671 			bb_job->state = BB_STATE_ALLOCATED;
4672 		} else {
4673 			not_ready_cnt++;
4674 			break;
4675 		}
4676 	}
4677 	if (not_ready_cnt != 0)
4678 		return false;
4679 	return true;
4680 }
4681 
/* Reset data structures based upon a change in buffer state
 * IN user_id - User affected (passed to bb_limit_rem() when limits are
 *              released)
 * IN job_id - Job affected
 * IN name - Buffer name
 * IN new_state - New buffer state
 * IN buf_size - Size of created burst buffer only, used to decrement remaining
 *               space requirement for the job
 *
 * Callers in this file hold bb_state.bb_mutex when calling this.
 * If no buffer of the job remains in a transitional state, the job's
 * ALLOCATING/DELETING state is advanced and the scheduler is queued.
 */
static void _reset_buf_state(uint32_t user_id, uint32_t job_id, char *name,
			     int new_state, uint64_t buf_size)
{
	bb_buf_t *buf_ptr;
	bb_job_t *bb_job;
	int i, old_state;
	bool active_buf = false;

	bb_job = bb_job_find(&bb_state, job_id);
	if (!bb_job) {
		error("%s: %s: Could not find job record for JobId=%u",
		      plugin_type, __func__, job_id);
		return;
	}

	/* Update the buffer's state in job record */
	for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
	     i++, buf_ptr++) {
		if (xstrcmp(name, buf_ptr->name))
			continue;
		old_state = buf_ptr->state;
		buf_ptr->state = new_state;
		/* Failed create: undo the bb_limit_add() from _create_bufs() */
		if ((old_state == BB_STATE_ALLOCATING) &&
		    (new_state == BB_STATE_PENDING)) {
			bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
				     &bb_state);
		}
		/*
		 * NOTE(review): the destroy path in _create_bufs() does not
		 * call bb_limit_add(), yet a failed destroy (DELETING ->
		 * PENDING) releases limits here - confirm this is intended.
		 */
		if ((old_state == BB_STATE_DELETING) &&
		    (new_state == BB_STATE_PENDING)) {
			bb_limit_rem(user_id, buf_ptr->size, buf_ptr->pool,
				     &bb_state);
		}
		/*
		 * Successful create of a buffer whose name does not start
		 * with a digit (presumably a named persistent buffer rather
		 * than a job-id based one - confirm): reduce the job's
		 * outstanding persistent space requirement.
		 */
		if ((old_state == BB_STATE_ALLOCATING) &&
		    (new_state == BB_STATE_ALLOCATED)  &&
		    ((name[0] < '0') || (name[0] > '9'))) {
			buf_ptr->create = false;  /* Buffer creation complete */
			if (bb_job->persist_add >= buf_size) {
				bb_job->persist_add -= buf_size;
			} else {
				error("%s: %s: Persistent buffer size underflow for JobId=%u",
				      plugin_type, __func__, job_id);
				bb_job->persist_add = 0;
			}
		}
		break;
	}

	/*
	 * NOTE(review): the unconditional "break" below means only the
	 * job's FIRST buffer is examined, so the result depends on buffer
	 * order. Making it scan all buffers would also count use-only and
	 * deferred-destroy buffers, which stay PENDING, and could keep the
	 * job in ALLOCATING - review before changing.
	 */
	for (i = 0, buf_ptr = bb_job->buf_ptr; i < bb_job->buf_cnt;
	     i++, buf_ptr++) {
		old_state = buf_ptr->state;
		if ((old_state == BB_STATE_PENDING)    ||
		    (old_state == BB_STATE_ALLOCATING) ||
		    (old_state == BB_STATE_DELETING)   ||
		    (old_state == BB_STATE_TEARDOWN)   ||
		    (old_state == BB_STATE_TEARDOWN_FAIL))
			active_buf = true;
		break;
	}
	if (!active_buf) {
		if (bb_job->state == BB_STATE_ALLOCATING)
			bb_job->state = BB_STATE_ALLOCATED;
		else if (bb_job->state == BB_STATE_DELETING)
			bb_job->state = BB_STATE_DELETED;
		queue_job_scheduler();
	}
}
4756 
/*
 * Create a persistent burst buffer based upon user specifications.
 * Thread start routine launched by _create_bufs().
 * x IN - create_buf_data_t describing the buffer (name, pool, size, access,
 *        type, owning job and user); this thread frees it before returning
 * RET NULL
 *
 * Runs "dw_wlm_cli --function create_persistent". On failure the job is
 * held and the buffer is returned to PENDING. On success a bb_alloc_t
 * record is obtained for the buffer, filled in, and passed to
 * bb_post_persist_create().
 */
static void *_create_persistent(void *x)
{
	slurmctld_lock_t job_write_lock =
		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
	create_buf_data_t *create_args = (create_buf_data_t *) x;
	job_record_t *job_ptr;
	bb_alloc_t *bb_alloc;
	char **script_argv, *resp_msg;
	int i, status = 0;
	uint32_t timeout;
	DEF_TIMERS;
	/* Register this thread so slurmctld can track/kill the script */
	track_script_rec_add(create_args->job_id, 0, pthread_self());

	/* At most 15 arguments are set below; 20 slots keep a NULL end */
	script_argv = xcalloc(20, sizeof(char *));	/* NULL terminated */
	script_argv[0] = xstrdup("dw_wlm_cli");
	script_argv[1] = xstrdup("--function");
	script_argv[2] = xstrdup("create_persistent");
	script_argv[3] = xstrdup("-c");
	script_argv[4] = xstrdup("CLI");
	script_argv[5] = xstrdup("-t");		/* name */
	script_argv[6] = xstrdup(create_args->name);
	script_argv[7] = xstrdup("-u");		/* user ID */
	xstrfmtcat(script_argv[8], "%u", create_args->user_id);
	script_argv[9] = xstrdup("-C");		/* configuration */
	xstrfmtcat(script_argv[10], "%s:%"PRIu64"",
		   create_args->pool, create_args->size);
	/* Read the configured timeout under bb_mutex */
	slurm_mutex_lock(&bb_state.bb_mutex);
	timeout = bb_state.bb_config.other_timeout * 1000;
	slurm_mutex_unlock(&bb_state.bb_mutex);
	i = 11;
	if (create_args->access) {
		script_argv[i++] = xstrdup("-a");
		script_argv[i++] = xstrdup(create_args->access);
	}
	if (create_args->type) {
		script_argv[i++] = xstrdup("-T");
		script_argv[i++] = xstrdup(create_args->type);
	}
	/* NOTE: There is an optional group ID parameter available and
	 * currently not used by Slurm */

	/*
	 * NOTE(review): bb_config.get_sys_state is read here without
	 * bb_mutex, unlike other_timeout above - confirm it cannot change
	 * while this thread runs.
	 */
	START_TIMER;
	resp_msg = run_command("create_persistent",
			       bb_state.bb_config.get_sys_state,
			       script_argv, timeout, pthread_self(),
			       &status);
	_log_script_argv(script_argv, resp_msg);
	free_command_argv(script_argv);
	END_TIMER;
	info("create_persistent of %s ran for %s",
	     create_args->name, TIME_STR);

	if (track_script_broadcast(pthread_self(), status)) {
		/* I was killed by slurmtrack, bail out right now */
		info("%s:%s: create_persistent for JobId=%u terminated by slurmctld",
		     plugin_type, __func__, create_args->job_id);
		xfree(resp_msg);
		_free_create_args(create_args);
		track_script_remove(pthread_self());
		return NULL;
	}
	/* track_script_reset_cpid(pthread_self(), 0); */

	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
		/*
		 * Script failed: hold the job (priority 0), record why, and
		 * return the buffer to PENDING. _reset_buf_state() releases
		 * the limit charged in _create_bufs() on the
		 * ALLOCATING -> PENDING transition.
		 */
		trigger_burst_buffer();
		error("%s: %s: For JobId=%u Name=%s status:%u response:%s",
		      plugin_type, __func__, create_args->job_id,
		      create_args->name, status, resp_msg);
		lock_slurmctld(job_write_lock);
		job_ptr = find_job_record(create_args->job_id);
		if (!job_ptr) {
			error("%s: %s: unable to find job record for JobId=%u",
			      plugin_type, __func__, create_args->job_id);
		} else {
			job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
			job_ptr->priority = 0;
			xfree(job_ptr->state_desc);
			xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
				   plugin_type, __func__, resp_msg);
			_update_system_comment(job_ptr, "create_persistent",
					       resp_msg, 0);
		}
		slurm_mutex_lock(&bb_state.bb_mutex);
		_reset_buf_state(create_args->user_id, create_args->job_id,
				 create_args->name, BB_STATE_PENDING, 0);
		bb_state.last_update_time = time(NULL);
		slurm_mutex_unlock(&bb_state.bb_mutex);
		unlock_slurmctld(job_write_lock);
	} else if (resp_msg && strstr(resp_msg, "created")) {
		/*
		 * Lock order used here: slurmctld job write lock, then
		 * assoc_mgr, then bb_mutex.
		 */
		assoc_mgr_lock_t assoc_locks =
			{ .assoc = READ_LOCK, .qos = READ_LOCK };
		lock_slurmctld(job_write_lock);
		job_ptr = find_job_record(create_args->job_id);
		if (!job_ptr) {
			error("%s: %s: unable to find job record for JobId=%u",
			      plugin_type, __func__, create_args->job_id);
		}
		assoc_mgr_lock(&assoc_locks);
		slurm_mutex_lock(&bb_state.bb_mutex);
		_reset_buf_state(create_args->user_id, create_args->job_id,
				 create_args->name, BB_STATE_ALLOCATED,
				 create_args->size);
		/*
		 * NOTE(review): bb_alloc is used without a NULL check; this
		 * assumes bb_alloc_name_rec() always returns a record.
		 */
		bb_alloc = bb_alloc_name_rec(&bb_state, create_args->name,
					     create_args->user_id);
		bb_alloc->size = create_args->size;
		bb_alloc->pool = xstrdup(create_args->pool);
		if (job_ptr) {
			bb_alloc->account   = xstrdup(job_ptr->account);
			if (job_ptr->assoc_ptr) {
				/* Only add the direct association id
				 * here, we don't need to keep track
				 * of the tree.
				 */
				slurmdb_assoc_rec_t *assoc = job_ptr->assoc_ptr;
				bb_alloc->assoc_ptr = assoc;
				xfree(bb_alloc->assocs);
				bb_alloc->assocs = xstrdup_printf(
					",%u,", assoc->id);
			}
			if (job_ptr->qos_ptr) {
				slurmdb_qos_rec_t *qos_ptr = job_ptr->qos_ptr;
				bb_alloc->qos_ptr = qos_ptr;
				bb_alloc->qos = xstrdup(qos_ptr->name);
			}

			if (job_ptr->part_ptr) {
				bb_alloc->partition =
					xstrdup(job_ptr->part_ptr->name);
			}
		}
		/*
		 * With BB_FLAG_EMULATE_CRAY the create time and id are
		 * generated locally. Otherwise they are copied from the
		 * show_sessions entry whose token equals the buffer name.
		 * NOTE(review): _bb_get_sessions() runs an external command
		 * while the job write lock, assoc_mgr locks and bb_mutex are
		 * all held.
		 */
		if (bb_state.bb_config.flags & BB_FLAG_EMULATE_CRAY) {
			bb_alloc->create_time = time(NULL);
			bb_alloc->id = ++last_persistent_id;
		} else {
			bb_sessions_t *sessions;
			int  num_sessions = 0;
			sessions = _bb_get_sessions(&num_sessions, &bb_state,
						    timeout);
			for (i = 0; i < num_sessions; i++) {
				if (xstrcmp(sessions[i].token,
					    create_args->name))
					continue;
				bb_alloc->create_time = sessions[i].created;
				bb_alloc->id = sessions[i].id;
				break;
			}
			_bb_free_sessions(sessions, num_sessions);
		}
		(void) bb_post_persist_create(job_ptr, bb_alloc, &bb_state);
		bb_state.last_update_time = time(NULL);
		slurm_mutex_unlock(&bb_state.bb_mutex);
		assoc_mgr_unlock(&assoc_locks);
		unlock_slurmctld(job_write_lock);
	}
	/*
	 * NOTE(review): an exit status of 0 whose response lacks "created"
	 * matches neither branch above, leaving the buffer in ALLOCATING -
	 * confirm that is intended.
	 */
	xfree(resp_msg);
	_free_create_args(create_args);

	track_script_remove(pthread_self());

	return NULL;
}
4919 
4920 /* Destroy a persistent burst buffer */
4921 static void *_destroy_persistent(void *x)
4922 {
4923 	slurmctld_lock_t job_write_lock =
4924 		{ NO_LOCK, WRITE_LOCK, NO_LOCK, NO_LOCK, NO_LOCK };
4925 	create_buf_data_t *destroy_args = (create_buf_data_t *) x;
4926 	job_record_t *job_ptr;
4927 	bb_alloc_t *bb_alloc;
4928 	char **script_argv, *resp_msg;
4929 	int status = 0;
4930 	uint32_t timeout;
4931 	DEF_TIMERS;
4932 	track_script_rec_add(destroy_args->job_id, 0, pthread_self());
4933 
4934 	slurm_mutex_lock(&bb_state.bb_mutex);
4935 	bb_alloc = bb_find_name_rec(destroy_args->name, destroy_args->user_id,
4936 				    &bb_state);
4937 	if (!bb_alloc) {
4938 		info("%s: destroy_persistent: No burst buffer with name '%s' found for JobId=%u",
4939 		     plugin_type, destroy_args->name, destroy_args->job_id);
4940 	}
4941 	timeout = bb_state.bb_config.other_timeout * 1000;
4942 	slurm_mutex_unlock(&bb_state.bb_mutex);
4943 
4944 	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
4945 	script_argv[0] = xstrdup("dw_wlm_cli");
4946 	script_argv[1] = xstrdup("--function");
4947 	script_argv[2] = xstrdup("teardown");
4948 	script_argv[3] = xstrdup("--token");	/* name */
4949 	script_argv[4] = xstrdup(destroy_args->name);
4950 	script_argv[5] = xstrdup("--job");	/* script */
4951 	script_argv[6] = xstrdup(destroy_args->job_script);
4952 	if (destroy_args->hurry)
4953 		script_argv[7] = xstrdup("--hurry");
4954 
4955 	START_TIMER;
4956 	resp_msg = run_command("destroy_persistent",
4957 			       bb_state.bb_config.get_sys_state,
4958 			       script_argv, timeout, pthread_self(),
4959 			       &status);
4960 	_log_script_argv(script_argv, resp_msg);
4961 	free_command_argv(script_argv);
4962 	END_TIMER;
4963 	info("destroy_persistent of %s ran for %s",
4964 	     destroy_args->name, TIME_STR);
4965 
4966 	if (track_script_broadcast(pthread_self(), status)) {
4967 		/* I was killed by slurmtrack, bail out right now */
4968 		info("%s: %s: destroy_persistent for JobId=%u terminated by slurmctld",
4969 		     plugin_type, __func__, destroy_args->job_id);
4970 		xfree(resp_msg);
4971 		_free_create_args(destroy_args);
4972 
4973 		track_script_remove(pthread_self());
4974 		return NULL;
4975 	}
4976 	/* track_script_reset_cpid(pthread_self(), 0); */
4977 
4978 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
4979 		trigger_burst_buffer();
4980 		error("%s: %s: destroy_persistent for JobId=%u Name=%s status:%u response:%s",
4981 		      plugin_type, __func__, destroy_args->job_id,
4982 		      destroy_args->name, status, resp_msg);
4983 		lock_slurmctld(job_write_lock);
4984 		job_ptr = find_job_record(destroy_args->job_id);
4985 		if (!job_ptr) {
4986 			error("%s: %s: unable to find job record for JobId=%u",
4987 			      plugin_type, __func__, destroy_args->job_id);
4988 		} else {
4989 			_update_system_comment(job_ptr, "teardown",
4990 					       resp_msg, 0);
4991 			job_ptr->state_reason = FAIL_BAD_CONSTRAINTS;
4992 			xfree(job_ptr->state_desc);
4993 			xstrfmtcat(job_ptr->state_desc, "%s: %s: %s",
4994 				   plugin_type, __func__, resp_msg);
4995 		}
4996 		slurm_mutex_lock(&bb_state.bb_mutex);
4997 		_reset_buf_state(destroy_args->user_id, destroy_args->job_id,
4998 				 destroy_args->name, BB_STATE_PENDING, 0);
4999 		bb_state.last_update_time = time(NULL);
5000 		slurm_mutex_unlock(&bb_state.bb_mutex);
5001 		unlock_slurmctld(job_write_lock);
5002 	} else {
5003 		assoc_mgr_lock_t assoc_locks =
5004 			{ .assoc = READ_LOCK, .qos = READ_LOCK };
5005 		/* assoc_mgr needs locking to call bb_post_persist_delete */
5006 		if (bb_alloc)
5007 			assoc_mgr_lock(&assoc_locks);
5008 		slurm_mutex_lock(&bb_state.bb_mutex);
5009 		_reset_buf_state(destroy_args->user_id, destroy_args->job_id,
5010 				 destroy_args->name, BB_STATE_DELETED, 0);
5011 
5012 		/* Modify internal buffer record for purging */
5013 		if (bb_alloc) {
5014 			bb_alloc->state = BB_STATE_COMPLETE;
5015 			bb_alloc->job_id = destroy_args->job_id;
5016 			bb_alloc->state_time = time(NULL);
5017 			bb_limit_rem(bb_alloc->user_id, bb_alloc->size,
5018 				     bb_alloc->pool, &bb_state);
5019 
5020 			(void) bb_post_persist_delete(bb_alloc, &bb_state);
5021 
5022 			(void) bb_free_alloc_rec(&bb_state, bb_alloc);
5023 		}
5024 		bb_state.last_update_time = time(NULL);
5025 		slurm_mutex_unlock(&bb_state.bb_mutex);
5026 		if (bb_alloc)
5027 			assoc_mgr_unlock(&assoc_locks);
5028 	}
5029 	xfree(resp_msg);
5030 	_free_create_args(destroy_args);
5031 
5032 	track_script_remove(pthread_self());
5033 
5034 	return NULL;
5035 }
5036 
5037 /* _bb_get_configs()
5038  *
5039  * Handle the JSON stream with configuration info (instance use details).
5040  */
5041 static bb_configs_t *
5042 _bb_get_configs(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5043 {
5044 	bb_configs_t *ents = NULL;
5045 	json_object *j;
5046 	json_object_iter iter;
5047 	int status = 0;
5048 	DEF_TIMERS;
5049 	char *resp_msg;
5050 	char **script_argv;
5051 
5052 	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
5053 	script_argv[0] = xstrdup("dw_wlm_cli");
5054 	script_argv[1] = xstrdup("--function");
5055 	script_argv[2] = xstrdup("show_configurations");
5056 
5057 	START_TIMER;
5058 	resp_msg = run_command("show_configurations",
5059 			       state_ptr->bb_config.get_sys_state,
5060 			       script_argv, timeout, 0, &status);
5061 	END_TIMER;
5062 	if (bb_state.bb_config.debug_flag)
5063 		debug("%s: %s: show_configurations ran for %s",
5064 		      plugin_type, __func__, TIME_STR);
5065 	_log_script_argv(script_argv, resp_msg);
5066 	free_command_argv(script_argv);
5067 #if 0
5068 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5069 #else
5070 //FIXME: Cray bug: API returning error if no configurations, use above code when fixed
5071 	if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5072 	    (!resp_msg || (resp_msg[0] != '{'))) {
5073 #endif
5074 		trigger_burst_buffer();
5075 		error("%s: %s: show_configurations status:%u response:%s",
5076 		      plugin_type, __func__, status, resp_msg);
5077 	}
5078 	if (resp_msg == NULL) {
5079 		info("%s: %s: %s returned no configurations",
5080 		     plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5081 		return ents;
5082 	}
5083 
5084 
5085 	_python2json(resp_msg);
5086 	j = json_tokener_parse(resp_msg);
5087 	if (j == NULL) {
5088 		error("%s: %s: json parser failed on \"%s\"",
5089 		      plugin_type, __func__, resp_msg);
5090 		xfree(resp_msg);
5091 		return ents;
5092 	}
5093 	xfree(resp_msg);
5094 
5095 	json_object_object_foreachC(j, iter) {
5096 		if (ents) {
5097 			error("%s: %s: Multiple configuration objects",
5098 			      plugin_type, __func__);
5099 			break;
5100 		}
5101 		ents = _json_parse_configs_array(j, iter.key, num_ent);
5102 	}
5103 	json_object_put(j);	/* Frees json memory */
5104 
5105 	return ents;
5106 }
5107 
5108 /* _bb_get_instances()
5109  *
5110  * Handle the JSON stream with instance info (resource reservations).
5111  */
5112 static bb_instances_t *
5113 _bb_get_instances(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5114 {
5115 	bb_instances_t *ents = NULL;
5116 	json_object *j;
5117 	json_object_iter iter;
5118 	int status = 0;
5119 	DEF_TIMERS;
5120 	char *resp_msg;
5121 	char **script_argv;
5122 
5123 	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
5124 	script_argv[0] = xstrdup("dw_wlm_cli");
5125 	script_argv[1] = xstrdup("--function");
5126 	script_argv[2] = xstrdup("show_instances");
5127 
5128 	START_TIMER;
5129 	resp_msg = run_command("show_instances",
5130 			       state_ptr->bb_config.get_sys_state,
5131 			       script_argv, timeout, 0, &status);
5132 	END_TIMER;
5133 	if (bb_state.bb_config.debug_flag)
5134 		debug("%s: %s: show_instances ran for %s",
5135 		      plugin_type, __func__, TIME_STR);
5136 	_log_script_argv(script_argv, resp_msg);
5137 	free_command_argv(script_argv);
5138 #if 0
5139 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5140 #else
5141 //FIXME: Cray bug: API returning error if no instances, use above code when fixed
5142 	if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5143 	    (!resp_msg || (resp_msg[0] != '{'))) {
5144 #endif
5145 		trigger_burst_buffer();
5146 		error("%s: %s: show_instances status:%u response:%s",
5147 		      plugin_type, __func__, status, resp_msg);
5148 	}
5149 	if (resp_msg == NULL) {
5150 		info("%s: %s: %s returned no instances",
5151 		     plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5152 		return ents;
5153 	}
5154 
5155 	_python2json(resp_msg);
5156 	j = json_tokener_parse(resp_msg);
5157 	if (j == NULL) {
5158 		error("%s: %s: json parser failed on \"%s\"",
5159 		      plugin_type, __func__, resp_msg);
5160 		xfree(resp_msg);
5161 		return ents;
5162 	}
5163 	xfree(resp_msg);
5164 
5165 	json_object_object_foreachC(j, iter) {
5166 		if (ents) {
5167 			error("%s: %s: Multiple instance objects",
5168 			      plugin_type, __func__);
5169 			break;
5170 		}
5171 		ents = _json_parse_instances_array(j, iter.key, num_ent);
5172 	}
5173 	json_object_put(j);	/* Frees json memory */
5174 
5175 	return ents;
5176 }
5177 
5178 /* _bb_get_pools()
5179  *
5180  * Handle the JSON stream with resource pool info (available resource type).
5181  */
5182 static bb_pools_t *
5183 _bb_get_pools(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5184 {
5185 	bb_pools_t *ents = NULL;
5186 	json_object *j;
5187 	json_object_iter iter;
5188 	int status = 0;
5189 	DEF_TIMERS;
5190 	char *resp_msg;
5191 	char **script_argv;
5192 
5193 	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
5194 	script_argv[0] = xstrdup("dw_wlm_cli");
5195 	script_argv[1] = xstrdup("--function");
5196 	script_argv[2] = xstrdup("pools");
5197 
5198 	START_TIMER;
5199 	resp_msg = run_command("pools",
5200 			       state_ptr->bb_config.get_sys_state,
5201 			       script_argv, timeout, 0, &status);
5202 	END_TIMER;
5203 	if (bb_state.bb_config.debug_flag) {
5204 		/* Only log pools data if different to limit volume of logs */
5205 		static uint32_t last_csum = 0;
5206 		uint32_t i, resp_csum = 0;
5207 		debug("%s: %s: pools ran for %s",
5208 		      plugin_type, __func__, TIME_STR);
5209 		for (i = 0; resp_msg[i]; i++)
5210 			resp_csum += ((i * resp_msg[i]) % 1000000);
5211 		if (last_csum != resp_csum)
5212 			_log_script_argv(script_argv, resp_msg);
5213 		last_csum = resp_csum;
5214 	}
5215 	free_command_argv(script_argv);
5216 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5217 		trigger_burst_buffer();
5218 		error("%s: %s: pools status:%u response:%s",
5219 		      plugin_type, __func__, status, resp_msg);
5220 	}
5221 	if (resp_msg == NULL) {
5222 		error("%s: %s: %s returned no pools",
5223 		      plugin_type, __func__,
5224 		      state_ptr->bb_config.get_sys_state);
5225 		return ents;
5226 	}
5227 
5228 	_python2json(resp_msg);
5229 	j = json_tokener_parse(resp_msg);
5230 	if (j == NULL) {
5231 		error("%s: %s: json parser failed on \"%s\"",
5232 		      plugin_type, __func__, resp_msg);
5233 		xfree(resp_msg);
5234 		return ents;
5235 	}
5236 	xfree(resp_msg);
5237 
5238 	json_object_object_foreachC(j, iter) {
5239 		if (ents) {
5240 			error("%s: %s: Multiple pool objects",
5241 			      plugin_type, __func__);
5242 			break;
5243 		}
5244 		ents = _json_parse_pools_array(j, iter.key, num_ent);
5245 	}
5246 	json_object_put(j);	/* Frees json memory */
5247 
5248 	return ents;
5249 }
5250 
5251 static bb_sessions_t *
5252 _bb_get_sessions(int *num_ent, bb_state_t *state_ptr, uint32_t timeout)
5253 {
5254 	bb_sessions_t *ents = NULL;
5255 	json_object *j;
5256 	json_object_iter iter;
5257 	int status = 0;
5258 	DEF_TIMERS;
5259 	char *resp_msg;
5260 	char **script_argv;
5261 
5262 	script_argv = xcalloc(10, sizeof(char *));	/* NULL terminated */
5263 	script_argv[0] = xstrdup("dw_wlm_cli");
5264 	script_argv[1] = xstrdup("--function");
5265 	script_argv[2] = xstrdup("show_sessions");
5266 
5267 	START_TIMER;
5268 	resp_msg = run_command("show_sessions",
5269 			       state_ptr->bb_config.get_sys_state,
5270 			       script_argv, timeout, 0, &status);
5271 	END_TIMER;
5272 	if (bb_state.bb_config.debug_flag)
5273 		debug("%s: %s: show_sessions ran for %s",
5274 		      plugin_type, __func__, TIME_STR);
5275 	_log_script_argv(script_argv, resp_msg);
5276 	free_command_argv(script_argv);
5277 #if 0
5278 	if (!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) {
5279 #else
5280 //FIXME: Cray bug: API returning error if no sessions, use above code when fixed
5281 	if ((!WIFEXITED(status) || (WEXITSTATUS(status) != 0)) &&
5282 	    (!resp_msg || (resp_msg[0] != '{'))) {
5283 #endif
5284 		trigger_burst_buffer();
5285 		error("%s: %s: show_sessions status:%u response:%s",
5286 		      plugin_type, __func__, status, resp_msg);
5287 	}
5288 	if (resp_msg == NULL) {
5289 		info("%s: %s: %s returned no sessions",
5290 		     plugin_type, __func__, state_ptr->bb_config.get_sys_state);
5291 		free_command_argv(script_argv);
5292 		return ents;
5293 	}
5294 
5295 	_python2json(resp_msg);
5296 	j = json_tokener_parse(resp_msg);
5297 	if (j == NULL) {
5298 		error("%s: %s: json parser failed on \"%s\"",
5299 		      plugin_type, __func__, resp_msg);
5300 		xfree(resp_msg);
5301 		return ents;
5302 	}
5303 	xfree(resp_msg);
5304 
5305 	json_object_object_foreachC(j, iter) {
5306 		if (ents) {
5307 			error("%s: %s: Multiple session objects",
5308 			      plugin_type, __func__);
5309 			break;
5310 		}
5311 		ents = _json_parse_sessions_array(j, iter.key, num_ent);
5312 	}
5313 	json_object_put(j);	/* Frees json memory */
5314 
5315 	return ents;
5316 }
5317 
5318 /* _bb_free_configs()
5319  */
5320 static void
5321 _bb_free_configs(bb_configs_t *ents, int num_ent)
5322 {
5323 	xfree(ents);
5324 }
5325 
5326 /* _bb_free_instances()
5327  */
5328 static void
5329 _bb_free_instances(bb_instances_t *ents, int num_ent)
5330 {
5331 	xfree(ents);
5332 }
5333 
5334 /* _bb_free_pools()
5335  */
5336 static void
5337 _bb_free_pools(bb_pools_t *ents, int num_ent)
5338 {
5339 	int i;
5340 
5341 	for (i = 0; i < num_ent; i++) {
5342 		xfree(ents[i].id);
5343 		xfree(ents[i].units);
5344 	}
5345 
5346 	xfree(ents);
5347 }
5348 
5349 /* _bb_free_sessions()
5350  */
5351 static void
5352 _bb_free_sessions(bb_sessions_t *ents, int num_ent)
5353 {
5354 	int i;
5355 
5356 	for (i = 0; i < num_ent; i++) {
5357 		xfree(ents[i].token);
5358 	}
5359 
5360 	xfree(ents);
5361 }
5362 
5363 /* _json_parse_configs_array()
5364  */
5365 static bb_configs_t *
5366 _json_parse_configs_array(json_object *jobj, char *key, int *num)
5367 {
5368 	json_object *jarray;
5369 	int i;
5370 	json_object *jvalue;
5371 	bb_configs_t *ents;
5372 
5373 	jarray = jobj;
5374 	json_object_object_get_ex(jobj, key, &jarray);
5375 
5376 	*num = json_object_array_length(jarray);
5377 	ents = xcalloc(*num, sizeof(bb_configs_t));
5378 
5379 	for (i = 0; i < *num; i++) {
5380 		jvalue = json_object_array_get_idx(jarray, i);
5381 		_json_parse_configs_object(jvalue, &ents[i]);
5382 	}
5383 
5384 	return ents;
5385 }
5386 
5387 /* _json_parse_instances_array()
5388  */
5389 static bb_instances_t *
5390 _json_parse_instances_array(json_object *jobj, char *key, int *num)
5391 {
5392 	json_object *jarray;
5393 	int i;
5394 	json_object *jvalue;
5395 	bb_instances_t *ents;
5396 
5397 	jarray = jobj;
5398 	json_object_object_get_ex(jobj, key, &jarray);
5399 
5400 	*num = json_object_array_length(jarray);
5401 	ents = xcalloc(*num, sizeof(bb_instances_t));
5402 
5403 	for (i = 0; i < *num; i++) {
5404 		jvalue = json_object_array_get_idx(jarray, i);
5405 		_json_parse_instances_object(jvalue, &ents[i]);
5406 	}
5407 
5408 	return ents;
5409 }
5410 
5411 /* _json_parse_pools_array()
5412  */
5413 static bb_pools_t *
5414 _json_parse_pools_array(json_object *jobj, char *key, int *num)
5415 {
5416 	json_object *jarray;
5417 	int i;
5418 	json_object *jvalue;
5419 	bb_pools_t *ents;
5420 
5421 	jarray = jobj;
5422 	json_object_object_get_ex(jobj, key, &jarray);
5423 
5424 	*num = json_object_array_length(jarray);
5425 	ents = xcalloc(*num, sizeof(bb_pools_t));
5426 
5427 	for (i = 0; i < *num; i++) {
5428 		jvalue = json_object_array_get_idx(jarray, i);
5429 		_json_parse_pools_object(jvalue, &ents[i]);
5430 	}
5431 
5432 	return ents;
5433 }
5434 
5435 /* _json_parse_sessions_array()
5436  */
5437 static bb_sessions_t *
5438 _json_parse_sessions_array(json_object *jobj, char *key, int *num)
5439 {
5440 	json_object *jarray;
5441 	int i;
5442 	json_object *jvalue;
5443 	bb_sessions_t *ents;
5444 
5445 	jarray = jobj;
5446 	json_object_object_get_ex(jobj, key, &jarray);
5447 
5448 	*num = json_object_array_length(jarray);
5449 	ents = xcalloc(*num, sizeof(bb_sessions_t));
5450 
5451 	for (i = 0; i < *num; i++) {
5452 		jvalue = json_object_array_get_idx(jarray, i);
5453 		_json_parse_sessions_object(jvalue, &ents[i]);
5454 	}
5455 
5456 	return ents;
5457 }
5458 
5459 /* Parse "links" object in the "configuration" object */
5460 static void
5461 _parse_config_links(json_object *instance, bb_configs_t *ent)
5462 {
5463 	enum json_type type;
5464 	struct json_object_iter iter;
5465 	int x;
5466 
5467 	json_object_object_foreachC(instance, iter) {
5468 		type = json_object_get_type(iter.val);
5469 		switch (type) {
5470 		case json_type_int:
5471 			x = json_object_get_int64(iter.val);
5472 			if (!xstrcmp(iter.key, "instance"))
5473 				ent->instance = x;
5474 			break;
5475 		default:
5476 			break;
5477 		}
5478 	}
5479 }
5480 
5481 /* _json_parse_configs_object()
5482  */
5483 static void
5484 _json_parse_configs_object(json_object *jobj, bb_configs_t *ent)
5485 {
5486 	enum json_type type;
5487 	struct json_object_iter iter;
5488 	int64_t x;
5489 
5490 	json_object_object_foreachC(jobj, iter) {
5491 		type = json_object_get_type(iter.val);
5492 		switch (type) {
5493 		case json_type_object:
5494 			if (xstrcmp(iter.key, "links") == 0)
5495 				_parse_config_links(iter.val, ent);
5496 			break;
5497 		case json_type_int:
5498 			x = json_object_get_int64(iter.val);
5499 			if (xstrcmp(iter.key, "id") == 0) {
5500 				ent->id = x;
5501 			}
5502 			break;
5503 		default:
5504 			break;
5505 		}
5506 	}
5507 }
5508 
5509 /* Parse "capacity" object in the "instance" object */
5510 static void
5511 _parse_instance_capacity(json_object *instance, bb_instances_t *ent)
5512 {
5513 	enum json_type type;
5514 	struct json_object_iter iter;
5515 	int64_t x;
5516 
5517 	json_object_object_foreachC(instance, iter) {
5518 		type = json_object_get_type(iter.val);
5519 		switch (type) {
5520 		case json_type_int:
5521 			x = json_object_get_int64(iter.val);
5522 			if (!xstrcmp(iter.key, "bytes"))
5523 				ent->bytes = x;
5524 			break;
5525 		default:
5526 			break;
5527 		}
5528 	}
5529 }
5530 
5531 /* Parse "links" object in the "instance" object */
5532 static void
5533 _parse_instance_links(json_object *instance, bb_instances_t *ent)
5534 {
5535 	enum json_type type;
5536 	struct json_object_iter iter;
5537 	int64_t x;
5538 
5539 	json_object_object_foreachC(instance, iter) {
5540 		type = json_object_get_type(iter.val);
5541 		switch (type) {
5542 		case json_type_int:
5543 			x = json_object_get_int64(iter.val);
5544 			if (!xstrcmp(iter.key, "session"))
5545 				ent->session = x;
5546 			break;
5547 		default:
5548 			break;
5549 		}
5550 	}
5551 }
5552 
5553 /* _json_parse_instances_object()
5554  */
5555 static void
5556 _json_parse_instances_object(json_object *jobj, bb_instances_t *ent)
5557 {
5558 	enum json_type type;
5559 	struct json_object_iter iter;
5560 	int64_t x;
5561 
5562 	json_object_object_foreachC(jobj, iter) {
5563 		type = json_object_get_type(iter.val);
5564 		switch (type) {
5565 		case json_type_object:
5566 			if (xstrcmp(iter.key, "capacity") == 0)
5567 				_parse_instance_capacity(iter.val, ent);
5568 			else if (xstrcmp(iter.key, "links") == 0)
5569 				_parse_instance_links(iter.val, ent);
5570 			break;
5571 		case json_type_int:
5572 			x = json_object_get_int64(iter.val);
5573 			if (xstrcmp(iter.key, "id") == 0) {
5574 				ent->id = x;
5575 			}
5576 			break;
5577 		default:
5578 			break;
5579 		}
5580 	}
5581 }
5582 
5583 /* _json_parse_pools_object()
5584  */
5585 static void
5586 _json_parse_pools_object(json_object *jobj, bb_pools_t *ent)
5587 {
5588 	enum json_type type;
5589 	struct json_object_iter iter;
5590 	int64_t x;
5591 	const char *p;
5592 
5593 	json_object_object_foreachC(jobj, iter) {
5594 		type = json_object_get_type(iter.val);
5595 		switch (type) {
5596 		case json_type_int:
5597 			x = json_object_get_int64(iter.val);
5598 			if (xstrcmp(iter.key, "granularity") == 0) {
5599 				ent->granularity = x;
5600 			} else if (xstrcmp(iter.key, "quantity") == 0) {
5601 				ent->quantity = x;
5602 			} else if (xstrcmp(iter.key, "free") == 0) {
5603 				ent->free = x;
5604 			}
5605 			break;
5606 		case json_type_string:
5607 			p = json_object_get_string(iter.val);
5608 			if (xstrcmp(iter.key, "id") == 0) {
5609 				ent->id = xstrdup(p);
5610 			} else if (xstrcmp(iter.key, "units") == 0) {
5611 				ent->units = xstrdup(p);
5612 			}
5613 			break;
5614 		default:
5615 			break;
5616 		}
5617 	}
5618 }
5619 
5620 /* _json_parse_session_object()
5621  */
5622 static void
5623 _json_parse_sessions_object(json_object *jobj, bb_sessions_t *ent)
5624 {
5625 	enum json_type type;
5626 	struct json_object_iter iter;
5627 	int64_t x;
5628 	const char *p;
5629 
5630 	json_object_object_foreachC(jobj, iter) {
5631 		type = json_object_get_type(iter.val);
5632 		switch (type) {
5633 		case json_type_int:
5634 			x = json_object_get_int64(iter.val);
5635 			if (xstrcmp(iter.key, "created") == 0) {
5636 				ent->created = x;
5637 			} else if (xstrcmp(iter.key, "id") == 0) {
5638 				ent->id = x;
5639 			} else if (xstrcmp(iter.key, "owner") == 0) {
5640 				ent->user_id = x;
5641 			}
5642 			break;
5643 		case json_type_string:
5644 			p = json_object_get_string(iter.val);
5645 			if (xstrcmp(iter.key, "token") == 0) {
5646 				ent->token = xstrdup(p);
5647 			}
5648 		default:
5649 			break;
5650 		}
5651 	}
5652 }
5653 
5654 /*
5655  * Translate a burst buffer string to it's equivalent TRES string
5656  * (e.g. "cray:2G,generic:4M" -> "1004=2048,1005=4")
5657  * Caller must xfree the return value
5658  */
5659 extern char *bb_p_xlate_bb_2_tres_str(char *burst_buffer)
5660 {
5661 	char *save_ptr = NULL, *sep, *tmp, *tok;
5662 	char *result = NULL;
5663 	uint64_t size, total = 0;
5664 
5665 	if (!burst_buffer || (bb_state.tres_id < 1))
5666 		return result;
5667 
5668 	tmp = xstrdup(burst_buffer);
5669 	tok = strtok_r(tmp, ",", &save_ptr);
5670 	while (tok) {
5671 		sep = strchr(tok, ':');
5672 		if (sep) {
5673 			if (!xstrncmp(tok, "cray:", 5))
5674 				tok += 5;
5675 			else
5676 				tok = NULL;
5677 		}
5678 
5679 		if (tok) {
5680 			uint64_t mb_xlate = 1024 * 1024;
5681 			size = bb_get_size_num(tok,
5682 					       bb_state.bb_config.granularity);
5683 			total += (size + mb_xlate - 1) / mb_xlate;
5684 		}
5685 
5686 		tok = strtok_r(NULL, ",", &save_ptr);
5687 	}
5688 	xfree(tmp);
5689 
5690 	if (total)
5691 		xstrfmtcat(result, "%d=%"PRIu64, bb_state.tres_id, total);
5692 
5693 	return result;
5694 }
5695