1 /*
2 * Copyright (c) 2010 The Trustees of Indiana University.
3 * All rights reserved.
4 * Copyright (c) 2004-2011 The University of Tennessee and The University
5 * of Tennessee Research Foundation. All rights
6 * reserved.
7 * Copyright (c) 2017 Research Organization for Information Science
8 * and Technology (RIST). All rights reserved.
9 * $COPYRIGHT$
10 *
11 * Additional copyrights may follow
12 *
13 * $HEADER$
14 */
15
16 /*
17 *
18 */
19
20 #include "orte_config.h"
21
22 #include <string.h>
23 #include <stdlib.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <sys/wait.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif /* HAVE_UNISTD_H */
30
31 #include "orte/mca/mca.h"
32 #include "opal/mca/base/base.h"
33
34 #include "opal/mca/event/event.h"
35
36 #include "orte/constants.h"
37 #include "orte/util/show_help.h"
38 #include "opal/util/argv.h"
39 #include "opal/util/output.h"
40 #include "opal/util/opal_environ.h"
41 #include "opal/util/basename.h"
42 #include "opal/util/os_dirpath.h"
43
44 #include "opal/mca/compress/compress.h"
45 #include "opal/mca/compress/base/base.h"
46
47 #include "opal/threads/mutex.h"
48 #include "opal/threads/condition.h"
49
50 #include "orte/util/name_fns.h"
51 #include "orte/util/proc_info.h"
52 #include "orte/runtime/orte_globals.h"
53 #include "orte/runtime/orte_wait.h"
54 #include "orte/mca/errmgr/errmgr.h"
55 #include "orte/mca/rml/rml.h"
56 #include "orte/mca/rml/rml_types.h"
57 #include "orte/mca/odls/odls_types.h"
58 #include "orte/mca/filem/filem.h"
59 #include "orte/mca/filem/base/base.h"
60
61 #include "orte/mca/sstore/sstore.h"
62 #include "orte/mca/sstore/base/base.h"
63
64 #include "sstore_stage.h"
65
66 /**********
67 * Object stuff
68 **********/
/*
 * Life-cycle states stored in orte_sstore_stage_local_snapshot_info_t.status
 */
#define SSTORE_LOCAL_NONE    0  /* value assigned by the object constructor */
#define SSTORE_LOCAL_ERROR   1  /* register() stops waiting when it sees this */
#define SSTORE_LOCAL_INIT    2  /* set by create_new_handle_info() */
#define SSTORE_LOCAL_READY   3  /* register() waits for this state */
#define SSTORE_LOCAL_SYNCED  4  /* set at the end of a successful sync() */
#define SSTORE_LOCAL_DONE    5  /* finalize() no longer waits on the handle */
75
76 struct orte_sstore_stage_local_snapshot_info_t {
77 /** List super object */
78 opal_list_item_t super;
79
80 /** */
81 orte_sstore_base_handle_t id;
82
83 /** Status */
84 int status;
85
86 /** Sequence Number */
87 int seq_num;
88
89 /** Global Reference Name */
90 char * global_ref_name;
91
92 /** Local Location Format String */
93 char * location_fmt;
94
95 /** Local Cache Location Format String */
96 char * cache_location_fmt;
97
98 /* Application info handles*/
99 opal_list_t *app_info_handle;
100
101 /** Compress Component used */
102 char * compress_comp;
103
104 /** Compress Component postfix */
105 char * compress_postfix;
106
107 /** Is this checkpoint representing a migration? */
108 bool migrating;
109 };
110 typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
111 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);
112
113 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
114 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);
115
116 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
117 opal_list_item_t,
118 orte_sstore_stage_local_snapshot_info_construct,
119 orte_sstore_stage_local_snapshot_info_destruct);
120
121 struct orte_sstore_stage_local_app_snapshot_info_t {
122 /** List super object */
123 opal_list_item_t super;
124
125 /** Process Name associated with this entry */
126 orte_process_name_t name;
127
128 /** Local Location (Absolute Path) */
129 char * local_location;
130
131 /** Compressed Local Location (Absolute Path) */
132 char * compressed_local_location;
133
134 /** Local Cache Location (Absolute Path) */
135 char * local_cache_location;
136
137 /** Metadata File Name (Absolute Path) */
138 char * metadata_filename;
139
140 /** CRS Component used */
141 char * crs_comp;
142
143 /** If this app. skipped the checkpoint - usually for non-migrating procs */
144 bool ckpt_skipped;
145
146 /** Compression PID to wait on */
147 pid_t compress_pid;
148 };
149 typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
150 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);
151
152 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
153 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);
154
155 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
156 opal_list_item_t,
157 orte_sstore_stage_local_app_snapshot_info_construct,
158 orte_sstore_stage_local_app_snapshot_info_destruct);
159
160
161
162 /**********
163 * Local Function and Variable Declarations
164 **********/
165 static bool is_global_listener_active = false;
166 static int sstore_stage_local_start_listener(void);
167 static int sstore_stage_local_stop_listener(void);
168 static void sstore_stage_local_recv(int status,
169 orte_process_name_t* sender,
170 opal_buffer_t* buffer,
171 orte_rml_tag_t tag,
172 void* cbdata);
173
174 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
175 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
176 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
177 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
178 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
179
180 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
181 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
182 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);
183
184 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
185 orte_process_name_t *name);
186 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
187 orte_process_name_t *name);
188
189 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
190 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
191
192 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);
193
194 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
195 orte_sstore_stage_local_app_snapshot_info_t *app_info);
196 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
197 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);
198
199 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
200 char *global_loc, char *ref, char *postfix, int seq);
201
202 static int sstore_stage_create_local_dir(void);
203 static int sstore_stage_destroy_local_dir(void);
204
205 static int sstore_stage_create_cache(void);
206 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
207 static int sstore_stage_destroy_cache(void);
208
209 static opal_list_t *active_handles = NULL;
210 static char * sstore_stage_local_basedir = NULL;
211
212 static char * sstore_stage_cache_basedir = NULL;
213
214 static char * sstore_stage_cache_current_dir = NULL;
215 static char * sstore_stage_cache_last_dir = NULL;
216
217 static opal_list_t * preload_filem_requests = NULL;
218
219 /**********
220 * Object stuff
221 **********/
/**
 * Constructor: put a snapshot record into its empty state and create the
 * (initially empty) list of per-application entries.
 */
void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
{
    /* Scalars */
    info->id        = 0;
    info->status    = SSTORE_LOCAL_NONE;
    info->seq_num   = -1;
    info->migrating = false;

    /* Strings are filled in later */
    info->global_ref_name    = NULL;
    info->location_fmt       = NULL;
    info->cache_location_fmt = NULL;
    info->compress_comp      = NULL;
    info->compress_postfix   = NULL;

    /* Container for the per-application entries */
    info->app_info_handle = OBJ_NEW(opal_list_t);
}
244
/**
 * Destructor: release everything owned by a snapshot record and reset the
 * fields to the same values the constructor assigns.
 */
void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
{
    info->id      = 0;
    info->status  = SSTORE_LOCAL_NONE;
    info->seq_num = -1;

    /* free(NULL) is a no-op, so the strings need no individual guards */
    free(info->global_ref_name);
    info->global_ref_name = NULL;

    free(info->location_fmt);
    info->location_fmt = NULL;

    free(info->cache_location_fmt);
    info->cache_location_fmt = NULL;

    /* Only release the list if one is attached */
    if( NULL != info->app_info_handle ) {
        OBJ_RELEASE(info->app_info_handle);
        info->app_info_handle = NULL;
    }

    free(info->compress_comp);
    info->compress_comp = NULL;

    free(info->compress_postfix);
    info->compress_postfix = NULL;

    info->migrating = false;
}
285
/**
 * Constructor: mark the process name invalid and clear every other field.
 */
void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
{
    /* No process associated yet */
    info->name.jobid = ORTE_JOBID_INVALID;
    info->name.vpid  = ORTE_VPID_INVALID;

    /* No paths or component names known yet */
    info->local_location            = NULL;
    info->compressed_local_location = NULL;
    info->local_cache_location      = NULL;
    info->metadata_filename         = NULL;
    info->crs_comp                  = NULL;

    info->ckpt_skipped = false;
    info->compress_pid = 0;
}
299
/**
 * Destructor: free the strings owned by a per-application entry and reset
 * the fields to the same values the constructor assigns.
 */
void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
{
    info->name.jobid = ORTE_JOBID_INVALID;
    info->name.vpid  = ORTE_VPID_INVALID;

    /* free(NULL) is a no-op, so no per-pointer guard is required */
    free(info->local_location);
    info->local_location = NULL;

    free(info->compressed_local_location);
    info->compressed_local_location = NULL;

    free(info->local_cache_location);
    info->local_cache_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    free(info->crs_comp);
    info->crs_comp = NULL;

    info->ckpt_skipped = false;
    info->compress_pid = 0;
}
334
/******************
 * Module interface functions (non-static entry points)
 ******************/
orte_sstore_stage_local_module_init(void)338 int orte_sstore_stage_local_module_init(void)
339 {
340 int ret, exit_status = ORTE_SUCCESS;
341
342 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
343 "sstore:stage:(local): init()"));
344
345 if( NULL == active_handles ) {
346 active_handles = OBJ_NEW(opal_list_t);
347 }
348
349 if( NULL == preload_filem_requests ) {
350 preload_filem_requests = OBJ_NEW(opal_list_t);
351 }
352
353 /*
354 * Create the local storage directory
355 */
356 asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
357 orte_sstore_stage_local_snapshot_dir,
358 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
359 ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
360 if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
361 ORTE_ERROR_LOG(ret);
362 exit_status = ret;
363 goto cleanup;
364 }
365
366 /*
367 * Create the local cache
368 */
369 if( orte_sstore_stage_enabled_caching ) {
370 asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
371 orte_sstore_stage_local_snapshot_dir,
372 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
373 ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
374
375 if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
376 ORTE_ERROR_LOG(ret);
377 exit_status = ret;
378 goto cleanup;
379 }
380 }
381
382 /*
383 * Setup a listener for the HNP/Apps
384 * We could be the HNP, in which case the listener is already registered.
385 */
386 if( !ORTE_PROC_IS_HNP ) {
387 if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
388 ORTE_ERROR_LOG(ret);
389 exit_status = ret;
390 goto cleanup;
391 }
392 }
393
394 cleanup:
395 return exit_status;
396 }
397
orte_sstore_stage_local_module_finalize(void)398 int orte_sstore_stage_local_module_finalize(void)
399 {
400 int ret, exit_status = ORTE_SUCCESS;
401 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
402 opal_list_item_t* item = NULL;
403 bool done = false;
404 int cur_time = 0, max_time = 120;
405
406 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
407 "sstore:stage:(local): finalize()"));
408
409 /*
410 * Wait for all active transfers to finish
411 */
412 if( !ORTE_PROC_IS_HNP ) {
413 done = false;
414 while( 0 < opal_list_get_size(active_handles) && !done ) {
415 done = true;
416 for(item = opal_list_get_first(active_handles);
417 item != opal_list_get_end(active_handles);
418 item = opal_list_get_next(item) ) {
419 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
420 if( SSTORE_LOCAL_DONE != handle_info->status &&
421 SSTORE_LOCAL_NONE != handle_info->status &&
422 SSTORE_LOCAL_ERROR != handle_info->status ) {
423 done = false;
424 break;
425 }
426 }
427 if( done ) {
428 break;
429 }
430 else {
431 if( cur_time != 0 && cur_time % 30 == 0 ) {
432 opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
433 cur_time, max_time);
434 }
435
436 opal_progress();
437 if( cur_time >= max_time ) {
438 break;
439 } else {
440 sleep(1);
441 }
442 cur_time++;
443 }
444 }
445 }
446
447 if( NULL != active_handles ) {
448 OBJ_RELEASE(active_handles);
449 }
450
451 if( NULL != preload_filem_requests ) {
452 OBJ_RELEASE(preload_filem_requests);
453 }
454
455 /*
456 * Shutdown the listener for the HNP/Apps
457 * We could be the HNP, in which case the listener is already deregistered.
458 */
459 if( !ORTE_PROC_IS_HNP ) {
460 if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
461 ORTE_ERROR_LOG(ret);
462 exit_status = ret;
463 goto cleanup;
464 }
465 }
466
467 /*
468 * Destroy the local cache
469 */
470 if( orte_sstore_stage_enabled_caching ) {
471 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
472 ORTE_ERROR_LOG(ret);
473 exit_status = ret;
474 goto cleanup;
475 }
476 }
477
478 /*
479 * Destroy the local storage directory
480 */
481 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
482 ORTE_ERROR_LOG(ret);
483 exit_status = ret;
484 goto cleanup;
485 }
486
487 cleanup:
488 if( orte_sstore_stage_enabled_caching ) {
489 if( NULL != sstore_stage_cache_basedir ) {
490 free(sstore_stage_cache_basedir);
491 sstore_stage_cache_basedir = NULL;
492 }
493 }
494
495 if( NULL != sstore_stage_local_basedir ) {
496 free(sstore_stage_local_basedir);
497 sstore_stage_local_basedir = NULL;
498 }
499
500 return exit_status;
501 }
502
/**
 * Not supported by the local coordinator: prints a notice and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    /* Arguments are intentionally unused */
    (void)handle;
    (void)seq;
    (void)jobid;

    opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
508
/**
 * Register a snapshot handle with the local coordinator.
 *
 * Looks the handle up (creating its record if it is not known yet), asks
 * the global coordinator for the handle's basic information and then
 * progresses the event engine until the record reaches SSTORE_LOCAL_READY
 * or SSTORE_LOCAL_ERROR.
 *
 * @param handle  Snapshot handle to register.
 * @return ORTE_SUCCESS once the record is READY, the error returned by
 *         pull_handle_info(), or ORTE_ERROR if the record ended up in the
 *         SSTORE_LOCAL_ERROR state (previously reported as success).
 */
int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): register()"));

    /*
     * Create a handle
     */
    if( NULL == (handle_info = find_handle_info(handle)) ) {
        handle_info = create_new_handle_info(handle);
    }

    /*
     * Get basic information from Global SStore
     */
    if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait here until the pull request has been satisfied
     */
    while(SSTORE_LOCAL_READY != handle_info->status &&
          SSTORE_LOCAL_ERROR != handle_info->status ) {
        opal_progress();
    }

    /*
     * The wait ends on READY or ERROR; do not report a failed pull as success
     */
    if( SSTORE_LOCAL_ERROR == handle_info->status ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
544
/**
 * Not supported by the local coordinator: prints a notice and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    /* Arguments are intentionally unused */
    (void)handle;
    (void)key;
    (void)value;

    opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
550
/**
 * Not supported by the local coordinator: prints a notice and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    /* Arguments are intentionally unused */
    (void)handle;
    (void)key;
    (void)value;

    opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
556
/**
 * Synchronize a snapshot handle with the global coordinator.
 *
 * Waits for all applications to update their metadata, waits for the
 * compression to finish (when compression is enabled), pushes the handle
 * information to the global coordinator and finally marks the record as
 * SSTORE_LOCAL_SYNCED.
 *
 * @param handle  Snapshot handle to synchronize.
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND if the handle is unknown to this
 *         coordinator, or the error of the step that failed.
 */
int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): sync()"));

    /*
     * Lookup the handle.
     * find_handle_info() returns NULL for an unknown handle; every step
     * below dereferences the record, so bail out instead of crashing.
     */
    if( NULL == (handle_info = find_handle_info(handle)) ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Wait for all of the applications to update their metadata
     */
    if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait for compression to finish
     */
    if( orte_sstore_stage_enabled_compression ) {
        if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Push information to the Global coordinator
     */
    if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    handle_info->status = SSTORE_LOCAL_SYNCED;

 cleanup:
    return exit_status;
}
604
/**
 * Not supported by the local coordinator: prints a notice and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
{
    /* Argument is intentionally unused */
    (void)handle;

    opal_output(0, "sstore:stage:(local): remove() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
610
/**
 * Pack a snapshot handle into a buffer.
 *
 * Only the handle id is packed; no metadata is appended and 'peer' is not
 * consulted.
 *
 * @param peer    Unused.
 * @param buffer  Buffer to pack into.
 * @param handle  Handle id to pack.
 * @return ORTE_SUCCESS or the error reported by opal_dss.pack().
 */
int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): pack()"));

    /* Pack the handle ID */
    rc = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}
639
/**
 * Unpack a snapshot handle from a buffer.
 *
 * Reads the handle id into *handle, finds (or creates) the matching local
 * record and then lets process_global_push() consume the metadata that
 * follows the id in the same buffer.
 *
 * @param peer    Sender; forwarded to process_global_push().
 * @param buffer  Buffer to unpack from.
 * @param handle  Receives the unpacked handle id.
 * @return ORTE_SUCCESS or the error of the step that failed.
 */
int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    orte_sstore_stage_local_snapshot_info_t *record = NULL;
    orte_std_cntr_t n_items = 1;
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): unpack()"));

    /* Handle id comes first */
    rc = opal_dss.unpack(buffer, handle, &n_items, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* Find the record for this handle, creating one on first contact */
    record = find_handle_info(*handle);
    if( NULL == record ) {
        record = create_new_handle_info(*handle);
    }

    /* The metadata is piggybacked on the same message */
    rc = process_global_push(peer, buffer, record);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}
678
/**
 * Make the restart files of an application context available locally and
 * extend app->argv with the options that point at them.
 *
 * The ORTE_APP_SSTORE_LOAD attribute is split on ':' and must contain
 * either 4 tokens (loc, global ref, ref, seq) or at least 6 tokens
 * (loc, global ref, ref, compress comp, postfix, seq).
 *
 * When caching is enabled, the snapshot is known, it is not a migration
 * and a cached copy is accessible, "-c <cache dir>" is appended.
 * Otherwise the files are preloaded and "-l <dir>" is appended, followed
 * by "-d <compress comp>" / "-p <postfix>" unless the transfer was skipped.
 *
 * @param app  Application context to process.
 * @return ORTE_SUCCESS (also when there is nothing to do for this daemon),
 *         ORTE_ERR_BAD_PARAM for a malformed attribute, or the error
 *         returned by orte_sstore_stage_local_preload_files().
 */
int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    char **sstore_args = NULL;
    char * req_snap_loc = NULL;
    char * req_snap_global_ref = NULL;
    char * req_snap_ref = NULL;
    char * req_snap_postfix = NULL;
    char * local_location = NULL;
    char * req_snap_compress = NULL;
    int req_snap_seq = 0;
    int num_args = 0;
    int i;
    orte_proc_t *child = NULL;
    int loc_argc = 0;
    bool skip_xfer = false;
    char *sload = NULL;

    orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);

    if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
        /* sload may be NULL on this path: never hand NULL to "%s" */
        OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
                             app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
                             (int)app->num_procs, (NULL == sload ? "(null)" : sload)));
        /* Nothing to do */
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): %s",
                         app->idx, sload));

    /*
     * Extract the 'ref:seq' parameter.
     * Count the tokens first: the code below indexes up to element 5, so
     * anything but 4 or >= 6 tokens would read past the end of the vector
     * or pass NULL to strdup()/atoi().
     */
    sstore_args = opal_argv_split(sload, ':');
    if( NULL != sstore_args ) {
        while( NULL != sstore_args[num_args] ) {
            ++num_args;
        }
    }
    if( num_args < 4 || 5 == num_args ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    req_snap_loc        = strdup(sstore_args[0]);
    req_snap_global_ref = strdup(sstore_args[1]);
    req_snap_ref        = strdup(sstore_args[2]);
    if( 4 == num_args ) { /* Not compressed */
        req_snap_seq = atoi( sstore_args[3]);
    } else {
        req_snap_compress = strdup(sstore_args[3]);
        req_snap_postfix  = strdup(sstore_args[4]);
        req_snap_seq = atoi( sstore_args[5]);
    }

    handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
    if( NULL == handle_info ) {
        /* No checkpoints known, just preload the checkpoint */
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
                             app->idx,
                             req_snap_ref,
                             req_snap_seq));
        goto filem_preload;
    }

    /*
     * If caching enabled, then look to see if we have this snapshot cached
     * Do not cache if migrating, since checkpoints taken while migrating are
     * not guaranteed to be globally taken.
     */
    if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
        /*
         * Find the process: the first local child of this app context decides
         */
        for (i=0; i < orte_local_children->size; i++) {
            if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
                continue;
            }

            if( app->idx == child->app_idx ) {
                /*
                 * Find the app snapshot ref
                 */
                app_info = find_app_handle_info(handle_info, &child->name);
                break;
            }
        }

        if( NULL == app_info ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
                                 app->idx));
            goto filem_preload;
        }

        /*
         * Do we have a cached version of this file?
         */
        if( NULL != app_info->local_cache_location &&
            0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
                                 app->idx,
                                 app_info->local_cache_location));

            opal_argv_append(&loc_argc, &(app->argv), "-c");
            opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
            goto cleanup;
        } else {
            /* The cache location may be NULL here: keep "%s" well defined */
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
                                 app->idx,
                                 ORTE_NAME_PRINT(&app_info->name),
                                 (NULL == app_info->local_cache_location ? "(null)" : app_info->local_cache_location)));
        }
    }

 filem_preload:
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
                         app->idx));

    /*
     * If we got here, then there is no cached directory, so just preload the
     * files, update the argument set, and carry on.
     */
    if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
                                                                     &skip_xfer,
                                                                     req_snap_loc,
                                                                     req_snap_ref,
                                                                     req_snap_postfix,
                                                                     req_snap_seq)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    opal_argv_append(&loc_argc, &(app->argv), "-l");
    opal_argv_append(&loc_argc, &(app->argv), local_location);

    /*
     * Decompress files:
     * opal-restart will do this for us on launch
     */
    if( !skip_xfer ) {
        if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-d");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
        }
        if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-p");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
        }
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
                         app->idx,
                         local_location));

 cleanup:
    /*
     * NOTE(review): local_location is handed back by
     * orte_sstore_stage_local_preload_files() and is not freed here, as in
     * the original code - confirm who owns it.
     */
    if( NULL != sstore_args ) {
        opal_argv_free(sstore_args);
        sstore_args = NULL;
    }

    /* free(NULL) is a no-op, so the strings need no individual guards */
    free(req_snap_compress);
    free(req_snap_ref);
    free(req_snap_postfix);
    free(req_snap_loc);
    free(req_snap_global_ref);

    /*
     * The attribute value was previously leaked.
     * NOTE(review): assumes orte_get_attribute() returns a heap copy for
     * OPAL_STRING that the caller owns - confirm against orte/util/attr.
     */
    free(sload);

    return exit_status;
}
884
orte_sstore_stage_local_wait_all_deps(void)885 int orte_sstore_stage_local_wait_all_deps(void)
886 {
887 int ret, exit_status = ORTE_SUCCESS;
888 opal_list_item_t* item = NULL;
889
890 /* Nothing being preloaded, so just move on */
891 if( 0 >= opal_list_get_size(preload_filem_requests) ) {
892 return ORTE_SUCCESS;
893 }
894
895 /*
896 * Wait for all files to move
897 */
898 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
899 "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
900 (int)opal_list_get_size(preload_filem_requests)));
901
902 if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
903 ORTE_ERROR_LOG(ret);
904 exit_status = ret;
905 goto cleanup;
906 }
907
908 /*
909 * Cache the restart files locally, so we can restart faster next time
910 * JJH: We already check the restart directory for a local copy before
911 * starting the transfer. So this feels unnecessary since the
912 * restart directory is always used as a cache, whether or not
913 * caching is enabled. The extra copy to the cache directory
914 * does not buy us anything.
915 */
916
917 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
918 "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
919 (int)opal_list_get_size(preload_filem_requests)));
920
921 cleanup:
922 while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
923 OBJ_RELEASE(item);
924 }
925
926 return exit_status;
927 }
928
/**************************
 * Private helper functions (static)
 **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)932 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
933 {
934 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
935 int i;
936 orte_proc_t *child = NULL;
937
938 if( NULL == active_handles ) {
939 active_handles = OBJ_NEW(opal_list_t);
940 }
941
942 handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
943
944 handle_info->id = handle;
945
946 opal_list_append(active_handles, &(handle_info->super));
947
948 /*
949 * Create a sub structure for each child
950 */
951 for (i=0; i < orte_local_children->size; i++) {
952 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
953 continue;
954 }
955 append_new_app_handle_info(handle_info, &child->name);
956 }
957
958 handle_info->status = SSTORE_LOCAL_INIT;
959
960 return handle_info;
961 }
962
find_handle_info(orte_sstore_base_handle_t handle)963 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
964 {
965 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
966 opal_list_item_t* item = NULL;
967
968 if( NULL == active_handles ) {
969 return NULL;
970 }
971
972 for(item = opal_list_get_first(active_handles);
973 item != opal_list_get_end(active_handles);
974 item = opal_list_get_next(item) ) {
975 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
976
977 if( handle_info->id == handle ) {
978 return handle_info;
979 }
980 }
981
982 return NULL;
983 }
984
find_handle_info_ref(char * ref,int seq)985 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
986 {
987 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
988 opal_list_item_t* item = NULL;
989
990 if( NULL == active_handles ) {
991 return NULL;
992 }
993
994 for(item = opal_list_get_first(active_handles);
995 item != opal_list_get_end(active_handles);
996 item = opal_list_get_next(item) ) {
997 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
998
999 if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
1000 handle_info->seq_num == seq ) {
1001 return handle_info;
1002 }
1003 }
1004
1005 return NULL;
1006 }
1007
/**
 * Create a per-application entry for a process and append it to the
 * application list of a snapshot record.
 *
 * @param handle_info  Record that receives the new entry.
 * @param name         Process name copied into the new entry.
 * @return ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if the entry could not
 *         be allocated (previously this caused a NULL dereference).
 */
static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                      orte_process_name_t *name)
{
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;

    app_info = OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    app_info->name.jobid = name->jobid;
    app_info->name.vpid  = name->vpid;

    opal_list_append(handle_info->app_info_handle, &(app_info->super));

    return ORTE_SUCCESS;
}
1022
find_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1023 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1024 orte_process_name_t *name)
1025 {
1026 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1027 opal_list_item_t* item = NULL;
1028 orte_ns_cmp_bitmask_t mask;
1029
1030 for(item = opal_list_get_first(handle_info->app_info_handle);
1031 item != opal_list_get_end(handle_info->app_info_handle);
1032 item = opal_list_get_next(item) ) {
1033 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1034
1035 mask = ORTE_NS_CMP_ALL;
1036
1037 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1038 return app_info;
1039 }
1040 }
1041
1042 return NULL;
1043 }
1044
sstore_stage_local_start_listener(void)1045 static int sstore_stage_local_start_listener(void)
1046 {
1047 if( is_global_listener_active ) {
1048 return ORTE_SUCCESS;
1049 }
1050
1051 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1052 ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1053
1054 is_global_listener_active = true;
1055 return ORTE_SUCCESS;
1056 }
1057
sstore_stage_local_stop_listener(void)1058 static int sstore_stage_local_stop_listener(void)
1059 {
1060 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1061 is_global_listener_active = false;
1062 return ORTE_SUCCESS;
1063 }
1064
/*
 * RML receive callback for ORTE_RML_TAG_SSTORE_INTERNAL messages.
 *
 * Unpacks the command flag and the snapshot handle id from the front of
 * 'buffer' and passes the remainder of the message on to
 * orte_sstore_stage_local_process_cmd_action().  Messages carrying any
 * other tag are ignored; an unpack failure is logged and the message is
 * dropped.
 */
static void sstore_stage_local_recv(int status,
                                    orte_process_name_t* sender,
                                    opal_buffer_t* buffer,
                                    orte_rml_tag_t tag,
                                    void* cbdata)
{
    int rc;
    orte_std_cntr_t n_items;
    orte_sstore_stage_cmd_flag_t cmd_flag;
    orte_sstore_base_handle_t handle_id;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    /* Message header, part 1: the command */
    n_items = 1;
    rc = opal_dss.unpack(buffer, &cmd_flag, &n_items, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    /* Message header, part 2: the handle the command refers to */
    n_items = 1;
    rc = opal_dss.unpack(buffer, &handle_id, &n_items, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    orte_sstore_stage_local_process_cmd_action(sender, cmd_flag, handle_id, buffer);
}
1101
/*
 * Dispatch one sstore 'stage' command for the handle 'loc_id'.
 *
 * The handle record is looked up (and created when it does not exist yet),
 * then the command is routed:
 *   - PULL / PUSH: to the "global" handler when 'sender' is the HNP,
 *                  otherwise to the "app" handler;
 *   - REMOVE:      always to the global handler, using the HNP name;
 *   - anything else is ignored.
 *
 * The handlers' return codes are not inspected; this function always
 * returns ORTE_SUCCESS.
 */
int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
                                               orte_sstore_stage_cmd_flag_t command,
                                               orte_sstore_base_handle_t loc_id,
                                               opal_buffer_t* buffer)
{
    orte_sstore_stage_local_snapshot_info_t *info = find_handle_info(loc_id);

    /* First time this handle is referenced: create its record */
    if( NULL == info ) {
        info = create_new_handle_info(loc_id);
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): process_cmd(%s) - Command = %s",
                         ORTE_NAME_PRINT(sender),
                         (ORTE_SSTORE_STAGE_PULL   == command ? "Pull" :
                          (ORTE_SSTORE_STAGE_PUSH   == command ? "Push" :
                           (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
                            (ORTE_SSTORE_STAGE_DONE   == command ? "Done" : "Unknown")))) ));

    if( ORTE_SSTORE_STAGE_PULL == command ) {
        if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender) ) {
            process_app_pull(sender, buffer, info);
        } else {
            process_global_pull(sender, buffer, info);
        }
    }
    else if( ORTE_SSTORE_STAGE_PUSH == command ) {
        if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender) ) {
            process_app_push(sender, buffer, info);
        } else {
            process_global_push(sender, buffer, info);
        }
    }
    else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
        /* The xcast from the root makes the 'sender' equal to this process,
         * so the HNP name is used explicitly instead of 'sender'. */
        process_global_remove(ORTE_PROC_MY_HNP, buffer, info);
    }

    return ORTE_SUCCESS;
}
1149
/*
 * Handle a PULL command that came from the HNP.
 *
 * Not implemented: prints a message and returns ORTE_ERR_NOT_IMPLEMENTED.
 * None of the arguments are used.
 */
static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    (void)peer;
    (void)buffer;
    (void)handle_info;

    /* JJH should be as simple as calling push_handle_info() */
    opal_output(0, "sstore:stage:(local): process_global_pull() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
1156
/*
 * Handle a PUSH command that came from the HNP.
 *
 * Unpacks, in order: the sequence number, the global reference name, the
 * location format string, (only when caching is enabled) the cache location
 * format string, and the 'migrating' flag.  It then rebuilds, for every
 * application record on this handle, the local snapshot location, the
 * cache location (when caching is enabled) and the metadata file name.
 *
 * On return handle_info->status is SSTORE_LOCAL_READY on success and
 * SSTORE_LOCAL_ERROR otherwise.
 *
 * Returns ORTE_SUCCESS or the error code of the failing unpack.
 */
static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Unpacking an OPAL_STRING stores a newly allocated string in the
     * destination pointer.  Release any value left over from an earlier
     * push first, otherwise it would be leaked.
     */
    if( NULL != handle_info->global_ref_name ) {
        free(handle_info->global_ref_name);
        handle_info->global_ref_name = NULL;
    }
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( NULL != handle_info->location_fmt ) {
        free(handle_info->location_fmt);
        handle_info->location_fmt = NULL;
    }
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* The cache format is only part of the message when caching is on */
    if( orte_sstore_stage_enabled_caching ) {
        if( NULL != handle_info->cache_location_fmt ) {
            free(handle_info->cache_location_fmt);
            handle_info->cache_location_fmt = NULL;
        }
        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * For each process we are working with, regenerate the per-process
     * paths from the formats that were just received.  The formats are
     * used directly as printf-style format strings with the vpid as the
     * only argument.
     */
    for(item  = opal_list_get_first(handle_info->app_info_handle);
        item != opal_list_get_end(handle_info->app_info_handle);
        item  = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        if( NULL != app_info->local_location ) {
            free(app_info->local_location);
            app_info->local_location = NULL;
        }
        asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);

        if( orte_sstore_stage_enabled_caching ) {
            if( NULL != app_info->local_cache_location ) {
                free(app_info->local_cache_location);
                app_info->local_cache_location = NULL;
            }
            asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid);
        }

        if( NULL != app_info->metadata_filename ) {
            free(app_info->metadata_filename);
            app_info->metadata_filename = NULL;
        }
        asprintf(&(app_info->metadata_filename), "%s/%s",
                 app_info->local_location,
                 orte_sstore_base_local_metadata_filename);
    }

 cleanup:
    if( ORTE_SUCCESS == exit_status ) {
        handle_info->status = SSTORE_LOCAL_READY;
    } else {
        handle_info->status = SSTORE_LOCAL_ERROR;
    }

    return exit_status;
}
1241
/*
 * Handle a REMOVE command from the HNP.
 *
 * When caching is disabled, or the handle belongs to a migration, the local
 * snapshot directory of every application record (and its compressed copy,
 * when compression is enabled and one exists) is deleted with "rm -rf".
 * Otherwise the snapshots are moved into the local cache through
 * sstore_stage_update_cache().
 *
 * Afterwards a DONE message carrying the handle id and the number of
 * application records is sent to 'peer', and the handle status becomes
 * SSTORE_LOCAL_DONE.
 *
 * Returns ORTE_SUCCESS, or the error code of the failing cache update,
 * pack or send.
 */
static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    size_t list_size;
    char * cmd = NULL;

    /*
     * If not caching, then just remove the local copy
     * Or if migrating, since we do not cache checkpoints generated while
     * migrating.
     */
    if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
        for(item  = opal_list_get_first(handle_info->app_info_handle);
            item != opal_list_get_end(handle_info->app_info_handle);
            item  = opal_list_get_next(item) ) {
            app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

            asprintf(&cmd, "rm -rf %s", app_info->local_location);
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                 cmd));
            system(cmd);

            /* Release the command before the pointer is reused, so that
             * nothing is leaked from one command to the next. */
            free(cmd);
            cmd = NULL;

            if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
                asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
                OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                     "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                     cmd));
                system(cmd);

                free(cmd);
                cmd = NULL;
            }
        }
    }
    else {
        /*
         * Update the local cache
         */
        if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Tell the peer that we are done with this handle
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_DONE;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    list_size = opal_list_get_size(handle_info->app_info_handle);
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): remove(): Sent done for %d files to %s",
                         (int)list_size,
                         ORTE_NAME_PRINT(peer)));

    handle_info->status = SSTORE_LOCAL_DONE;
    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if( NULL != cmd ) {
        free(cmd);
        cmd = NULL;
    }

    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
1343
/*
 * Handle a PULL command sent by an application process.
 *
 * Replies to 'peer' with a PUSH message containing, in order: the handle
 * id, the sequence number, the global reference name, the peer's local
 * snapshot location and its metadata file name.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when 'peer' has no application
 * record on this handle, or the error code of the failing pack or send.
 */
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;

    /*
     * Find this app's data.  The lookup returns NULL for a process that is
     * not registered on this handle; there is nothing to send back then.
     */
    app_info = find_app_handle_info(handle_info, peer);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Push back the requested information
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_PUSH;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
1416
/*
 * Handle a PUSH command sent by an application process.
 *
 * Unpacks the 'checkpoint skipped' flag and, when the checkpoint was not
 * skipped, the name of the CRS component, into the application record that
 * belongs to 'peer'.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when 'peer' has no application
 * record on this handle, or the error code of the failing unpack.
 */
static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;

    /*
     * Find this app's data.  The lookup returns NULL for a process that is
     * not registered on this handle; there is no record to update then.
     */
    app_info = find_app_handle_info(handle_info, peer);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( !app_info->ckpt_skipped ) {
        /* Unpacking an OPAL_STRING stores a newly allocated string in the
         * destination; drop a value from an earlier push first so it is
         * not leaked. */
        if( NULL != app_info->crs_comp ) {
            free(app_info->crs_comp);
            app_info->crs_comp = NULL;
        }

        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* crs_comp can still be NULL here (skipped checkpoint); never hand a
     * NULL pointer to a %s conversion. */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): app_push(%s, skip=%s, %s)",
                         ORTE_NAME_PRINT(&(app_info->name)),
                         (app_info->ckpt_skipped ? "T" : "F"),
                         (NULL != app_info->crs_comp ? app_info->crs_comp : "(null)")));

    /* Compression started on sync() */

 cleanup:
    return exit_status;
}
1458
/*
 * Block, driving opal_progress(), until every application record on the
 * handle has either reported a CRS component name or been marked as having
 * skipped the checkpoint.
 *
 * There is no timeout.  Always returns ORTE_SUCCESS.
 */
static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    opal_list_item_t *cur = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *candidate = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *pending = NULL;

    for(;;) {
        /* Find the first record that has not been updated yet */
        pending = NULL;
        cur = opal_list_get_first(handle_info->app_info_handle);
        while( cur != opal_list_get_end(handle_info->app_info_handle) ) {
            candidate = (orte_sstore_stage_local_app_snapshot_info_t*)cur;

            if( !candidate->ckpt_skipped && NULL == candidate->crs_comp ) {
                pending = candidate;
                break;
            }

            cur = opal_list_get_next(cur);
        }

        if( NULL == pending ) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Waiting for appliccation %s",
                             ORTE_NAME_PRINT(&(pending->name)) ));
        opal_progress();
    }

    return ORTE_SUCCESS;
}
1488
/*
 * Start the nonblocking compression of one application's local snapshot
 * and register sstore_stage_local_compress_waitpid_cb to be called when the
 * compression process exits.
 *
 * Does nothing when compression is disabled.  The first successful call on
 * a handle also records the name of the selected compress component and
 * the postfix it reported in 'handle_info'.
 *
 * Returns ORTE_SUCCESS, the error code from compress_nb(), or ORTE_ERROR
 * when no valid pid was reported for the compression process.
 */
static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
                             orte_sstore_stage_local_app_snapshot_info_t *app_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char * postfix = NULL;
    orte_proc_t *proc;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_compression ) {
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
                         ORTE_NAME_PRINT(&(app_info->name)),
                         app_info->local_location ));

    /*
     * Start compression (nonblocking)
     */
    if( ORTE_SUCCESS != (ret = opal_compress.compress_nb(app_info->local_location,
                                                         &(app_info->compressed_local_location),
                                                         &(postfix),
                                                         &(app_info->compress_pid))) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    if( app_info->compress_pid <= 0 ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    if( NULL == handle_info->compress_comp ) {
        handle_info->compress_comp    = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
        /* strdup(NULL) is undefined behavior; only copy the postfix when
         * the compress component actually returned one. */
        handle_info->compress_postfix = (NULL != postfix) ? strdup(postfix) : NULL;
    }

    /*
     * Setup a callback for when it is finished
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
                         app_info->compress_pid,
                         ORTE_NAME_PRINT(&(app_info->name)) ));

    proc = OBJ_NEW(orte_proc_t);
    proc->pid = app_info->compress_pid;
    /* be sure to mark it as alive so we don't instantly fire */
    ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);

    orte_wait_cb(proc, sstore_stage_local_compress_waitpid_cb, app_info);

 cleanup:
    if( NULL != postfix ) {
        free(postfix);
        postfix = NULL;
    }

    return exit_status;
}
1551
/*
 * Wait callback fired when a compression process has exited.
 *
 * 'cbdata' is treated as an orte_wait_tracker_t whose own cbdata field is
 * the application record.  The record's compress_pid is reset to 0, which
 * is what wait_all_compressed() polls for, and both 'proc' and the tracker
 * are released.
 */
static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
{
    orte_wait_tracker_t *tracker = (orte_wait_tracker_t *)cbdata;
    orte_sstore_stage_local_app_snapshot_info_t *finished_app =
        (orte_sstore_stage_local_app_snapshot_info_t*)tracker->cbdata;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
                         (int)proc->pid,
                         ORTE_NAME_PRINT(&(finished_app->name)) ));

    /* Mark the compression of this process as complete */
    finished_app->compress_pid = 0;

    OBJ_RELEASE(proc);
    OBJ_RELEASE(tracker);
}
1568
/*
 * Compress the local snapshot of every application record on the handle
 * and wait until all compression processes have finished.
 *
 * Steps:
 *   1. optionally delay for orte_sstore_stage_compress_delay seconds,
 *      calling opal_progress() once per millisecond;
 *   2. call start_compression() for every application record;
 *   3. spin on opal_progress() until no record has a positive
 *      compress_pid.
 *
 * Returns ORTE_SUCCESS (immediately when compression is disabled), or the
 * error from the first start_compression() call that fails.
 */
static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret;
    int usleep_time = 1000;
    int s_time = 0, max_wait_time;
    opal_list_item_t *cur = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *candidate = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *busy = NULL;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_compression ) {
        return ORTE_SUCCESS;
    }

    /*
     * Optional delay before compression starts
     */
    if( orte_sstore_stage_compress_delay > 0 ) {
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Delaying %d second before starting compression...",
                             orte_sstore_stage_compress_delay));
        max_wait_time = orte_sstore_stage_compress_delay * (1000000/usleep_time);
        s_time = 0;
        while( s_time < max_wait_time ) {
            opal_progress();
            usleep(usleep_time);
            ++s_time;
        }
    }

    /*
     * Start all compression
     */
    cur = opal_list_get_first(handle_info->app_info_handle);
    while( cur != opal_list_get_end(handle_info->app_info_handle) ) {
        candidate = (orte_sstore_stage_local_app_snapshot_info_t*)cur;

        ret = start_compression(handle_info, candidate);
        if( ORTE_SUCCESS != ret ) {
            ORTE_ERROR_LOG(ret);
            return ret;
        }

        cur = opal_list_get_next(cur);
    }

    /*
     * Wait for compression to finish
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): Waiting for compression to finish..."));
    for(;;) {
        /* Find the first record whose compression is still running */
        busy = NULL;
        cur = opal_list_get_first(handle_info->app_info_handle);
        while( cur != opal_list_get_end(handle_info->app_info_handle) ) {
            candidate = (orte_sstore_stage_local_app_snapshot_info_t*)cur;

            if( candidate->compress_pid > 0 ) {
                busy = candidate;
                break;
            }

            cur = opal_list_get_next(cur);
        }

        if( NULL == busy ) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
                             ORTE_NAME_PRINT(&(busy->name)) ));
        opal_progress();
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): Compression finished!"));

    return ORTE_SUCCESS;
}
1640
/*
 * Make sure the handle's global information is available.
 *
 * When the sequence number, the global reference name and the location
 * format are all already set, the handle is simply marked
 * SSTORE_LOCAL_READY.  Otherwise a PULL request carrying the handle id is
 * sent to the HNP; this function does not wait for the answer.
 *
 * Returns ORTE_SUCCESS, or the error code of the failing pack or send.
 */
static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
{
    int rc, exit_status = ORTE_SUCCESS;
    opal_buffer_t *request = NULL;
    orte_sstore_stage_cmd_flag_t cmd_flag = ORTE_SSTORE_STAGE_PULL;

    /*
     * Nothing to ask for if the handle unpack already supplied everything
     */
    if( handle_info->seq_num >= 0 &&
        NULL != handle_info->global_ref_name &&
        NULL != handle_info->location_fmt ) {
        handle_info->status = SSTORE_LOCAL_READY;
        return ORTE_SUCCESS;
    }

    /*
     * Build the request: command followed by the handle id
     */
    request = OBJ_NEW(opal_buffer_t);

    rc = opal_dss.pack(request, &cmd_flag, 1, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = opal_dss.pack(request, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, request,
                                 ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /* The send callback releases the buffer from here on */
    request = NULL;

 cleanup:
    if( NULL != request ) {
        OBJ_RELEASE(request);
        request = NULL;
    }

    return exit_status;
}
1695
/*
 * Send the state of every application record on this handle to the HNP.
 *
 * Message layout: PUSH command, handle id, number of application records,
 * then per record: process name, 'checkpoint skipped' flag and - only when
 * the checkpoint was not skipped - the CRS component name, followed by the
 * compress component name and postfix when compression is enabled.
 *
 * Returns ORTE_SUCCESS, or the error code of the failing pack or send.
 */
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
{
    int rc, exit_status = ORTE_SUCCESS;
    size_t n_apps;
    opal_list_item_t *cur = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app = NULL;
    orte_sstore_stage_cmd_flag_t cmd_flag = ORTE_SSTORE_STAGE_PUSH;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);

    /*
     * Header
     */
    rc = opal_dss.pack(msg, &cmd_flag, 1, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = opal_dss.pack(msg, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    n_apps = opal_list_get_size(handle_info->app_info_handle);
    rc = opal_dss.pack(msg, &n_apps, 1, OPAL_SIZE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /*
     * One section per application record
     */
    cur = opal_list_get_first(handle_info->app_info_handle);
    while( cur != opal_list_get_end(handle_info->app_info_handle) ) {
        app = (orte_sstore_stage_local_app_snapshot_info_t*)cur;

        rc = opal_dss.pack(msg, &(app->name), 1, ORTE_NAME);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        rc = opal_dss.pack(msg, &(app->ckpt_skipped), 1, OPAL_BOOL);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        /* The remaining fields only exist for checkpoints that were taken */
        if( !app->ckpt_skipped ) {
            rc = opal_dss.pack(msg, &(app->crs_comp), 1, OPAL_STRING);
            if( ORTE_SUCCESS != rc ) {
                ORTE_ERROR_LOG(rc);
                exit_status = rc;
                goto cleanup;
            }

            if( orte_sstore_stage_enabled_compression ) {
                rc = opal_dss.pack(msg, &(handle_info->compress_comp), 1, OPAL_STRING);
                if( ORTE_SUCCESS != rc ) {
                    ORTE_ERROR_LOG(rc);
                    exit_status = rc;
                    goto cleanup;
                }

                rc = opal_dss.pack(msg, &(handle_info->compress_postfix), 1, OPAL_STRING);
                if( ORTE_SUCCESS != rc ) {
                    ORTE_ERROR_LOG(rc);
                    exit_status = rc;
                    goto cleanup;
                }
            }
        }

        cur = opal_list_get_next(cur);
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, msg, ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /* The send callback releases the buffer from here on */
    msg = NULL;

 cleanup:
    if( NULL != msg ) {
        OBJ_RELEASE(msg);
        msg = NULL;
    }

    return exit_status;
}
1787
sstore_stage_create_local_dir(void)1788 static int sstore_stage_create_local_dir(void)
1789 {
1790 int ret, exit_status = ORTE_SUCCESS;
1791 mode_t my_mode = S_IRWXU;
1792
1793 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1794 ORTE_ERROR_LOG(ret);
1795 exit_status = ret;
1796 goto cleanup;
1797 }
1798
1799 cleanup:
1800 return exit_status;
1801 }
1802
sstore_stage_destroy_local_dir(void)1803 static int sstore_stage_destroy_local_dir(void)
1804 {
1805 int ret, exit_status = ORTE_SUCCESS;
1806 char * basedir_root = NULL;
1807
1808 asprintf(&basedir_root, "%s/%s",
1809 orte_sstore_stage_local_snapshot_dir,
1810 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1811
1812 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1813 ORTE_ERROR_LOG(ret);
1814 exit_status = ret;
1815 goto cleanup;
1816 }
1817
1818 cleanup:
1819 if( NULL != basedir_root ) {
1820 free(basedir_root);
1821 basedir_root = NULL;
1822 }
1823
1824 return exit_status;
1825 }
1826
sstore_stage_create_cache(void)1827 static int sstore_stage_create_cache(void)
1828 {
1829 int ret, exit_status = ORTE_SUCCESS;
1830 mode_t my_mode = S_IRWXU;
1831
1832 /* Sanity check */
1833 if( !orte_sstore_stage_enabled_caching ) {
1834 goto cleanup;
1835 }
1836
1837 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1838 ORTE_ERROR_LOG(ret);
1839 exit_status = ret;
1840 goto cleanup;
1841 }
1842
1843 cleanup:
1844 return exit_status;
1845 }
1846
sstore_stage_destroy_cache(void)1847 static int sstore_stage_destroy_cache(void)
1848 {
1849 int ret, exit_status = ORTE_SUCCESS;
1850
1851 /* Sanity check */
1852 if( !orte_sstore_stage_enabled_caching ) {
1853 goto cleanup;
1854 }
1855
1856 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1857 ORTE_ERROR_LOG(ret);
1858 exit_status = ret;
1859 goto cleanup;
1860 }
1861
1862 cleanup:
1863 return exit_status;
1864 }
1865
/*
 * Move the local snapshots of this handle into the local cache and rotate
 * the cache pointers.
 *
 * Nothing is done when caching is disabled, when the handle belongs to a
 * migration, or when the handle has no application records.
 *
 * Steps:
 *   1. create the cache directory (the parent directory of the first
 *      record's local_cache_location);
 *   2. "mv" every record's local snapshot into that directory and delete
 *      its compressed copy when compression is enabled and one exists;
 *   3. delete the directory referenced by sstore_stage_cache_last_dir;
 *   4. set 'last' to the previous 'current' and 'current' to the new
 *      cache directory.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR when the first list element is NULL, or
 * the error from opal_os_dirpath_create().
 */
static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char *cmd = NULL;
    mode_t my_mode = S_IRWXU;
    char *cache_dirname = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;
    size_t list_size;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
        goto cleanup;
    }

    list_size = opal_list_get_size(handle_info->app_info_handle);
    if( 0 >= list_size ) {
        /* No processes on this node, skip */
        exit_status = ORTE_SUCCESS;
        goto cleanup;
    }

    app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Create the base cache directory.
     * opal_dirname() returns a newly allocated string; it is released in
     * the cleanup section.
     */
    cache_dirname = opal_dirname(app_info->local_cache_location);
    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * For each process, move the current checkpoint to the cache directory
     * Cached snapshots are always stored uncompressed.
     */
    for(item  = opal_list_get_first(handle_info->app_info_handle);
        item != opal_list_get_end(handle_info->app_info_handle);
        item  = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname);
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
                             ORTE_NAME_PRINT(&app_info->name),
                             cmd));
        system(cmd);

        /* Release each command before 'cmd' is reused so nothing leaks */
        free(cmd);
        cmd = NULL;

        /* (JJH) Remove the cached files */
        if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
            asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                 cmd));
            system(cmd);

            free(cmd);
            cmd = NULL;
        }
    }

    /*
     * Remove the previous cached checkpoint
     */
    if( NULL != sstore_stage_cache_last_dir ) {
        asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir);
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
                             sstore_stage_cache_last_dir));
        system(cmd);

        free(cmd);
        cmd = NULL;
    }

    /*
     * Update 'last' cache pointer
     */
    if( NULL != sstore_stage_cache_last_dir ) {
        free(sstore_stage_cache_last_dir);
        sstore_stage_cache_last_dir = NULL;
    }
    if( NULL != sstore_stage_cache_current_dir ) {
        sstore_stage_cache_last_dir = strdup(sstore_stage_cache_current_dir);
    }

    /*
     * Update 'current' cache pointer
     */
    if( NULL != sstore_stage_cache_current_dir ) {
        free(sstore_stage_cache_current_dir);
        sstore_stage_cache_current_dir = NULL;
    }
    sstore_stage_cache_current_dir = strdup(cache_dirname);

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
                         sstore_stage_cache_current_dir, sstore_stage_cache_last_dir));

 cleanup:
    if( NULL != cmd ) {
        free(cmd);
        cmd = NULL;
    }

    /* 'current' holds its own copy, so the dirname buffer can go */
    if( NULL != cache_dirname ) {
        free(cache_dirname);
        cache_dirname = NULL;
    }

    return exit_status;
}
1977
orte_sstore_stage_local_preload_files(char ** local_location,bool * skip_xfer,char * global_loc,char * ref,char * postfix,int seq)1978 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1979 char *global_loc, char *ref, char *postfix, int seq)
1980 {
1981 int ret, exit_status = ORTE_SUCCESS;
1982 mode_t my_mode = S_IRWXU;
1983 orte_filem_base_request_t *filem_request;
1984 orte_filem_base_process_set_t *p_set = NULL;
1985 orte_filem_base_file_set_t * f_set = NULL;
1986 char * full_local_location = NULL;
1987
1988 *skip_xfer = false;
1989
1990 if( NULL != *local_location) {
1991 free(*local_location);
1992 *local_location = NULL;
1993 }
1994
1995 /*
1996 * If the global directory is shared, then just reference directly
1997 *
1998 * Skip this optimization if compressing. Since decompressing on the
1999 * central storage would typically require a transfer to the local
2000 * disk to decompress, then transfer back. Eliminating all benefits
2001 * of the optimization.
2002 */
2003 /* (JJH) If we are going to use the preloaded restart files for subsequent
2004 * restarts then we actually always want to preload the files. This
2005 * way if we need to restart from the same checkpoint again, then
2006 * we can from the local restart cache.
2007 */
2008 #if 0
2009 if( orte_sstore_stage_global_is_shared &&
2010 (NULL == postfix || 0 >= strlen(postfix) ) ) {
2011 *local_location = strdup(global_loc);
2012 *skip_xfer = true;
2013 goto cleanup;
2014 }
2015 #endif
2016
2017 asprintf(local_location, "%s/%s/%s/%d",
2018 orte_sstore_stage_local_snapshot_dir,
2019 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2020 ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2021 seq);
2022 asprintf(&full_local_location, "%s/%s",
2023 *local_location,
2024 ref);
2025
2026 /*
2027 * If the snapshot already exists locally, just reuse it instead of
2028 * transfering it again.
2029 */
2030 if( 0 == (ret = access(full_local_location, F_OK)) ) {
2031 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2032 "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2033 full_local_location));
2034 *skip_xfer = true;
2035 goto cleanup;
2036 }
2037
2038 /*
2039 * Create the local restart directory
2040 */
2041 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2042 ORTE_ERROR_LOG(ret);
2043 exit_status = ret;
2044 goto cleanup;
2045 }
2046
2047 /*
2048 * FileM request to move the checkpoint to the local directory
2049 */
2050 filem_request = OBJ_NEW(orte_filem_base_request_t);
2051
2052 /* Define the process set */
2053 p_set = OBJ_NEW(orte_filem_base_process_set_t);
2054 if( ORTE_PROC_IS_HNP ) {
2055 /* if I am the HNP, then use me as the source */
2056 p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2057 p_set->source.vpid = ORTE_PROC_MY_NAME->vpid;
2058 }
2059 else {
2060 /* otherwise, set the HNP as the source */
2061 p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2062 p_set->source.vpid = ORTE_PROC_MY_HNP->vpid;
2063 }
2064 p_set->sink.jobid = ORTE_PROC_MY_NAME->jobid;
2065 p_set->sink.vpid = ORTE_PROC_MY_NAME->vpid;
2066 opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2067
2068 /* Define the file set */
2069 f_set = OBJ_NEW(orte_filem_base_file_set_t);
2070
2071 f_set->local_target = strdup(*local_location);
2072 if( NULL != postfix && 0 < strlen(postfix) ) {
2073 asprintf(&(f_set->remote_target), "%s/%s%s",
2074 global_loc,
2075 ref,
2076 postfix);
2077 } else {
2078 asprintf(&(f_set->remote_target), "%s/%s",
2079 global_loc,
2080 ref);
2081 }
2082 if( NULL != postfix && 0 < strlen(postfix) ) {
2083 f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2084 } else {
2085 f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2086 }
2087
2088 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2089
2090 /* Start getting the files */
2091 opal_list_append(preload_filem_requests, &(filem_request->super));
2092 if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2093 ORTE_ERROR_LOG(ret);
2094 exit_status = ret;
2095 goto cleanup;
2096 }
2097
2098 cleanup:
2099 if( NULL != full_local_location ) {
2100 free(full_local_location);
2101 full_local_location = NULL;
2102 }
2103
2104 return exit_status;
2105 }
2106