1 /*
2  * Copyright (c)      2010 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * Copyright (c) 2004-2011 The University of Tennessee and The University
5  *                         of Tennessee Research Foundation.  All rights
6  *                         reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
14 /*
15  *
16  */
17 
18 #include "orte_config.h"
19 
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif  /* HAVE_UNISTD_H */
28 
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31 
32 #include "opal/mca/event/event.h"
33 
34 #include "orte/constants.h"
35 #include "orte/util/show_help.h"
36 #include "opal/util/argv.h"
37 #include "opal/util/output.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41 
42 #include "opal/mca/compress/compress.h"
43 #include "opal/mca/compress/base/base.h"
44 
45 #include "opal/threads/mutex.h"
46 #include "opal/threads/condition.h"
47 
48 #include "orte/util/name_fns.h"
49 #include "orte/util/proc_info.h"
50 #include "orte/runtime/orte_globals.h"
51 #include "orte/runtime/orte_wait.h"
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/rml/rml.h"
54 #include "orte/mca/rml/rml_types.h"
55 #include "orte/mca/odls/odls_types.h"
56 #include "orte/mca/filem/filem.h"
57 #include "orte/mca/filem/base/base.h"
58 
59 #include "orte/mca/sstore/sstore.h"
60 #include "orte/mca/sstore/base/base.h"
61 
62 #include "sstore_stage.h"
63 
64 /**********
65  * Object stuff
66  **********/
67 #define SSTORE_LOCAL_NONE   0
68 #define SSTORE_LOCAL_ERROR  1
69 #define SSTORE_LOCAL_INIT   2
70 #define SSTORE_LOCAL_READY  3
71 #define SSTORE_LOCAL_SYNCED 4
72 #define SSTORE_LOCAL_DONE   5
73 
74 struct  orte_sstore_stage_local_snapshot_info_t {
75     /** List super object */
76     opal_list_item_t super;
77 
78     /** */
79     orte_sstore_base_handle_t id;
80 
81     /** Status */
82     int status;
83 
84     /** Sequence Number */
85     int seq_num;
86 
87     /** Global Reference Name */
88     char * global_ref_name;
89 
90     /** Local Location Format String */
91     char * location_fmt;
92 
93     /** Local Cache Location Format String */
94     char * cache_location_fmt;
95 
96     /* Application info handles*/
97     opal_list_t *app_info_handle;
98 
99     /** Compress Component used */
100     char * compress_comp;
101 
102     /** Compress Component postfix */
103     char * compress_postfix;
104 
105     /** Is this checkpoint representing a migration? */
106     bool migrating;
107 };
108 typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
109 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);
110 
111 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
112 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);
113 
114 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
115                    opal_list_item_t,
116                    orte_sstore_stage_local_snapshot_info_construct,
117                    orte_sstore_stage_local_snapshot_info_destruct);
118 
119 struct  orte_sstore_stage_local_app_snapshot_info_t {
120     /** List super object */
121     opal_list_item_t super;
122 
123     /** Process Name associated with this entry */
124     orte_process_name_t name;
125 
126     /** Local Location (Absolute Path) */
127     char * local_location;
128 
129     /** Compressed Local Location (Absolute Path) */
130     char * compressed_local_location;
131 
132     /** Local Cache Location (Absolute Path) */
133     char * local_cache_location;
134 
135     /** Metadata File Name (Absolute Path) */
136     char * metadata_filename;
137 
138     /** CRS Component used */
139     char * crs_comp;
140 
141     /** If this app. skipped the checkpoint - usually for non-migrating procs */
142     bool ckpt_skipped;
143 
144     /** Compression PID to wait on */
145     pid_t compress_pid;
146 };
147 typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
148 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);
149 
150 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
151 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);
152 
153 OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
154                    opal_list_item_t,
155                    orte_sstore_stage_local_app_snapshot_info_construct,
156                    orte_sstore_stage_local_app_snapshot_info_destruct);
157 
158 
159 
160 /**********
161  * Local Function and Variable Declarations
162  **********/
163 static bool is_global_listener_active = false;
164 static int sstore_stage_local_start_listener(void);
165 static int sstore_stage_local_stop_listener(void);
166 static void sstore_stage_local_recv(int status,
167                                       orte_process_name_t* sender,
168                                       opal_buffer_t* buffer,
169                                       orte_rml_tag_t tag,
170                                       void* cbdata);
171 
172 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
173 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
174 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
175 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
176 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
177 
178 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
179 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
180 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);
181 
182 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
183                                       orte_process_name_t *name);
184 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
185                                                                            orte_process_name_t *name);
186 
187 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
188 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
189 
190 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);
191 
192 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
193                              orte_sstore_stage_local_app_snapshot_info_t *app_info);
194 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
195 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);
196 
197 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
198                                                  char *global_loc, char *ref, char *postfix, int seq);
199 
200 static int sstore_stage_create_local_dir(void);
201 static int sstore_stage_destroy_local_dir(void);
202 
203 static int sstore_stage_create_cache(void);
204 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
205 static int sstore_stage_destroy_cache(void);
206 
207 static opal_list_t *active_handles = NULL;
208 static char * sstore_stage_local_basedir = NULL;
209 
210 static char * sstore_stage_cache_basedir = NULL;
211 
212 static char * sstore_stage_cache_current_dir = NULL;
213 static char * sstore_stage_cache_last_dir    = NULL;
214 
215 static opal_list_t * preload_filem_requests = NULL;
216 
217 /**********
218  * Object stuff
219  **********/
orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t * info)220 void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
221 {
222     info->id      = 0;
223 
224     info->status = SSTORE_LOCAL_NONE;
225 
226     info->seq_num = -1;
227 
228     info->global_ref_name = NULL;
229 
230     info->location_fmt    = NULL;
231 
232     info->cache_location_fmt    = NULL;
233 
234     info->app_info_handle = OBJ_NEW(opal_list_t);
235 
236     info->compress_comp = NULL;
237 
238     info->compress_postfix = NULL;
239 
240     info->migrating = false;
241 }
242 
orte_sstore_stage_local_snapshot_info_destruct(orte_sstore_stage_local_snapshot_info_t * info)243 void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
244 {
245     info->id      = 0;
246 
247     info->status = SSTORE_LOCAL_NONE;
248 
249     info->seq_num = -1;
250 
251     if( NULL != info->global_ref_name ) {
252         free( info->global_ref_name );
253         info->global_ref_name  = NULL;
254     }
255 
256     if( NULL != info->location_fmt ) {
257         free( info->location_fmt );
258         info->location_fmt = NULL;
259     }
260 
261     if( NULL != info->cache_location_fmt ) {
262         free( info->cache_location_fmt );
263         info->cache_location_fmt = NULL;
264     }
265 
266     if( NULL != info->app_info_handle ) {
267         OBJ_RELEASE(info->app_info_handle);
268         info->app_info_handle = NULL;
269     }
270 
271     if( NULL != info->compress_comp ) {
272         free(info->compress_comp);
273         info->compress_comp = NULL;
274     }
275 
276     if( NULL != info->compress_postfix ) {
277         free(info->compress_postfix);
278         info->compress_postfix = NULL;
279     }
280 
281     info->migrating = false;
282 }
283 
orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t * info)284 void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
285 {
286     info->name.jobid = ORTE_JOBID_INVALID;
287     info->name.vpid  = ORTE_VPID_INVALID;
288 
289     info->local_location = NULL;
290     info->compressed_local_location = NULL;
291     info->local_cache_location = NULL;
292     info->metadata_filename = NULL;
293     info->crs_comp = NULL;
294     info->ckpt_skipped = false;
295     info->compress_pid = 0;
296 }
297 
orte_sstore_stage_local_app_snapshot_info_destruct(orte_sstore_stage_local_app_snapshot_info_t * info)298 void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
299 {
300     info->name.jobid = ORTE_JOBID_INVALID;
301     info->name.vpid  = ORTE_VPID_INVALID;
302 
303     if( NULL != info->local_location ) {
304         free(info->local_location);
305         info->local_location = NULL;
306     }
307 
308     if( NULL != info->compressed_local_location ) {
309         free(info->compressed_local_location);
310         info->compressed_local_location = NULL;
311     }
312 
313     if( NULL != info->local_cache_location ) {
314         free(info->local_cache_location);
315         info->local_cache_location = NULL;
316     }
317 
318     if( NULL != info->metadata_filename ) {
319         free(info->metadata_filename);
320         info->metadata_filename = NULL;
321     }
322 
323     if( NULL != info->crs_comp ) {
324         free(info->crs_comp);
325         info->crs_comp = NULL;
326     }
327 
328     info->ckpt_skipped = false;
329 
330     info->compress_pid = 0;
331 }
332 
333 /******************
334  * Local functions
335  ******************/
orte_sstore_stage_local_module_init(void)336 int orte_sstore_stage_local_module_init(void)
337 {
338     int ret, exit_status = ORTE_SUCCESS;
339 
340     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
341                          "sstore:stage:(local): init()"));
342 
343     if( NULL == active_handles ) {
344         active_handles = OBJ_NEW(opal_list_t);
345     }
346 
347     if( NULL == preload_filem_requests ) {
348         preload_filem_requests = OBJ_NEW(opal_list_t);
349     }
350 
351     /*
352      * Create the local storage directory
353      */
354     asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
355              orte_sstore_stage_local_snapshot_dir,
356              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
357              ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
358     if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
359         ORTE_ERROR_LOG(ret);
360         exit_status = ret;
361         goto cleanup;
362     }
363 
364     /*
365      * Create the local cache
366      */
367     if( orte_sstore_stage_enabled_caching ) {
368         asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
369                  orte_sstore_stage_local_snapshot_dir,
370                  ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
371                  ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
372 
373         if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
374             ORTE_ERROR_LOG(ret);
375             exit_status = ret;
376             goto cleanup;
377         }
378     }
379 
380     /*
381      * Setup a listener for the HNP/Apps
382      * We could be the HNP, in which case the listener is already registered.
383      */
384     if( !ORTE_PROC_IS_HNP ) {
385         if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
386             ORTE_ERROR_LOG(ret);
387             exit_status = ret;
388             goto cleanup;
389         }
390     }
391 
392  cleanup:
393     return exit_status;
394 }
395 
orte_sstore_stage_local_module_finalize(void)396 int orte_sstore_stage_local_module_finalize(void)
397 {
398     int ret, exit_status = ORTE_SUCCESS;
399     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
400     opal_list_item_t* item = NULL;
401     bool done = false;
402     int cur_time = 0, max_time = 120;
403 
404     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
405                          "sstore:stage:(local): finalize()"));
406 
407     /*
408      * Wait for all active transfers to finish
409      */
410     if( !ORTE_PROC_IS_HNP ) {
411         done = false;
412         while( 0 < opal_list_get_size(active_handles) && !done ) {
413             done = true;
414             for(item  = opal_list_get_first(active_handles);
415                 item != opal_list_get_end(active_handles);
416                 item  = opal_list_get_next(item) ) {
417                 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
418                 if( SSTORE_LOCAL_DONE  != handle_info->status &&
419                     SSTORE_LOCAL_NONE  != handle_info->status &&
420                     SSTORE_LOCAL_ERROR != handle_info->status ) {
421                     done = false;
422                     break;
423                 }
424             }
425             if( done ) {
426                 break;
427             }
428             else {
429                 if( cur_time != 0 && cur_time % 30 == 0 ) {
430                     opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
431                                 cur_time, max_time);
432                 }
433 
434                 opal_progress();
435                 if( cur_time >= max_time ) {
436                     break;
437                 } else {
438                     sleep(1);
439                 }
440                 cur_time++;
441             }
442         }
443     }
444 
445     if( NULL != active_handles ) {
446         OBJ_RELEASE(active_handles);
447     }
448 
449     if( NULL != preload_filem_requests ) {
450         OBJ_RELEASE(preload_filem_requests);
451     }
452 
453     /*
454      * Shutdown the listener for the HNP/Apps
455      * We could be the HNP, in which case the listener is already deregistered.
456      */
457     if( !ORTE_PROC_IS_HNP ) {
458         if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
459             ORTE_ERROR_LOG(ret);
460             exit_status = ret;
461             goto cleanup;
462         }
463     }
464 
465     /*
466      * Destroy the local cache
467      */
468     if( orte_sstore_stage_enabled_caching ) {
469         if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
470             ORTE_ERROR_LOG(ret);
471             exit_status = ret;
472             goto cleanup;
473         }
474     }
475 
476     /*
477      * Destroy the local storage directory
478      */
479     if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
480         ORTE_ERROR_LOG(ret);
481         exit_status = ret;
482         goto cleanup;
483     }
484 
485  cleanup:
486     if( orte_sstore_stage_enabled_caching ) {
487         if( NULL != sstore_stage_cache_basedir ) {
488             free(sstore_stage_cache_basedir);
489             sstore_stage_cache_basedir = NULL;
490         }
491     }
492 
493     if( NULL != sstore_stage_local_basedir ) {
494         free(sstore_stage_local_basedir);
495         sstore_stage_local_basedir = NULL;
496     }
497 
498     return exit_status;
499 }
500 
orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t * handle,int seq,orte_jobid_t jobid)501 int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
502 {
503     opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");
504     return ORTE_ERR_NOT_IMPLEMENTED;
505 }
506 
orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)507 int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
508 {
509     int ret, exit_status = ORTE_SUCCESS;
510     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
511 
512     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
513                          "sstore:stage:(local): register()"));
514 
515     /*
516      * Create a handle
517      */
518     if( NULL == (handle_info = find_handle_info(handle)) ) {
519         handle_info = create_new_handle_info(handle);
520     }
521 
522     /*
523      * Get basic information from Global SStore
524      */
525     if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
526         ORTE_ERROR_LOG(ret);
527         exit_status = ret;
528         goto cleanup;
529     }
530 
531     /*
532      * Wait here until the pull request has been satisfied
533      */
534     while(SSTORE_LOCAL_READY != handle_info->status &&
535           SSTORE_LOCAL_ERROR != handle_info->status ) {
536         opal_progress();
537     }
538 
539  cleanup:
540     return exit_status;
541 }
542 
orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle,orte_sstore_base_key_t key,char ** value)543 int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char **value)
544 {
545     opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");
546     return ORTE_ERR_NOT_IMPLEMENTED;
547 }
548 
orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle,orte_sstore_base_key_t key,char * value)549 int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char *value)
550 {
551     opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");
552     return ORTE_ERR_NOT_IMPLEMENTED;
553 }
554 
orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)555 int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
556 {
557     int ret, exit_status = ORTE_SUCCESS;
558     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
559 
560     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
561                          "sstore:stage:(local): sync()"));
562 
563     /*
564      * Lookup the handle
565      */
566     handle_info = find_handle_info(handle);
567 
568     /*
569      * Wait for all of the applications to update their metadata
570      */
571     if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
572         ORTE_ERROR_LOG(ret);
573         exit_status = ret;
574         goto cleanup;
575     }
576 
577     /*
578      * Wait for compression to finish
579      */
580     if( orte_sstore_stage_enabled_compression ) {
581         if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
582             ORTE_ERROR_LOG(ret);
583             exit_status = ret;
584             goto cleanup;
585         }
586     }
587 
588     /*
589      * Push information to the Global coordinator
590      */
591     if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
592         ORTE_ERROR_LOG(ret);
593         exit_status = ret;
594         goto cleanup;
595     }
596 
597     handle_info->status = SSTORE_LOCAL_SYNCED;
598 
599  cleanup:
600     return exit_status;
601 }
602 
orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)603 int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
604 {
605     opal_output(0, "sstore:stage:(local): remove() Not implemented!");
606     return ORTE_ERR_NOT_IMPLEMENTED;
607 }
608 
orte_sstore_stage_local_pack(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_base_handle_t handle)609 int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
610 {
611     int ret, exit_status = ORTE_SUCCESS;
612 
613     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
614                          "sstore:stage:(local): pack()"));
615 
616     /*
617      * Lookup the handle
618      */
619 
620 
621     /*
622      * Pack the handle ID
623      */
624     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
625         ORTE_ERROR_LOG(ret);
626         exit_status = ret;
627         goto cleanup;
628     }
629 
630     /*
631      * Pack any metadata
632      */
633 
634  cleanup:
635     return exit_status;
636 }
637 
orte_sstore_stage_local_unpack(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_base_handle_t * handle)638 int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
639 {
640     int ret, exit_status = ORTE_SUCCESS;
641     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
642     orte_std_cntr_t count;
643 
644     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
645                          "sstore:stage:(local): unpack()"));
646 
647     /*
648      * Unpack the handle id
649      */
650     count = 1;
651     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, handle, &count, ORTE_SSTORE_HANDLE))) {
652         ORTE_ERROR_LOG(ret);
653         exit_status = ret;
654         goto cleanup;
655     }
656 
657     /*
658      * Lookup the handle
659      */
660     if( NULL == (handle_info = find_handle_info(*handle)) ) {
661         handle_info = create_new_handle_info(*handle);
662     }
663 
664     /*
665      * Unpack the metadata piggybacked on this message
666      */
667     if( ORTE_SUCCESS != (ret = process_global_push(peer, buffer, handle_info))) {
668         ORTE_ERROR_LOG(ret);
669         exit_status = ret;
670         goto cleanup;
671     }
672 
673  cleanup:
674     return exit_status;
675 }
676 
orte_sstore_stage_local_fetch_app_deps(orte_app_context_t * app)677 int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
678 {
679     int ret, exit_status = ORTE_SUCCESS;
680     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
681     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
682     char **sstore_args = NULL;
683     char * req_snap_loc = NULL;
684     char * req_snap_global_ref = NULL;
685     char * req_snap_ref = NULL;
686     char * req_snap_postfix = NULL;
687     char * local_location = NULL;
688     char * req_snap_compress = NULL;
689     char * compress_local_location = NULL;
690     char * compress_ref = NULL;
691     char * tmp_str = NULL;
692     int req_snap_seq = 0;
693     int i;
694     orte_proc_t *child = NULL;
695     int loc_argc = 0;
696     bool skip_xfer = false;
697     char *sload = NULL;
698 
699     orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);
700 
701     if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
702         OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
703                              "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
704                              app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
705                              (int)app->num_procs, sload));
706         /* Nothing to do */
707         goto cleanup;
708     }
709 
710     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
711                          "sstore:stage:(local): fetch_app_deps(%3d): %s",
712                          app->idx, sload));
713 
714     /*
715      * Extract the 'ref:seq' parameter
716      */
717     sstore_args = opal_argv_split(sload, ':');
718     req_snap_loc        = strdup(sstore_args[0]);
719     req_snap_global_ref = strdup(sstore_args[1]);
720     req_snap_ref        = strdup(sstore_args[2]);
721     if( NULL == sstore_args[4] ) { /* Not compressed */
722         req_snap_seq        = atoi(  sstore_args[3]);
723     } else {
724         req_snap_compress   = strdup(sstore_args[3]);
725         req_snap_postfix    = strdup(sstore_args[4]);
726         req_snap_seq        = atoi(  sstore_args[5]);
727     }
728 
729     handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
730     if( NULL == handle_info ) {
731         /* No checkpoints known, just preload the checkpoint */
732         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
733                              "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
734                              app->idx,
735                              req_snap_ref,
736                              req_snap_seq));
737         goto filem_preload;
738     }
739 
740     /*
741      * If caching enabled, then look to see if we have this snapshot cached
742      * Do not cache if migrating, since checkpoints taken while migrating are
743      * not guaranteed to be globally taken.
744      */
745     if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
746         /*
747          * Find the process
748          */
749         for (i=0; i < orte_local_children->size; i++) {
750             if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
751                 continue;
752             }
753 
754             if( app->idx == child->app_idx ) {
755                 /*
756                  * Find the app snapshot ref
757                  */
758                 app_info = find_app_handle_info(handle_info, &child->name);
759                 break;
760             }
761         }
762 
763         if( NULL == app_info ) {
764             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
765                                  "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
766                                  app->idx));
767             goto filem_preload;
768         }
769 
770         /*
771          * Do we have a cached version of this file?
772          */
773         if( NULL != app_info->local_cache_location &&
774             0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
775             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
776                                  "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
777                                  app->idx,
778                                  app_info->local_cache_location));
779 
780             opal_argv_append(&loc_argc, &(app->argv), "-c");
781             opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
782             goto cleanup;
783         } else {
784             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
785                                  "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
786                                  app->idx,
787                                  ORTE_NAME_PRINT(&app_info->name),
788                                  app_info->local_cache_location));
789         }
790     }
791 
792  filem_preload:
793     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
794                          "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
795                          app->idx));
796 
797     /*
798      * If we got here, then there is no cached directory, so just preload the
799      * files, update the argument set, and carry on.
800      */
801     if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
802                                                                      &skip_xfer,
803                                                                      req_snap_loc,
804                                                                      req_snap_ref,
805                                                                      req_snap_postfix,
806                                                                      req_snap_seq)) ) {
807         ORTE_ERROR_LOG(ret);
808         exit_status = ret;
809         goto cleanup;
810     }
811     opal_argv_append(&loc_argc, &(app->argv), "-l");
812     opal_argv_append(&loc_argc, &(app->argv), local_location);
813 
814     /*
815      * Decompress files:
816      *  opal-restart will do this for us on launch
817      */
818     if( !skip_xfer ) {
819         if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
820             opal_argv_append(&loc_argc, &(app->argv), "-d");
821             opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
822         }
823         if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
824             opal_argv_append(&loc_argc, &(app->argv), "-p");
825             opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
826         }
827     }
828 
829     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
830                          "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
831                          app->idx,
832                          local_location));
833 
834  cleanup:
835     if( NULL != req_snap_compress ) {
836         free(req_snap_compress);
837         req_snap_compress = NULL;
838     }
839 
840     if( NULL != tmp_str ) {
841         free(tmp_str);
842         tmp_str = NULL;
843     }
844 
845     if( NULL != compress_local_location ) {
846         free(compress_local_location);
847         compress_local_location = NULL;
848     }
849 
850     if( NULL != compress_ref ) {
851         free(compress_ref);
852         compress_ref = NULL;
853     }
854 
855     if( NULL != sstore_args ) {
856         opal_argv_free(sstore_args);
857         sstore_args = NULL;
858     }
859 
860     if( NULL != req_snap_ref ) {
861         free(req_snap_ref);
862         req_snap_ref = NULL;
863     }
864 
865     if( NULL != req_snap_postfix ) {
866         free(req_snap_postfix);
867         req_snap_postfix = NULL;
868     }
869 
870     if( NULL != req_snap_loc ) {
871         free(req_snap_loc);
872         req_snap_loc = NULL;
873     }
874 
875     if( NULL != req_snap_global_ref ) {
876         free(req_snap_global_ref);
877         req_snap_global_ref = NULL;
878     }
879 
880     return exit_status;
881 }
882 
orte_sstore_stage_local_wait_all_deps(void)883 int orte_sstore_stage_local_wait_all_deps(void)
884 {
885     int ret, exit_status = ORTE_SUCCESS;
886     opal_list_item_t* item = NULL;
887 
888     /* Nothing being preloaded, so just move on */
889     if( 0 >= opal_list_get_size(preload_filem_requests) ) {
890         return ORTE_SUCCESS;
891     }
892 
893     /*
894      * Wait for all files to move
895      */
896     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
897                          "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
898                          (int)opal_list_get_size(preload_filem_requests)));
899 
900     if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
901         ORTE_ERROR_LOG(ret);
902         exit_status = ret;
903         goto cleanup;
904     }
905 
906     /*
907      * Cache the restart files locally, so we can restart faster next time
908      * JJH: We already check the restart directory for a local copy before
909      *      starting the transfer. So this feels unnecessary since the
910      *      restart directory is always used as a cache, whether or not
911      *      caching is enabled. The extra copy to the cache directory
912      *      does not buy us anything.
913      */
914 
915     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
916                          "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
917                          (int)opal_list_get_size(preload_filem_requests)));
918 
919  cleanup:
920     while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
921         OBJ_RELEASE(item);
922     }
923 
924     return exit_status;
925 }
926 
927 /**************************
928  * Local functions
929  **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)930 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
931 {
932     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
933     int i;
934     orte_proc_t *child = NULL;
935 
936     if( NULL == active_handles ) {
937         active_handles = OBJ_NEW(opal_list_t);
938     }
939 
940     handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
941 
942     handle_info->id = handle;
943 
944     opal_list_append(active_handles, &(handle_info->super));
945 
946     /*
947      * Create a sub structure for each child
948      */
949     for (i=0; i < orte_local_children->size; i++) {
950 	    if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
951             continue;
952         }
953         append_new_app_handle_info(handle_info, &child->name);
954     }
955 
956     handle_info->status = SSTORE_LOCAL_INIT;
957 
958     return handle_info;
959 }
960 
find_handle_info(orte_sstore_base_handle_t handle)961 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
962 {
963     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
964     opal_list_item_t* item = NULL;
965 
966     if( NULL == active_handles ) {
967         return NULL;
968     }
969 
970     for(item  = opal_list_get_first(active_handles);
971         item != opal_list_get_end(active_handles);
972         item  = opal_list_get_next(item) ) {
973         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
974 
975         if( handle_info->id == handle ) {
976             return handle_info;
977         }
978     }
979 
980     return NULL;
981 }
982 
find_handle_info_ref(char * ref,int seq)983 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
984 {
985     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
986     opal_list_item_t* item = NULL;
987 
988     if( NULL == active_handles ) {
989         return NULL;
990     }
991 
992     for(item  = opal_list_get_first(active_handles);
993         item != opal_list_get_end(active_handles);
994         item  = opal_list_get_next(item) ) {
995         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
996 
997         if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
998             handle_info->seq_num == seq ) {
999             return handle_info;
1000         }
1001     }
1002 
1003     return NULL;
1004 }
1005 
append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1006 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1007                                       orte_process_name_t *name)
1008 {
1009     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1010 
1011     app_info = OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);
1012 
1013     app_info->name.jobid = name->jobid;
1014     app_info->name.vpid  = name->vpid;
1015 
1016     opal_list_append(handle_info->app_info_handle, &(app_info->super));
1017 
1018     return ORTE_SUCCESS;
1019 }
1020 
find_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1021 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1022                                                                            orte_process_name_t *name)
1023 {
1024     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1025     opal_list_item_t* item = NULL;
1026     orte_ns_cmp_bitmask_t mask;
1027 
1028     for(item  = opal_list_get_first(handle_info->app_info_handle);
1029         item != opal_list_get_end(handle_info->app_info_handle);
1030         item  = opal_list_get_next(item) ) {
1031         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1032 
1033         mask = ORTE_NS_CMP_ALL;
1034 
1035         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1036             return app_info;
1037         }
1038     }
1039 
1040     return NULL;
1041 }
1042 
sstore_stage_local_start_listener(void)1043 static int sstore_stage_local_start_listener(void)
1044 {
1045     if( is_global_listener_active ) {
1046         return ORTE_SUCCESS;
1047     }
1048 
1049     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1050                             ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1051 
1052     is_global_listener_active = true;
1053     return ORTE_SUCCESS;
1054 }
1055 
sstore_stage_local_stop_listener(void)1056 static int sstore_stage_local_stop_listener(void)
1057 {
1058     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1059     is_global_listener_active = false;
1060     return ORTE_SUCCESS;
1061 }
1062 
sstore_stage_local_recv(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)1063 static void sstore_stage_local_recv(int status,
1064                                     orte_process_name_t* sender,
1065                                     opal_buffer_t* buffer,
1066                                     orte_rml_tag_t tag,
1067                                     void* cbdata)
1068 {
1069     int ret;
1070     orte_sstore_stage_cmd_flag_t command;
1071     orte_std_cntr_t count;
1072     orte_sstore_base_handle_t loc_id;
1073 
1074     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1075         return;
1076     }
1077 
1078     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1079                          "sstore:stage:(local): process_cmd(%s)",
1080                          ORTE_NAME_PRINT(sender)));
1081 
1082     count = 1;
1083     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1084         ORTE_ERROR_LOG(ret);
1085         goto cleanup;
1086     }
1087 
1088     count = 1;
1089     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1090         ORTE_ERROR_LOG(ret);
1091         goto cleanup;
1092     }
1093 
1094     orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1095 
1096  cleanup:
1097     return;
1098 }
1099 
orte_sstore_stage_local_process_cmd_action(orte_process_name_t * sender,orte_sstore_stage_cmd_flag_t command,orte_sstore_base_handle_t loc_id,opal_buffer_t * buffer)1100 int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
1101                                                orte_sstore_stage_cmd_flag_t command,
1102                                                orte_sstore_base_handle_t loc_id,
1103                                                opal_buffer_t* buffer)
1104 {
1105     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
1106 
1107     /*
1108      * Find the referenced handle (Create if it does not exist)
1109      */
1110     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1111         handle_info = create_new_handle_info(loc_id);
1112     }
1113 
1114     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1115                          "sstore:stage:(local): process_cmd(%s) - Command = %s",
1116                          ORTE_NAME_PRINT(sender),
1117                          (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1118                           (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1119                            (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1120                             (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1121 
1122     /*
1123      * Process the command
1124      */
1125     if( ORTE_SSTORE_STAGE_PULL == command ) {
1126         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1127             process_global_pull(sender, buffer, handle_info);
1128         } else {
1129             process_app_pull(sender, buffer, handle_info);
1130         }
1131     }
1132     else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1133         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1134             process_global_push(sender, buffer, handle_info);
1135         } else {
1136             process_app_push(sender, buffer, handle_info);
1137         }
1138     }
1139     else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1140         /* The xcast from the root makes the 'sender' equal to this process :/
1141          * so we know it is the HNP, so just use that name */
1142         process_global_remove(ORTE_PROC_MY_HNP, buffer, handle_info);
1143     }
1144 
1145     return ORTE_SUCCESS;
1146 }
1147 
process_global_pull(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1148 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1149 {
1150     /* JJH should be as simple as calling push_handle_info() */
1151     opal_output(0, "sstore:stage:(local): process_global_pull() Not implemented!");
1152     return ORTE_ERR_NOT_IMPLEMENTED;
1153 }
1154 
process_global_push(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1155 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1156 {
1157     int ret, exit_status = ORTE_SUCCESS;
1158     orte_std_cntr_t count;
1159     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1160     opal_list_item_t* item = NULL;
1161 
1162     count = 1;
1163     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
1164         ORTE_ERROR_LOG(ret);
1165         exit_status = ret;
1166         goto cleanup;
1167     }
1168 
1169     count = 1;
1170     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
1171         ORTE_ERROR_LOG(ret);
1172         exit_status = ret;
1173         goto cleanup;
1174     }
1175 
1176     count = 1;
1177     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
1178         ORTE_ERROR_LOG(ret);
1179         exit_status = ret;
1180         goto cleanup;
1181     }
1182 
1183     if( orte_sstore_stage_enabled_caching ) {
1184         count = 1;
1185         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
1186             ORTE_ERROR_LOG(ret);
1187             exit_status = ret;
1188             goto cleanup;
1189         }
1190     }
1191 
1192     count = 1;
1193     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
1194         ORTE_ERROR_LOG(ret);
1195         exit_status = ret;
1196         goto cleanup;
1197     }
1198 
1199     /*
1200      * For each process we are working with
1201      */
1202     for(item  = opal_list_get_first(handle_info->app_info_handle);
1203         item != opal_list_get_end(handle_info->app_info_handle);
1204         item  = opal_list_get_next(item) ) {
1205         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1206 
1207         if( NULL != app_info->local_location ) {
1208             free(app_info->local_location);
1209             app_info->local_location = NULL;
1210         }
1211         asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
1212 
1213         if( orte_sstore_stage_enabled_caching ) {
1214             if( NULL != app_info->local_cache_location ) {
1215                 free(app_info->local_cache_location);
1216                 app_info->local_cache_location = NULL;
1217             }
1218             asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid);
1219         }
1220 
1221         if( NULL != app_info->metadata_filename ) {
1222             free(app_info->metadata_filename);
1223             app_info->metadata_filename = NULL;
1224         }
1225         asprintf(&(app_info->metadata_filename), "%s/%s",
1226                  app_info->local_location,
1227                  orte_sstore_base_local_metadata_filename);
1228     }
1229 
1230  cleanup:
1231     if( ORTE_SUCCESS == exit_status ) {
1232         handle_info->status = SSTORE_LOCAL_READY;
1233     } else {
1234         handle_info->status = SSTORE_LOCAL_ERROR;
1235     }
1236 
1237     return exit_status;
1238 }
1239 
process_global_remove(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1240 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1241 {
1242     int ret, exit_status = ORTE_SUCCESS;
1243     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1244     opal_list_item_t* item = NULL;
1245     opal_buffer_t *loc_buffer = NULL;
1246     orte_sstore_stage_cmd_flag_t command;
1247     size_t list_size;
1248     char * cmd = NULL;
1249 
1250     /*
1251      * If not caching, then just remove the local copy
1252      * Or if migrating, since we do not cache checkpoints generated while
1253      * migrating.
1254      */
1255     if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
1256         for(item  = opal_list_get_first(handle_info->app_info_handle);
1257             item != opal_list_get_end(handle_info->app_info_handle);
1258             item  = opal_list_get_next(item) ) {
1259             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1260 
1261             asprintf(&cmd, "rm -rf %s", app_info->local_location);
1262             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1263                                  "sstore:stage:(local): update_cache(): Removing with command (%s)",
1264                                  cmd));
1265             system(cmd);
1266 
1267             if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1268                 free(cmd);
1269                 cmd = NULL;
1270 
1271                 asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1272                 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1273                                      "sstore:stage:(local): update_cache(): Removing with command (%s)",
1274                                      cmd));
1275                 system(cmd);
1276             }
1277         }
1278     }
1279     else {
1280           /*
1281            * Update the local cache
1282            */
1283           if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
1284               ORTE_ERROR_LOG(ret);
1285               exit_status = ret;
1286               goto cleanup;
1287           }
1288     }
1289 
1290     loc_buffer = OBJ_NEW(opal_buffer_t);
1291 
1292     command = ORTE_SSTORE_STAGE_DONE;
1293     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1294         ORTE_ERROR_LOG(ret);
1295         exit_status = ret;
1296         goto cleanup;
1297     }
1298 
1299     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1300         ORTE_ERROR_LOG(ret);
1301         exit_status = ret;
1302         goto cleanup;
1303     }
1304 
1305     list_size = opal_list_get_size(handle_info->app_info_handle);
1306     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
1307         ORTE_ERROR_LOG(ret);
1308         exit_status = ret;
1309         goto cleanup;
1310     }
1311 
1312     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1313                                                        orte_rml_send_callback, NULL))) {
1314         ORTE_ERROR_LOG(ret);
1315         exit_status = ret;
1316         goto cleanup;
1317     }
1318 
1319     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1320                          "sstore:stage:(local): remove(): Sent done for %d files to %s",
1321                          (int)list_size,
1322                          ORTE_NAME_PRINT(peer)));
1323 
1324     handle_info->status = SSTORE_LOCAL_DONE;
1325     /* loc_buffer should not be released here; the callback releases it */
1326     loc_buffer = NULL;
1327 
1328  cleanup:
1329     if( NULL != cmd ) {
1330         free(cmd);
1331         cmd = NULL;
1332     }
1333 
1334     if (NULL != loc_buffer) {
1335         OBJ_RELEASE(loc_buffer);
1336         loc_buffer = NULL;
1337     }
1338 
1339     return exit_status;
1340 }
1341 
process_app_pull(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1342 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1343 {
1344     int ret, exit_status = ORTE_SUCCESS;
1345     opal_buffer_t *loc_buffer = NULL;
1346     orte_sstore_stage_cmd_flag_t command;
1347     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1348 
1349     /*
1350      * Find this app's data
1351      */
1352     app_info = find_app_handle_info(handle_info, peer);
1353 
1354     /*
1355      * Push back the requested information
1356      */
1357     loc_buffer = OBJ_NEW(opal_buffer_t);
1358 
1359     command = ORTE_SSTORE_STAGE_PUSH;
1360     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1361         ORTE_ERROR_LOG(ret);
1362         exit_status = ret;
1363         goto cleanup;
1364     }
1365 
1366     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1367         ORTE_ERROR_LOG(ret);
1368         exit_status = ret;
1369         goto cleanup;
1370     }
1371 
1372     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1373         ORTE_ERROR_LOG(ret);
1374         exit_status = ret;
1375         goto cleanup;
1376     }
1377 
1378     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
1379         ORTE_ERROR_LOG(ret);
1380         exit_status = ret;
1381         goto cleanup;
1382     }
1383 
1384     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
1385         ORTE_ERROR_LOG(ret);
1386         exit_status = ret;
1387         goto cleanup;
1388     }
1389 
1390     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
1391         ORTE_ERROR_LOG(ret);
1392         exit_status = ret;
1393         goto cleanup;
1394     }
1395 
1396     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1397                                                        orte_rml_send_callback, NULL))) {
1398         ORTE_ERROR_LOG(ret);
1399         exit_status = ret;
1400         goto cleanup;
1401     }
1402 
1403     /* loc_buffer should not be released here; the callback releases it */
1404     loc_buffer = NULL;
1405 
1406  cleanup:
1407     if (NULL != loc_buffer) {
1408         OBJ_RELEASE(loc_buffer);
1409         loc_buffer = NULL;
1410     }
1411 
1412     return exit_status;
1413 }
1414 
process_app_push(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1415 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1416 {
1417     int ret, exit_status = ORTE_SUCCESS;
1418     orte_std_cntr_t count;
1419     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1420 
1421     /*
1422      * Find this app's data
1423      */
1424     app_info = find_app_handle_info(handle_info, peer);
1425 
1426     /*
1427      * Unpack the data
1428      */
1429     count = 1;
1430     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
1431         ORTE_ERROR_LOG(ret);
1432         exit_status = ret;
1433         goto cleanup;
1434     }
1435 
1436     if( !app_info->ckpt_skipped ) {
1437         count = 1;
1438         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
1439             ORTE_ERROR_LOG(ret);
1440             exit_status = ret;
1441             goto cleanup;
1442         }
1443     }
1444 
1445     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1446                          "sstore:stage:(local): app_push(%s, skip=%s, %s)",
1447                          ORTE_NAME_PRINT(&(app_info->name)),
1448                          (app_info->ckpt_skipped ? "T" : "F"),
1449                          app_info->crs_comp));
1450 
1451     /* Compression started on sync() */
1452 
1453  cleanup:
1454     return exit_status;
1455 }
1456 
wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t * handle_info)1457 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
1458 {
1459     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1460     opal_list_item_t *item = NULL;
1461     bool is_done = true;
1462 
1463     do {
1464         is_done = true;
1465         for(item  = opal_list_get_first(handle_info->app_info_handle);
1466             item != opal_list_get_end(handle_info->app_info_handle);
1467             item  = opal_list_get_next(item) ) {
1468             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1469 
1470             if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
1471                 is_done = false;
1472                 break;
1473             }
1474         }
1475 
1476         if( !is_done ) {
1477             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1478                                  "sstore:stage:(local): Waiting for appliccation %s",
1479                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1480             opal_progress();
1481         }
1482     } while(!is_done);
1483 
1484     return ORTE_SUCCESS;
1485 }
1486 
start_compression(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_sstore_stage_local_app_snapshot_info_t * app_info)1487 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
1488                              orte_sstore_stage_local_app_snapshot_info_t *app_info)
1489 {
1490     int ret, exit_status = ORTE_SUCCESS;
1491     char * postfix = NULL;
1492     orte_proc_t *proc;
1493 
1494     /* Sanity Check */
1495     if( !orte_sstore_stage_enabled_compression ) {
1496         goto cleanup;
1497     }
1498 
1499     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1500                          "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
1501                          ORTE_NAME_PRINT(&(app_info->name)),
1502                          app_info->local_location ));
1503 
1504     /*
1505      * Start compression (nonblocking)
1506      */
1507     if( ORTE_SUCCESS != (ret = opal_compress.compress_nb(app_info->local_location,
1508                                                          &(app_info->compressed_local_location),
1509                                                          &(postfix),
1510                                                          &(app_info->compress_pid))) ) {
1511         ORTE_ERROR_LOG(ret);
1512         exit_status = ret;
1513         goto cleanup;
1514     }
1515     if( app_info->compress_pid <= 0 ) {
1516         ORTE_ERROR_LOG(ORTE_ERROR);
1517         exit_status = ORTE_ERROR;
1518         goto cleanup;
1519     }
1520 
1521     if( NULL == handle_info->compress_comp ) {
1522         handle_info->compress_comp = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
1523         handle_info->compress_postfix = strdup(postfix);
1524     }
1525 
1526     /*
1527      * Setup a callback for when it is finished
1528      */
1529     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1530                          "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
1531                          app_info->compress_pid,
1532                          ORTE_NAME_PRINT(&(app_info->name)) ));
1533 
1534     proc = OBJ_NEW(orte_proc_t);
1535     proc->pid = app_info->compress_pid;
1536     /* be sure to mark it as alive so we don't instantly fire */
1537     ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
1538 
1539     orte_wait_cb(proc, sstore_stage_local_compress_waitpid_cb, app_info);
1540 
1541  cleanup:
1542     if( NULL != postfix ) {
1543         free(postfix);
1544         postfix = NULL;
1545     }
1546 
1547     return exit_status;
1548 }
1549 
sstore_stage_local_compress_waitpid_cb(orte_proc_t * proc,void * cbdata)1550 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
1551 {
1552     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1553 
1554     app_info = (orte_sstore_stage_local_app_snapshot_info_t*)cbdata;
1555 
1556     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1557                          "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
1558                          (int)proc->pid,
1559                          ORTE_NAME_PRINT(&(app_info->name)) ));
1560 
1561     app_info->compress_pid = 0;
1562     OBJ_RELEASE(proc);
1563 }
1564 
wait_all_compressed(orte_sstore_stage_local_snapshot_info_t * handle_info)1565 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
1566 {
1567     int ret, exit_status = ORTE_SUCCESS;
1568     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1569     opal_list_item_t *item = NULL;
1570     bool is_done = true;
1571     int usleep_time = 1000;
1572     int s_time = 0, max_wait_time;
1573 
1574     /* Sanity Check */
1575     if( !orte_sstore_stage_enabled_compression ) {
1576         return ORTE_SUCCESS;
1577     }
1578 
1579     /*
1580      * Start all compression
1581      */
1582     if( orte_sstore_stage_compress_delay > 0 ) {
1583         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1584                              "sstore:stage:(local): Delaying %d second before starting compression...",
1585                              orte_sstore_stage_compress_delay));
1586         max_wait_time = orte_sstore_stage_compress_delay * (1000000/usleep_time);
1587         for( s_time = 0; s_time < max_wait_time; ++s_time) {
1588             opal_progress();
1589             usleep(1000);
1590         }
1591     }
1592 
1593     for(item  = opal_list_get_first(handle_info->app_info_handle);
1594         item != opal_list_get_end(handle_info->app_info_handle);
1595         item  = opal_list_get_next(item) ) {
1596         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1597 
1598         if( ORTE_SUCCESS != (ret = start_compression(handle_info, app_info)) ) {
1599             ORTE_ERROR_LOG(ret);
1600             exit_status = ret;
1601             goto cleanup;
1602         }
1603     }
1604 
1605     /*
1606      * Wait for compression to finish
1607      */
1608     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1609                          "sstore:stage:(local): Waiting for compression to finish..."));
1610     do {
1611         is_done = true;
1612         for(item  = opal_list_get_first(handle_info->app_info_handle);
1613             item != opal_list_get_end(handle_info->app_info_handle);
1614             item  = opal_list_get_next(item) ) {
1615             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1616 
1617             if( 0 < app_info->compress_pid ) {
1618                 is_done = false;
1619                 break;
1620             }
1621         }
1622 
1623         if( !is_done ) {
1624             OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
1625                                  "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
1626                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1627             opal_progress();
1628         }
1629     } while(!is_done);
1630 
1631     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1632                          "sstore:stage:(local): Compression finished!"));
1633  cleanup:
1634     return exit_status;
1635 }
1636 
pull_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info)1637 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1638 {
1639     int ret, exit_status = ORTE_SUCCESS;
1640     opal_buffer_t *buffer = NULL;
1641     orte_sstore_stage_cmd_flag_t command;
1642 
1643     /*
1644      * Check to see if this is necessary
1645      * (Did we get all of the info from the handle unpack?)
1646      */
1647     if( 0 <= handle_info->seq_num &&
1648         NULL != handle_info->global_ref_name &&
1649         NULL != handle_info->location_fmt ) {
1650         handle_info->status = SSTORE_LOCAL_READY;
1651         return ORTE_SUCCESS;
1652     }
1653 
1654     buffer = OBJ_NEW(opal_buffer_t);
1655 
1656     /*
1657      * Ask the daemon to send us the info that we need
1658      */
1659     command = ORTE_SSTORE_STAGE_PULL;
1660     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1661         ORTE_ERROR_LOG(ret);
1662         exit_status = ret;
1663         goto cleanup;
1664     }
1665 
1666     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1667         ORTE_ERROR_LOG(ret);
1668         exit_status = ret;
1669         goto cleanup;
1670     }
1671 
1672     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1673                                                        ORTE_RML_TAG_SSTORE_INTERNAL,
1674                                                        orte_rml_send_callback, NULL))) {
1675         ORTE_ERROR_LOG(ret);
1676         exit_status = ret;
1677         goto cleanup;
1678     }
1679 
1680     /* buffer should not be released here; the callback releases it */
1681     buffer = NULL;
1682 
1683  cleanup:
1684     if (NULL != buffer) {
1685         OBJ_RELEASE(buffer);
1686         buffer = NULL;
1687     }
1688 
1689     return exit_status;
1690 }
1691 
push_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info)1692 static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1693 {
1694     int ret, exit_status = ORTE_SUCCESS;
1695     opal_buffer_t *buffer = NULL;
1696     orte_sstore_stage_cmd_flag_t command;
1697     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1698     opal_list_item_t *item = NULL;
1699     size_t list_size;
1700 
1701     buffer = OBJ_NEW(opal_buffer_t);
1702 
1703     command = ORTE_SSTORE_STAGE_PUSH;
1704     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1705         ORTE_ERROR_LOG(ret);
1706         exit_status = ret;
1707         goto cleanup;
1708     }
1709 
1710     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1711         ORTE_ERROR_LOG(ret);
1712         exit_status = ret;
1713         goto cleanup;
1714     }
1715 
1716     list_size = opal_list_get_size(handle_info->app_info_handle);
1717     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &list_size, 1, OPAL_SIZE))) {
1718         ORTE_ERROR_LOG(ret);
1719         exit_status = ret;
1720         goto cleanup;
1721     }
1722 
1723     /*
1724      * For each process we are working with
1725      */
1726     for(item  = opal_list_get_first(handle_info->app_info_handle);
1727         item != opal_list_get_end(handle_info->app_info_handle);
1728         item  = opal_list_get_next(item) ) {
1729         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1730 
1731         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->name), 1, ORTE_NAME))) {
1732             ORTE_ERROR_LOG(ret);
1733             exit_status = ret;
1734             goto cleanup;
1735         }
1736 
1737         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->ckpt_skipped), 1, OPAL_BOOL))) {
1738             ORTE_ERROR_LOG(ret);
1739             exit_status = ret;
1740             goto cleanup;
1741         }
1742 
1743         if( !app_info->ckpt_skipped ) {
1744             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(app_info->crs_comp), 1, OPAL_STRING))) {
1745                 ORTE_ERROR_LOG(ret);
1746                 exit_status = ret;
1747                 goto cleanup;
1748             }
1749 
1750             if( orte_sstore_stage_enabled_compression ) {
1751                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_comp), 1, OPAL_STRING))) {
1752                     ORTE_ERROR_LOG(ret);
1753                     exit_status = ret;
1754                     goto cleanup;
1755                 }
1756                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->compress_postfix), 1, OPAL_STRING))) {
1757                     ORTE_ERROR_LOG(ret);
1758                     exit_status = ret;
1759                     goto cleanup;
1760                 }
1761             }
1762         }
1763     }
1764 
1765     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1766                                                        orte_rml_send_callback, NULL))) {
1767         ORTE_ERROR_LOG(ret);
1768         exit_status = ret;
1769         goto cleanup;
1770     }
1771 
1772     /* buffer should not be released here; the callback releases it */
1773     buffer = NULL;
1774 
1775  cleanup:
1776     if (NULL != buffer) {
1777         OBJ_RELEASE(buffer);
1778         buffer = NULL;
1779     }
1780 
1781     return exit_status;
1782 }
1783 
sstore_stage_create_local_dir(void)1784 static int sstore_stage_create_local_dir(void)
1785 {
1786     int ret, exit_status = ORTE_SUCCESS;
1787     mode_t my_mode = S_IRWXU;
1788 
1789     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1790         ORTE_ERROR_LOG(ret);
1791         exit_status = ret;
1792         goto cleanup;
1793     }
1794 
1795  cleanup:
1796     return exit_status;
1797 }
1798 
sstore_stage_destroy_local_dir(void)1799 static int sstore_stage_destroy_local_dir(void)
1800 {
1801     int ret, exit_status = ORTE_SUCCESS;
1802     char * basedir_root = NULL;
1803 
1804     asprintf(&basedir_root, "%s/%s",
1805              orte_sstore_stage_local_snapshot_dir,
1806              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1807 
1808     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1809         ORTE_ERROR_LOG(ret);
1810         exit_status = ret;
1811         goto cleanup;
1812     }
1813 
1814  cleanup:
1815     if( NULL != basedir_root ) {
1816         free(basedir_root);
1817         basedir_root = NULL;
1818     }
1819 
1820     return exit_status;
1821 }
1822 
sstore_stage_create_cache(void)1823 static int sstore_stage_create_cache(void)
1824 {
1825     int ret, exit_status = ORTE_SUCCESS;
1826     mode_t my_mode = S_IRWXU;
1827 
1828     /* Sanity check */
1829     if( !orte_sstore_stage_enabled_caching ) {
1830         goto cleanup;
1831     }
1832 
1833     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1834         ORTE_ERROR_LOG(ret);
1835         exit_status = ret;
1836         goto cleanup;
1837     }
1838 
1839  cleanup:
1840     return exit_status;
1841 }
1842 
sstore_stage_destroy_cache(void)1843 static int sstore_stage_destroy_cache(void)
1844 {
1845     int ret, exit_status = ORTE_SUCCESS;
1846 
1847     /* Sanity check */
1848     if( !orte_sstore_stage_enabled_caching ) {
1849         goto cleanup;
1850     }
1851 
1852     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1853         ORTE_ERROR_LOG(ret);
1854         exit_status = ret;
1855         goto cleanup;
1856     }
1857 
1858  cleanup:
1859     return exit_status;
1860 }
1861 
sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t * handle_info)1862 static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
1863 {
1864     int ret, exit_status = ORTE_SUCCESS;
1865     char *cmd = NULL;
1866     mode_t my_mode = S_IRWXU;
1867     char *cache_dirname = NULL;
1868     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1869     opal_list_item_t* item = NULL;
1870     size_t list_size;
1871 
1872     /* Sanity Check */
1873     if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
1874         goto cleanup;
1875     }
1876 
1877     list_size = opal_list_get_size(handle_info->app_info_handle);
1878     if( 0 >= list_size ) {
1879         /* No processes on this node, skip */
1880         exit_status = ORTE_SUCCESS;
1881         goto cleanup;
1882     }
1883 
1884     app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
1885     if( NULL == app_info ) {
1886         ORTE_ERROR_LOG(ORTE_ERROR);
1887         exit_status = ORTE_ERROR;
1888         goto cleanup;
1889     }
1890 
1891     /*
1892      * Create the base cache directory
1893      */
1894     cache_dirname = opal_dirname(app_info->local_cache_location);
1895     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
1896         ORTE_ERROR_LOG(ret);
1897         exit_status = ret;
1898         goto cleanup;
1899     }
1900 
1901     /*
1902      * For each process, move the current checkpoint to the cache directory
1903      * Cached snapshots are always stored uncompressed.
1904      */
1905     for(item  = opal_list_get_first(handle_info->app_info_handle);
1906         item != opal_list_get_end(handle_info->app_info_handle);
1907         item  = opal_list_get_next(item) ) {
1908         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1909 
1910         asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname);
1911         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1912                              "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
1913                              ORTE_NAME_PRINT(&app_info->name),
1914                              cmd));
1915         system(cmd);
1916 
1917         /* (JJH) Remove the cached files */
1918         if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1919             free(cmd);
1920             cmd = NULL;
1921 
1922             asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1923             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1924                                  "sstore:stage:(local): update_cache(): Removing with command (%s)",
1925                                  cmd));
1926             system(cmd);
1927         }
1928     }
1929 
1930     /*
1931      * Remove the previous cached checkpoint
1932      */
1933     if( NULL != sstore_stage_cache_last_dir ) {
1934         asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir);
1935         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1936                              "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
1937                              sstore_stage_cache_last_dir));
1938         system(cmd);
1939     }
1940 
1941     /*
1942      * Update 'last' cache pointer
1943      */
1944     if( NULL != sstore_stage_cache_last_dir ) {
1945         free(sstore_stage_cache_last_dir);
1946         sstore_stage_cache_last_dir = NULL;
1947     }
1948     if( NULL != sstore_stage_cache_current_dir ) {
1949         sstore_stage_cache_last_dir    = strdup(sstore_stage_cache_current_dir);
1950     }
1951 
1952     /*
1953      * Update 'current' cache pointer
1954      */
1955     if( NULL != sstore_stage_cache_current_dir ) {
1956         free(sstore_stage_cache_current_dir);
1957         sstore_stage_cache_current_dir = NULL;
1958     }
1959     sstore_stage_cache_current_dir = strdup(cache_dirname);
1960 
1961     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1962                          "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
1963                          sstore_stage_cache_current_dir, sstore_stage_cache_last_dir));
1964 
1965  cleanup:
1966     if( NULL != cmd ) {
1967         free(cmd);
1968         cmd = NULL;
1969     }
1970 
1971     return exit_status;
1972 }
1973 
orte_sstore_stage_local_preload_files(char ** local_location,bool * skip_xfer,char * global_loc,char * ref,char * postfix,int seq)1974 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1975                                                  char *global_loc, char *ref, char *postfix, int seq)
1976 {
1977     int ret, exit_status = ORTE_SUCCESS;
1978     mode_t my_mode = S_IRWXU;
1979     orte_filem_base_request_t *filem_request;
1980     orte_filem_base_process_set_t *p_set = NULL;
1981     orte_filem_base_file_set_t * f_set = NULL;
1982     char * full_local_location = NULL;
1983 
1984     *skip_xfer = false;
1985 
1986     if( NULL != *local_location) {
1987         free(*local_location);
1988         *local_location = NULL;
1989     }
1990 
1991     /*
1992      * If the global directory is shared, then just reference directly
1993      *
1994      * Skip this optimization if compressing. Since decompressing on the
1995      * central storage would typically require a transfer to the local
1996      * disk to decompress, then transfer back. Eliminating all benefits
1997      * of the optimization.
1998      */
1999     /* (JJH) If we are going to use the preloaded restart files for subsequent
2000      *       restarts then we actually always want to preload the files. This
2001      *       way if we need to restart from the same checkpoint again, then
2002      *       we can from the local restart cache.
2003      */
2004 #if 0
2005     if( orte_sstore_stage_global_is_shared &&
2006         (NULL == postfix || 0 >= strlen(postfix) ) ) {
2007         *local_location = strdup(global_loc);
2008         *skip_xfer = true;
2009         goto cleanup;
2010     }
2011 #endif
2012 
2013     asprintf(local_location, "%s/%s/%s/%d",
2014              orte_sstore_stage_local_snapshot_dir,
2015              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2016              ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2017              seq);
2018     asprintf(&full_local_location, "%s/%s",
2019              *local_location,
2020              ref);
2021 
2022     /*
2023      * If the snapshot already exists locally, just reuse it instead of
2024      * transfering it again.
2025      */
2026     if( 0 == (ret = access(full_local_location, F_OK)) ) {
2027         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2028                              "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2029                              full_local_location));
2030         *skip_xfer = true;
2031         goto cleanup;
2032     }
2033 
2034     /*
2035      * Create the local restart directory
2036      */
2037     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2038         ORTE_ERROR_LOG(ret);
2039         exit_status = ret;
2040         goto cleanup;
2041     }
2042 
2043     /*
2044      * FileM request to move the checkpoint to the local directory
2045      */
2046     filem_request = OBJ_NEW(orte_filem_base_request_t);
2047 
2048     /* Define the process set */
2049     p_set = OBJ_NEW(orte_filem_base_process_set_t);
2050     if( ORTE_PROC_IS_HNP ) {
2051         /* if I am the HNP, then use me as the source */
2052         p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2053         p_set->source.vpid  = ORTE_PROC_MY_NAME->vpid;
2054     }
2055     else {
2056         /* otherwise, set the HNP as the source */
2057         p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2058         p_set->source.vpid  = ORTE_PROC_MY_HNP->vpid;
2059     }
2060     p_set->sink.jobid   = ORTE_PROC_MY_NAME->jobid;
2061     p_set->sink.vpid    = ORTE_PROC_MY_NAME->vpid;
2062     opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2063 
2064     /* Define the file set */
2065     f_set = OBJ_NEW(orte_filem_base_file_set_t);
2066 
2067     f_set->local_target = strdup(*local_location);
2068     if( NULL != postfix && 0 < strlen(postfix) ) {
2069         asprintf(&(f_set->remote_target), "%s/%s%s",
2070                  global_loc,
2071                  ref,
2072                  postfix);
2073     } else {
2074         asprintf(&(f_set->remote_target), "%s/%s",
2075                  global_loc,
2076                  ref);
2077     }
2078     if( NULL != postfix && 0 < strlen(postfix) ) {
2079         f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2080     } else {
2081         f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2082     }
2083 
2084     opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2085 
2086     /* Start getting the files */
2087     opal_list_append(preload_filem_requests, &(filem_request->super));
2088     if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2089         ORTE_ERROR_LOG(ret);
2090         exit_status = ret;
2091         goto cleanup;
2092     }
2093 
2094  cleanup:
2095     if( NULL != full_local_location ) {
2096         free(full_local_location);
2097         full_local_location = NULL;
2098     }
2099 
2100     return exit_status;
2101 }
2102