1 /*
2  * Copyright (c) 2010      The Trustees of Indiana University.
3  *                         All rights reserved.
4  * Copyright (c) 2004-2011 The University of Tennessee and The University
5  *                         of Tennessee Research Foundation.  All rights
6  *                         reserved.
7  * Copyright (c) 2017      Research Organization for Information Science
8  *                         and Technology (RIST). All rights reserved.
9  * $COPYRIGHT$
10  *
11  * Additional copyrights may follow
12  *
13  * $HEADER$
14  */
15 
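/*
 * Local (per-process) side of the "stage" sstore component: tracks snapshot
 * handles, exchanges handle information with the global sstore coordinator,
 * and stages (and optionally caches) checkpoint files on local storage.
 */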
19 
20 #include "orte_config.h"
21 
22 #include <string.h>
23 #include <stdlib.h>
24 #include <sys/types.h>
25 #include <sys/stat.h>
26 #include <sys/wait.h>
27 #ifdef HAVE_UNISTD_H
28 #include <unistd.h>
29 #endif  /* HAVE_UNISTD_H */
30 
31 #include "orte/mca/mca.h"
32 #include "opal/mca/base/base.h"
33 
34 #include "opal/mca/event/event.h"
35 
36 #include "orte/constants.h"
37 #include "orte/util/show_help.h"
38 #include "opal/util/argv.h"
39 #include "opal/util/output.h"
40 #include "opal/util/opal_environ.h"
41 #include "opal/util/basename.h"
42 #include "opal/util/os_dirpath.h"
43 
44 #include "opal/mca/compress/compress.h"
45 #include "opal/mca/compress/base/base.h"
46 
47 #include "opal/threads/mutex.h"
48 #include "opal/threads/condition.h"
49 
50 #include "orte/util/name_fns.h"
51 #include "orte/util/proc_info.h"
52 #include "orte/runtime/orte_globals.h"
53 #include "orte/runtime/orte_wait.h"
54 #include "orte/mca/errmgr/errmgr.h"
55 #include "orte/mca/rml/rml.h"
56 #include "orte/mca/rml/rml_types.h"
57 #include "orte/mca/odls/odls_types.h"
58 #include "orte/mca/filem/filem.h"
59 #include "orte/mca/filem/base/base.h"
60 
61 #include "orte/mca/sstore/sstore.h"
62 #include "orte/mca/sstore/base/base.h"
63 
64 #include "sstore_stage.h"
65 
/**********
 * Handle status codes and object (class) definitions
 **********/
/*
 * Values stored in orte_sstore_stage_local_snapshot_info_t.status.
 * As used in the visible part of this file:
 *  - NONE:   set by the constructor and destructor; finalize() treats it
 *            as "not in flight".
 *  - ERROR:  register() stops waiting on it; finalize() treats it as
 *            "not in flight".
 *  - INIT:   not referenced in the visible code -- TODO confirm its use.
 *  - READY:  register() waits for it after pull_handle_info().
 *  - SYNCED: set by sync() after push_handle_info() succeeds.
 *  - DONE:   finalize() treats it as "not in flight".
 */
#define SSTORE_LOCAL_NONE   0
#define SSTORE_LOCAL_ERROR  1
#define SSTORE_LOCAL_INIT   2
#define SSTORE_LOCAL_READY  3
#define SSTORE_LOCAL_SYNCED 4
#define SSTORE_LOCAL_DONE   5

/*
 * Per-checkpoint handle state kept by the local module. It derives from
 * opal_list_item_t; finalize() casts the items of active_handles to this
 * type.
 */
struct  orte_sstore_stage_local_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle this entry was created for (presumably the key that
     *  find_handle_info() matches on -- confirm) */
    orte_sstore_base_handle_t id;

    /** Status: one of the SSTORE_LOCAL_* values above */
    int status;

    /** Sequence Number (the constructor initializes it to -1) */
    int seq_num;

    /** Global Reference Name (owned; freed by the destructor) */
    char * global_ref_name;

    /** Local Location Format String (owned; freed by the destructor) */
    char * location_fmt;

    /** Local Cache Location Format String (owned; freed by the destructor) */
    char * cache_location_fmt;

    /* Application info handles: list of per-process entries, looked up with
     * find_app_handle_info() and presumably filled by
     * append_new_app_handle_info(). Allocated by the constructor. */
    opal_list_t *app_info_handle;

    /** Compress Component used (owned; freed by the destructor) */
    char * compress_comp;

    /** Compress Component postfix (owned; freed by the destructor) */
    char * compress_postfix;

    /** Is this checkpoint representing a migration? When true,
     *  fetch_app_deps() does not consult the local cache. */
    bool migrating;
};
typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);

/* Class constructor/destructor, registered through OBJ_CLASS_INSTANCE below */
void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_local_snapshot_info_construct,
                   orte_sstore_stage_local_snapshot_info_destruct);
120 
/*
 * Per-process entry of a snapshot handle (one element of
 * orte_sstore_stage_local_snapshot_info_t.app_info_handle). All strings are
 * owned by the entry and freed by its destructor.
 */
struct  orte_sstore_stage_local_app_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Process Name associated with this entry (matched by
     *  find_app_handle_info()) */
    orte_process_name_t name;

    /** Local Location (Absolute Path) */
    char * local_location;

    /** Compressed Local Location (Absolute Path) */
    char * compressed_local_location;

    /** Local Cache Location (Absolute Path); fetch_app_deps() checks it
     *  with access() before passing it as "-c" */
    char * local_cache_location;

    /** Metadata File Name (Absolute Path) */
    char * metadata_filename;

    /** CRS Component used */
    char * crs_comp;

    /** If this app. skipped the checkpoint - usually for non-migrating procs */
    bool ckpt_skipped;

    /** Compression PID to wait on (0 when none; presumably consumed by
     *  sstore_stage_local_compress_waitpid_cb -- confirm) */
    pid_t compress_pid;
};
typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);

/* Class constructor/destructor, registered through OBJ_CLASS_INSTANCE below */
void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_local_app_snapshot_info_construct,
                   orte_sstore_stage_local_app_snapshot_info_destruct);
159 
160 
161 
/**********
 * Local Function and Variable Declarations
 **********/
/* Presumably records whether the RML listener started by
 * sstore_stage_local_start_listener() is registered -- usage not visible here */
static bool is_global_listener_active = false;
static int sstore_stage_local_start_listener(void);
static int sstore_stage_local_stop_listener(void);
static void sstore_stage_local_recv(int status,
                                      orte_process_name_t* sender,
                                      opal_buffer_t* buffer,
                                      orte_rml_tag_t tag,
                                      void* cbdata);

/* Message processors. process_global_push() is also used by unpack() to
 * read metadata piggybacked on a buffer; the others presumably handle the
 * corresponding pull/push/remove requests (bodies not visible here). */
static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);

/* Handle lookup/creation over active_handles */
static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);

/* Per-process entries within a handle */
static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                      orte_process_name_t *name);
static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                                                           orte_process_name_t *name);

/* Exchange handle information with the global coordinator */
static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );

static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);

/* Compression of local checkpoint files */
static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
                             orte_sstore_stage_local_app_snapshot_info_t *app_info);
static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);

static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
                                                 char *global_loc, char *ref, char *postfix, int seq);

/* Local staging directory management */
static int sstore_stage_create_local_dir(void);
static int sstore_stage_destroy_local_dir(void);

/* Local cache management (only used when orte_sstore_stage_enabled_caching) */
static int sstore_stage_create_cache(void);
static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
static int sstore_stage_destroy_cache(void);

/* Every orte_sstore_stage_local_snapshot_info_t known to this process;
 * created in init(), waited on and released in finalize() */
static opal_list_t *active_handles = NULL;
/* <orte_sstore_stage_local_snapshot_dir>/<ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME>/
 * <ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME>, built in init() */
static char * sstore_stage_local_basedir = NULL;

/* Same layout with ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME; only built
 * when caching is enabled */
static char * sstore_stage_cache_basedir = NULL;

/* Current / previous cache directories -- maintained by the cache helpers,
 * whose bodies are not visible here */
static char * sstore_stage_cache_current_dir = NULL;
static char * sstore_stage_cache_last_dir    = NULL;

/* Outstanding FileM requests that wait_all_deps() waits on via
 * orte_filem.wait_all(); presumably filled while preloading restart files */
static opal_list_t * preload_filem_requests = NULL;
218 
219 /**********
220  * Object stuff
221  **********/
/*
 * Class constructor for orte_sstore_stage_local_snapshot_info_t.
 *
 * Puts the handle into its "empty" state: id 0, status SSTORE_LOCAL_NONE,
 * sequence number -1, every string NULL, not migrating. It also allocates an
 * empty list for the per-process entries.
 */
void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
{
    /* Identity and progress */
    info->id        = 0;
    info->seq_num   = -1;
    info->status    = SSTORE_LOCAL_NONE;
    info->migrating = false;

    /* Owned strings; NULL until assigned elsewhere */
    info->global_ref_name    = NULL;
    info->location_fmt       = NULL;
    info->cache_location_fmt = NULL;
    info->compress_comp      = NULL;
    info->compress_postfix   = NULL;

    /* Container for the per-process entries */
    info->app_info_handle = OBJ_NEW(opal_list_t);
}
244 
/*
 * Class destructor for orte_sstore_stage_local_snapshot_info_t.
 *
 * Frees every owned string and releases the per-process entry list. It then
 * resets the scalar fields to the constructor's values.
 *
 * OBJ_RELEASE on an opal_list_t does not release the items still linked
 * into it. Each per-process entry is therefore removed and released first;
 * otherwise every orte_sstore_stage_local_app_snapshot_info_t would leak.
 */
void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
{
    opal_list_item_t *item = NULL;

    info->id      = 0;

    info->status = SSTORE_LOCAL_NONE;

    info->seq_num = -1;

    /* free(NULL) is a no-op, so no guards are needed */
    free(info->global_ref_name);
    info->global_ref_name = NULL;

    free(info->location_fmt);
    info->location_fmt = NULL;

    free(info->cache_location_fmt);
    info->cache_location_fmt = NULL;

    if( NULL != info->app_info_handle ) {
        /* Release the entries first; the list destructor does not. */
        while( NULL != (item = opal_list_remove_first(info->app_info_handle)) ) {
            OBJ_RELEASE(item);
        }
        OBJ_RELEASE(info->app_info_handle);
        info->app_info_handle = NULL;
    }

    free(info->compress_comp);
    info->compress_comp = NULL;

    free(info->compress_postfix);
    info->compress_postfix = NULL;

    info->migrating = false;
}
285 
/*
 * Class constructor for orte_sstore_stage_local_app_snapshot_info_t.
 *
 * Starts with an invalid process name and no paths or component names. The
 * checkpoint is not marked as skipped and there is no compression child
 * (pid 0).
 */
void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
{
    /* Paths and component names are filled in later */
    info->crs_comp                  = NULL;
    info->metadata_filename         = NULL;
    info->local_cache_location      = NULL;
    info->compressed_local_location = NULL;
    info->local_location            = NULL;

    /* No process bound to this entry yet */
    info->name.vpid  = ORTE_VPID_INVALID;
    info->name.jobid = ORTE_JOBID_INVALID;

    info->compress_pid = 0;
    info->ckpt_skipped = false;
}
299 
/*
 * Class destructor for orte_sstore_stage_local_app_snapshot_info_t.
 *
 * Frees each owned string and resets every field to the value the
 * constructor assigns.
 */
void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
{
    /* free(NULL) is a no-op, so each string is freed unconditionally */
    free(info->local_location);
    info->local_location = NULL;

    free(info->compressed_local_location);
    info->compressed_local_location = NULL;

    free(info->local_cache_location);
    info->local_cache_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    free(info->crs_comp);
    info->crs_comp = NULL;

    /* Scalars back to their constructed values */
    info->name.jobid   = ORTE_JOBID_INVALID;
    info->name.vpid    = ORTE_VPID_INVALID;
    info->ckpt_skipped = false;
    info->compress_pid = 0;
}
334 
335 /******************
336  * Local functions
337  ******************/
orte_sstore_stage_local_module_init(void)338 int orte_sstore_stage_local_module_init(void)
339 {
340     int ret, exit_status = ORTE_SUCCESS;
341 
342     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
343                          "sstore:stage:(local): init()"));
344 
345     if( NULL == active_handles ) {
346         active_handles = OBJ_NEW(opal_list_t);
347     }
348 
349     if( NULL == preload_filem_requests ) {
350         preload_filem_requests = OBJ_NEW(opal_list_t);
351     }
352 
353     /*
354      * Create the local storage directory
355      */
356     asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
357              orte_sstore_stage_local_snapshot_dir,
358              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
359              ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
360     if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
361         ORTE_ERROR_LOG(ret);
362         exit_status = ret;
363         goto cleanup;
364     }
365 
366     /*
367      * Create the local cache
368      */
369     if( orte_sstore_stage_enabled_caching ) {
370         asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
371                  orte_sstore_stage_local_snapshot_dir,
372                  ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
373                  ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
374 
375         if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
376             ORTE_ERROR_LOG(ret);
377             exit_status = ret;
378             goto cleanup;
379         }
380     }
381 
382     /*
383      * Setup a listener for the HNP/Apps
384      * We could be the HNP, in which case the listener is already registered.
385      */
386     if( !ORTE_PROC_IS_HNP ) {
387         if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
388             ORTE_ERROR_LOG(ret);
389             exit_status = ret;
390             goto cleanup;
391         }
392     }
393 
394  cleanup:
395     return exit_status;
396 }
397 
orte_sstore_stage_local_module_finalize(void)398 int orte_sstore_stage_local_module_finalize(void)
399 {
400     int ret, exit_status = ORTE_SUCCESS;
401     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
402     opal_list_item_t* item = NULL;
403     bool done = false;
404     int cur_time = 0, max_time = 120;
405 
406     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
407                          "sstore:stage:(local): finalize()"));
408 
409     /*
410      * Wait for all active transfers to finish
411      */
412     if( !ORTE_PROC_IS_HNP ) {
413         done = false;
414         while( 0 < opal_list_get_size(active_handles) && !done ) {
415             done = true;
416             for(item  = opal_list_get_first(active_handles);
417                 item != opal_list_get_end(active_handles);
418                 item  = opal_list_get_next(item) ) {
419                 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
420                 if( SSTORE_LOCAL_DONE  != handle_info->status &&
421                     SSTORE_LOCAL_NONE  != handle_info->status &&
422                     SSTORE_LOCAL_ERROR != handle_info->status ) {
423                     done = false;
424                     break;
425                 }
426             }
427             if( done ) {
428                 break;
429             }
430             else {
431                 if( cur_time != 0 && cur_time % 30 == 0 ) {
432                     opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
433                                 cur_time, max_time);
434                 }
435 
436                 opal_progress();
437                 if( cur_time >= max_time ) {
438                     break;
439                 } else {
440                     sleep(1);
441                 }
442                 cur_time++;
443             }
444         }
445     }
446 
447     if( NULL != active_handles ) {
448         OBJ_RELEASE(active_handles);
449     }
450 
451     if( NULL != preload_filem_requests ) {
452         OBJ_RELEASE(preload_filem_requests);
453     }
454 
455     /*
456      * Shutdown the listener for the HNP/Apps
457      * We could be the HNP, in which case the listener is already deregistered.
458      */
459     if( !ORTE_PROC_IS_HNP ) {
460         if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
461             ORTE_ERROR_LOG(ret);
462             exit_status = ret;
463             goto cleanup;
464         }
465     }
466 
467     /*
468      * Destroy the local cache
469      */
470     if( orte_sstore_stage_enabled_caching ) {
471         if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
472             ORTE_ERROR_LOG(ret);
473             exit_status = ret;
474             goto cleanup;
475         }
476     }
477 
478     /*
479      * Destroy the local storage directory
480      */
481     if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
482         ORTE_ERROR_LOG(ret);
483         exit_status = ret;
484         goto cleanup;
485     }
486 
487  cleanup:
488     if( orte_sstore_stage_enabled_caching ) {
489         if( NULL != sstore_stage_cache_basedir ) {
490             free(sstore_stage_cache_basedir);
491             sstore_stage_cache_basedir = NULL;
492         }
493     }
494 
495     if( NULL != sstore_stage_local_basedir ) {
496         free(sstore_stage_local_basedir);
497         sstore_stage_local_basedir = NULL;
498     }
499 
500     return exit_status;
501 }
502 
/*
 * Not supported by the local stage module. It writes a notice to output
 * stream 0 and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    /* Parameters are part of the sstore module interface but unused here */
    (void)handle;
    (void)seq;
    (void)jobid;

    opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");
    return rc;
}
508 
/**
 * Register a checkpoint handle with the local module.
 *
 * Looks up (or creates) the local state for @p handle and asks the global
 * sstore for its information via pull_handle_info(). It then drives
 * opal_progress() until the handle reaches SSTORE_LOCAL_READY or
 * SSTORE_LOCAL_ERROR. Note: this wait has no timeout.
 *
 * @param handle  Handle to register.
 * @return ORTE_SUCCESS when the handle became READY;
 *         ORTE_ERR_OUT_OF_RESOURCE if the local state could not be created;
 *         ORTE_ERROR if the pull finished in the ERROR state (previously
 *         reported as success);
 *         or the error from pull_handle_info().
 */
int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): register()"));

    /*
     * Create a handle
     */
    if( NULL == (handle_info = find_handle_info(handle)) ) {
        handle_info = create_new_handle_info(handle);
        if( NULL == handle_info ) {
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(exit_status);
            goto cleanup;
        }
    }

    /*
     * Get basic information from Global SStore
     */
    if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait here until the pull request has been satisfied
     */
    while(SSTORE_LOCAL_READY != handle_info->status &&
          SSTORE_LOCAL_ERROR != handle_info->status ) {
        opal_progress();
    }

    /* Do not report success for a handle whose pull failed */
    if( SSTORE_LOCAL_ERROR == handle_info->status ) {
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
544 
/*
 * Attribute queries are not supported by the local stage module. It writes
 * a notice to output stream 0 and fails; *value is left untouched.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char **value)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    (void)handle;
    (void)key;
    (void)value;

    opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");
    return rc;
}
550 
/*
 * Attribute updates are not supported by the local stage module. It writes
 * a notice to output stream 0 and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle,  orte_sstore_base_key_t key, char *value)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    (void)handle;
    (void)key;
    (void)value;

    opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");
    return rc;
}
556 
/**
 * Synchronize a handle with the global coordinator.
 *
 * Steps:
 *  - wait until every local application has updated its metadata;
 *  - when compression is enabled, wait for all compression jobs;
 *  - push the handle information to the global coordinator;
 *  - mark the handle SSTORE_LOCAL_SYNCED.
 *
 * @param handle  Handle to synchronize.
 * @return ORTE_SUCCESS; ORTE_ERR_NOT_FOUND if the handle is unknown (it was
 *         previously dereferenced as NULL); or the first error from the
 *         wait/push helpers.
 */
int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): sync()"));

    /*
     * Lookup the handle
     */
    if( NULL == (handle_info = find_handle_info(handle)) ) {
        exit_status = ORTE_ERR_NOT_FOUND;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    /*
     * Wait for all of the applications to update their metadata
     */
    if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait for compression to finish
     */
    if( orte_sstore_stage_enabled_compression ) {
        if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Push information to the Global coordinator
     */
    if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    handle_info->status = SSTORE_LOCAL_SYNCED;

 cleanup:
    return exit_status;
}
604 
/*
 * Removing a snapshot is not supported by the local stage module. It writes
 * a notice to output stream 0 and fails.
 *
 * @return ORTE_ERR_NOT_IMPLEMENTED, always.
 */
int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    (void)handle;

    opal_output(0, "sstore:stage:(local): remove() Not implemented!");
    return rc;
}
610 
/**
 * Pack a handle into @p buffer for transmission to @p peer.
 *
 * Only the handle id is packed (as ORTE_SSTORE_HANDLE); no extra metadata
 * is appended.
 *
 * @return ORTE_SUCCESS or the error from opal_dss.pack().
 */
int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): pack()"));

    rc = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}
639 
/**
 * Unpack a handle from @p buffer (sent by @p peer) into *handle.
 *
 * After reading the handle id, the matching local state is looked up or
 * created. Any metadata piggybacked on the message is consumed by
 * process_global_push().
 *
 * @return ORTE_SUCCESS or the first error from unpacking/processing.
 */
int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    orte_sstore_stage_local_snapshot_info_t *info = NULL;
    orte_std_cntr_t num_vals = 1;
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): unpack()"));

    /* The handle id comes first in the buffer */
    rc = opal_dss.unpack(buffer, handle, &num_vals, ORTE_SSTORE_HANDLE);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* Reuse existing local state for this handle, or start fresh */
    info = find_handle_info(*handle);
    if (NULL == info) {
        info = create_new_handle_info(*handle);
    }

    rc = process_global_push(peer, buffer, info);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}
678 
/**
 * Prepare an application context for restart from a stored snapshot.
 *
 * Reads the ORTE_APP_SSTORE_LOAD attribute of @p app. It is a ':'-separated
 * specification of either
 *     location:global_ref:ref:seq                    (uncompressed), or
 *     location:global_ref:ref:compress:postfix:seq   (compressed).
 * Restart arguments are then appended to app->argv:
 *   - "-c <dir>" when caching is enabled, the handle is known and not
 *     migrating, and a cached copy exists for a local child of this app;
 *   - otherwise "-l <dir>" for the preloaded files. When the preload did
 *     not report skip_xfer, "-d <compress>" / "-p <postfix>" are also
 *     added if set.
 *
 * Nothing is done if the app context is not used on this node or has no
 * ORTE_APP_SSTORE_LOAD attribute.
 *
 * @param app  Application context; app->argv is extended in place.
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM for a malformed load
 *         specification; ORTE_ERR_OUT_OF_RESOURCE on allocation failure; or
 *         the error from orte_sstore_stage_local_preload_files().
 */
int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    char **sstore_args = NULL;
    int num_args = 0;
    char * req_snap_loc = NULL;
    char * req_snap_global_ref = NULL;
    char * req_snap_ref = NULL;
    char * req_snap_postfix = NULL;
    char * local_location = NULL;
    char * req_snap_compress = NULL;
    int req_snap_seq = 0;
    int i;
    orte_proc_t *child = NULL;
    int loc_argc = 0;
    bool skip_xfer = false;
    char *sload = NULL;

    /* For OPAL_STRING attributes orte_get_attribute() stores a heap copy in
     * sload. This function owns it and frees it in cleanup. */
    orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);

    if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
        /* sload may be NULL here; never hand NULL to a %s conversion */
        OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
                             app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
                             (int)app->num_procs, (NULL == sload ? "(null)" : sload)));
        /* Nothing to do */
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): %s",
                         app->idx, sload));

    /*
     * Extract the 'loc:global_ref:ref[:compress:postfix]:seq' parameter.
     * Validate the field count before indexing: 4 fields means uncompressed,
     * 6 or more means compressed (extra fields are ignored, as before).
     * Anything else would read past the end of sstore_args.
     */
    sstore_args = opal_argv_split(sload, ':');
    num_args = opal_argv_count(sstore_args);
    if( num_args < 4 || 5 == num_args ) {
        opal_output(0, "sstore:stage:(local): fetch_app_deps(%3d): Malformed sstore load specification (%s)",
                    app->idx, sload);
        exit_status = ORTE_ERR_BAD_PARAM;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    req_snap_loc        = strdup(sstore_args[0]);
    req_snap_global_ref = strdup(sstore_args[1]);
    req_snap_ref        = strdup(sstore_args[2]);
    if( 4 == num_args ) { /* Not compressed */
        req_snap_seq        = atoi(  sstore_args[3]);
    } else {
        req_snap_compress   = strdup(sstore_args[3]);
        req_snap_postfix    = strdup(sstore_args[4]);
        req_snap_seq        = atoi(  sstore_args[5]);
    }
    if( NULL == req_snap_loc || NULL == req_snap_global_ref || NULL == req_snap_ref ||
        (4 != num_args && (NULL == req_snap_compress || NULL == req_snap_postfix)) ) {
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
    if( NULL == handle_info ) {
        /* No checkpoints known, just preload the checkpoint */
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
                             app->idx,
                             req_snap_ref,
                             req_snap_seq));
        goto filem_preload;
    }

    /*
     * If caching enabled, then look to see if we have this snapshot cached
     * Do not cache if migrating, since checkpoints taken while migrating are
     * not guaranteed to be globally taken.
     */
    if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
        /*
         * Find the first local child that belongs to this app context
         */
        for (i=0; i < orte_local_children->size; i++) {
            if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
                continue;
            }

            if( app->idx == child->app_idx ) {
                /*
                 * Find the app snapshot ref
                 */
                app_info = find_app_handle_info(handle_info, &child->name);
                break;
            }
        }

        if( NULL == app_info ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
                                 app->idx));
            goto filem_preload;
        }

        /*
         * Do we have a cached version of this file?
         */
        if( NULL != app_info->local_cache_location &&
            0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
                                 app->idx,
                                 app_info->local_cache_location));

            opal_argv_append(&loc_argc, &(app->argv), "-c");
            opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
            goto cleanup;
        } else {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
                                 app->idx,
                                 ORTE_NAME_PRINT(&app_info->name),
                                 app_info->local_cache_location));
        }
    }

 filem_preload:
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
                         app->idx));

    /*
     * If we got here, then there is no cached directory, so just preload the
     * files, update the argument set, and carry on.
     */
    if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
                                                                     &skip_xfer,
                                                                     req_snap_loc,
                                                                     req_snap_ref,
                                                                     req_snap_postfix,
                                                                     req_snap_seq)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    opal_argv_append(&loc_argc, &(app->argv), "-l");
    opal_argv_append(&loc_argc, &(app->argv), local_location);

    /*
     * Decompress files:
     *  opal-restart will do this for us on launch
     */
    if( !skip_xfer ) {
        if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-d");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
        }
        if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-p");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
        }
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
                         app->idx,
                         local_location));

 cleanup:
    /* NOTE(review): local_location is produced by
     * orte_sstore_stage_local_preload_files(). Its ownership is not visible
     * here, so it is deliberately not freed -- confirm whether it leaks. */

    /* free(NULL) is a no-op */
    free(req_snap_compress);
    free(req_snap_postfix);
    free(req_snap_ref);
    free(req_snap_global_ref);
    free(req_snap_loc);
    free(sload);

    if( NULL != sstore_args ) {
        opal_argv_free(sstore_args);
        sstore_args = NULL;
    }

    return exit_status;
}
884 
orte_sstore_stage_local_wait_all_deps(void)885 int orte_sstore_stage_local_wait_all_deps(void)
886 {
887     int ret, exit_status = ORTE_SUCCESS;
888     opal_list_item_t* item = NULL;
889 
890     /* Nothing being preloaded, so just move on */
891     if( 0 >= opal_list_get_size(preload_filem_requests) ) {
892         return ORTE_SUCCESS;
893     }
894 
895     /*
896      * Wait for all files to move
897      */
898     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
899                          "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
900                          (int)opal_list_get_size(preload_filem_requests)));
901 
902     if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
903         ORTE_ERROR_LOG(ret);
904         exit_status = ret;
905         goto cleanup;
906     }
907 
908     /*
909      * Cache the restart files locally, so we can restart faster next time
910      * JJH: We already check the restart directory for a local copy before
911      *      starting the transfer. So this feels unnecessary since the
912      *      restart directory is always used as a cache, whether or not
913      *      caching is enabled. The extra copy to the cache directory
914      *      does not buy us anything.
915      */
916 
917     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
918                          "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
919                          (int)opal_list_get_size(preload_filem_requests)));
920 
921  cleanup:
922     while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
923         OBJ_RELEASE(item);
924     }
925 
926     return exit_status;
927 }
928 
929 /**************************
930  * Local functions
931  **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)932 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
933 {
934     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
935     int i;
936     orte_proc_t *child = NULL;
937 
938     if( NULL == active_handles ) {
939         active_handles = OBJ_NEW(opal_list_t);
940     }
941 
942     handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
943 
944     handle_info->id = handle;
945 
946     opal_list_append(active_handles, &(handle_info->super));
947 
948     /*
949      * Create a sub structure for each child
950      */
951     for (i=0; i < orte_local_children->size; i++) {
952 	    if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
953             continue;
954         }
955         append_new_app_handle_info(handle_info, &child->name);
956     }
957 
958     handle_info->status = SSTORE_LOCAL_INIT;
959 
960     return handle_info;
961 }
962 
find_handle_info(orte_sstore_base_handle_t handle)963 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
964 {
965     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
966     opal_list_item_t* item = NULL;
967 
968     if( NULL == active_handles ) {
969         return NULL;
970     }
971 
972     for(item  = opal_list_get_first(active_handles);
973         item != opal_list_get_end(active_handles);
974         item  = opal_list_get_next(item) ) {
975         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
976 
977         if( handle_info->id == handle ) {
978             return handle_info;
979         }
980     }
981 
982     return NULL;
983 }
984 
find_handle_info_ref(char * ref,int seq)985 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
986 {
987     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
988     opal_list_item_t* item = NULL;
989 
990     if( NULL == active_handles ) {
991         return NULL;
992     }
993 
994     for(item  = opal_list_get_first(active_handles);
995         item != opal_list_get_end(active_handles);
996         item  = opal_list_get_next(item) ) {
997         handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
998 
999         if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
1000             handle_info->seq_num == seq ) {
1001             return handle_info;
1002         }
1003     }
1004 
1005     return NULL;
1006 }
1007 
append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1008 static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1009                                       orte_process_name_t *name)
1010 {
1011     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1012 
1013     app_info = OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);
1014 
1015     app_info->name.jobid = name->jobid;
1016     app_info->name.vpid  = name->vpid;
1017 
1018     opal_list_append(handle_info->app_info_handle, &(app_info->super));
1019 
1020     return ORTE_SUCCESS;
1021 }
1022 
find_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1023 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1024                                                                            orte_process_name_t *name)
1025 {
1026     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1027     opal_list_item_t* item = NULL;
1028     orte_ns_cmp_bitmask_t mask;
1029 
1030     for(item  = opal_list_get_first(handle_info->app_info_handle);
1031         item != opal_list_get_end(handle_info->app_info_handle);
1032         item  = opal_list_get_next(item) ) {
1033         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1034 
1035         mask = ORTE_NS_CMP_ALL;
1036 
1037         if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1038             return app_info;
1039         }
1040     }
1041 
1042     return NULL;
1043 }
1044 
sstore_stage_local_start_listener(void)1045 static int sstore_stage_local_start_listener(void)
1046 {
1047     if( is_global_listener_active ) {
1048         return ORTE_SUCCESS;
1049     }
1050 
1051     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1052                             ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1053 
1054     is_global_listener_active = true;
1055     return ORTE_SUCCESS;
1056 }
1057 
sstore_stage_local_stop_listener(void)1058 static int sstore_stage_local_stop_listener(void)
1059 {
1060     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1061     is_global_listener_active = false;
1062     return ORTE_SUCCESS;
1063 }
1064 
sstore_stage_local_recv(int status,orte_process_name_t * sender,opal_buffer_t * buffer,orte_rml_tag_t tag,void * cbdata)1065 static void sstore_stage_local_recv(int status,
1066                                     orte_process_name_t* sender,
1067                                     opal_buffer_t* buffer,
1068                                     orte_rml_tag_t tag,
1069                                     void* cbdata)
1070 {
1071     int ret;
1072     orte_sstore_stage_cmd_flag_t command;
1073     orte_std_cntr_t count;
1074     orte_sstore_base_handle_t loc_id;
1075 
1076     if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
1077         return;
1078     }
1079 
1080     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1081                          "sstore:stage:(local): process_cmd(%s)",
1082                          ORTE_NAME_PRINT(sender)));
1083 
1084     count = 1;
1085     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
1086         ORTE_ERROR_LOG(ret);
1087         goto cleanup;
1088     }
1089 
1090     count = 1;
1091     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
1092         ORTE_ERROR_LOG(ret);
1093         goto cleanup;
1094     }
1095 
1096     orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
1097 
1098  cleanup:
1099     return;
1100 }
1101 
orte_sstore_stage_local_process_cmd_action(orte_process_name_t * sender,orte_sstore_stage_cmd_flag_t command,orte_sstore_base_handle_t loc_id,opal_buffer_t * buffer)1102 int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
1103                                                orte_sstore_stage_cmd_flag_t command,
1104                                                orte_sstore_base_handle_t loc_id,
1105                                                opal_buffer_t* buffer)
1106 {
1107     orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
1108 
1109     /*
1110      * Find the referenced handle (Create if it does not exist)
1111      */
1112     if(NULL == (handle_info = find_handle_info(loc_id)) ) {
1113         handle_info = create_new_handle_info(loc_id);
1114     }
1115 
1116     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1117                          "sstore:stage:(local): process_cmd(%s) - Command = %s",
1118                          ORTE_NAME_PRINT(sender),
1119                          (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
1120                           (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
1121                            (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
1122                             (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));
1123 
1124     /*
1125      * Process the command
1126      */
1127     if( ORTE_SSTORE_STAGE_PULL == command ) {
1128         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1129             process_global_pull(sender, buffer, handle_info);
1130         } else {
1131             process_app_pull(sender, buffer, handle_info);
1132         }
1133     }
1134     else if( ORTE_SSTORE_STAGE_PUSH == command ) {
1135         if(OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender)) {
1136             process_global_push(sender, buffer, handle_info);
1137         } else {
1138             process_app_push(sender, buffer, handle_info);
1139         }
1140     }
1141     else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
1142         /* The xcast from the root makes the 'sender' equal to this process :/
1143          * so we know it is the HNP, so just use that name */
1144         process_global_remove(ORTE_PROC_MY_HNP, buffer, handle_info);
1145     }
1146 
1147     return ORTE_SUCCESS;
1148 }
1149 
process_global_pull(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1150 static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1151 {
1152     /* JJH should be as simple as calling push_handle_info() */
1153     opal_output(0, "sstore:stage:(local): process_global_pull() Not implemented!");
1154     return ORTE_ERR_NOT_IMPLEMENTED;
1155 }
1156 
process_global_push(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1157 static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1158 {
1159     int ret, exit_status = ORTE_SUCCESS;
1160     orte_std_cntr_t count;
1161     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1162     opal_list_item_t* item = NULL;
1163 
1164     count = 1;
1165     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
1166         ORTE_ERROR_LOG(ret);
1167         exit_status = ret;
1168         goto cleanup;
1169     }
1170 
1171     count = 1;
1172     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
1173         ORTE_ERROR_LOG(ret);
1174         exit_status = ret;
1175         goto cleanup;
1176     }
1177 
1178     count = 1;
1179     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
1180         ORTE_ERROR_LOG(ret);
1181         exit_status = ret;
1182         goto cleanup;
1183     }
1184 
1185     if( orte_sstore_stage_enabled_caching ) {
1186         count = 1;
1187         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
1188             ORTE_ERROR_LOG(ret);
1189             exit_status = ret;
1190             goto cleanup;
1191         }
1192     }
1193 
1194     count = 1;
1195     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
1196         ORTE_ERROR_LOG(ret);
1197         exit_status = ret;
1198         goto cleanup;
1199     }
1200 
1201     /*
1202      * For each process we are working with
1203      */
1204     for(item  = opal_list_get_first(handle_info->app_info_handle);
1205         item != opal_list_get_end(handle_info->app_info_handle);
1206         item  = opal_list_get_next(item) ) {
1207         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1208 
1209         if( NULL != app_info->local_location ) {
1210             free(app_info->local_location);
1211             app_info->local_location = NULL;
1212         }
1213         asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid);
1214 
1215         if( orte_sstore_stage_enabled_caching ) {
1216             if( NULL != app_info->local_cache_location ) {
1217                 free(app_info->local_cache_location);
1218                 app_info->local_cache_location = NULL;
1219             }
1220             asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid);
1221         }
1222 
1223         if( NULL != app_info->metadata_filename ) {
1224             free(app_info->metadata_filename);
1225             app_info->metadata_filename = NULL;
1226         }
1227         asprintf(&(app_info->metadata_filename), "%s/%s",
1228                  app_info->local_location,
1229                  orte_sstore_base_local_metadata_filename);
1230     }
1231 
1232  cleanup:
1233     if( ORTE_SUCCESS == exit_status ) {
1234         handle_info->status = SSTORE_LOCAL_READY;
1235     } else {
1236         handle_info->status = SSTORE_LOCAL_ERROR;
1237     }
1238 
1239     return exit_status;
1240 }
1241 
process_global_remove(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1242 static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1243 {
1244     int ret, exit_status = ORTE_SUCCESS;
1245     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1246     opal_list_item_t* item = NULL;
1247     opal_buffer_t *loc_buffer = NULL;
1248     orte_sstore_stage_cmd_flag_t command;
1249     size_t list_size;
1250     char * cmd = NULL;
1251 
1252     /*
1253      * If not caching, then just remove the local copy
1254      * Or if migrating, since we do not cache checkpoints generated while
1255      * migrating.
1256      */
1257     if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
1258         for(item  = opal_list_get_first(handle_info->app_info_handle);
1259             item != opal_list_get_end(handle_info->app_info_handle);
1260             item  = opal_list_get_next(item) ) {
1261             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1262 
1263             asprintf(&cmd, "rm -rf %s", app_info->local_location);
1264             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1265                                  "sstore:stage:(local): update_cache(): Removing with command (%s)",
1266                                  cmd));
1267             system(cmd);
1268 
1269             if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
1270                 free(cmd);
1271                 cmd = NULL;
1272 
1273                 asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location);
1274                 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1275                                      "sstore:stage:(local): update_cache(): Removing with command (%s)",
1276                                      cmd));
1277                 system(cmd);
1278             }
1279         }
1280     }
1281     else {
1282           /*
1283            * Update the local cache
1284            */
1285           if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
1286               ORTE_ERROR_LOG(ret);
1287               exit_status = ret;
1288               goto cleanup;
1289           }
1290     }
1291 
1292     loc_buffer = OBJ_NEW(opal_buffer_t);
1293 
1294     command = ORTE_SSTORE_STAGE_DONE;
1295     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1296         ORTE_ERROR_LOG(ret);
1297         exit_status = ret;
1298         goto cleanup;
1299     }
1300 
1301     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1302         ORTE_ERROR_LOG(ret);
1303         exit_status = ret;
1304         goto cleanup;
1305     }
1306 
1307     list_size = opal_list_get_size(handle_info->app_info_handle);
1308     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
1309         ORTE_ERROR_LOG(ret);
1310         exit_status = ret;
1311         goto cleanup;
1312     }
1313 
1314     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1315                                                        orte_rml_send_callback, NULL))) {
1316         ORTE_ERROR_LOG(ret);
1317         exit_status = ret;
1318         goto cleanup;
1319     }
1320 
1321     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1322                          "sstore:stage:(local): remove(): Sent done for %d files to %s",
1323                          (int)list_size,
1324                          ORTE_NAME_PRINT(peer)));
1325 
1326     handle_info->status = SSTORE_LOCAL_DONE;
1327     /* loc_buffer should not be released here; the callback releases it */
1328     loc_buffer = NULL;
1329 
1330  cleanup:
1331     if( NULL != cmd ) {
1332         free(cmd);
1333         cmd = NULL;
1334     }
1335 
1336     if (NULL != loc_buffer) {
1337         OBJ_RELEASE(loc_buffer);
1338         loc_buffer = NULL;
1339     }
1340 
1341     return exit_status;
1342 }
1343 
process_app_pull(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1344 static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1345 {
1346     int ret, exit_status = ORTE_SUCCESS;
1347     opal_buffer_t *loc_buffer = NULL;
1348     orte_sstore_stage_cmd_flag_t command;
1349     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1350 
1351     /*
1352      * Find this app's data
1353      */
1354     app_info = find_app_handle_info(handle_info, peer);
1355 
1356     /*
1357      * Push back the requested information
1358      */
1359     loc_buffer = OBJ_NEW(opal_buffer_t);
1360 
1361     command = ORTE_SSTORE_STAGE_PUSH;
1362     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1363         ORTE_ERROR_LOG(ret);
1364         exit_status = ret;
1365         goto cleanup;
1366     }
1367 
1368     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1369         ORTE_ERROR_LOG(ret);
1370         exit_status = ret;
1371         goto cleanup;
1372     }
1373 
1374     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1375         ORTE_ERROR_LOG(ret);
1376         exit_status = ret;
1377         goto cleanup;
1378     }
1379 
1380     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
1381         ORTE_ERROR_LOG(ret);
1382         exit_status = ret;
1383         goto cleanup;
1384     }
1385 
1386     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
1387         ORTE_ERROR_LOG(ret);
1388         exit_status = ret;
1389         goto cleanup;
1390     }
1391 
1392     if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
1393         ORTE_ERROR_LOG(ret);
1394         exit_status = ret;
1395         goto cleanup;
1396     }
1397 
1398     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1399                                                        orte_rml_send_callback, NULL))) {
1400         ORTE_ERROR_LOG(ret);
1401         exit_status = ret;
1402         goto cleanup;
1403     }
1404 
1405     /* loc_buffer should not be released here; the callback releases it */
1406     loc_buffer = NULL;
1407 
1408  cleanup:
1409     if (NULL != loc_buffer) {
1410         OBJ_RELEASE(loc_buffer);
1411         loc_buffer = NULL;
1412     }
1413 
1414     return exit_status;
1415 }
1416 
process_app_push(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_local_snapshot_info_t * handle_info)1417 static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
1418 {
1419     int ret, exit_status = ORTE_SUCCESS;
1420     orte_std_cntr_t count;
1421     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1422 
1423     /*
1424      * Find this app's data
1425      */
1426     app_info = find_app_handle_info(handle_info, peer);
1427 
1428     /*
1429      * Unpack the data
1430      */
1431     count = 1;
1432     if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
1433         ORTE_ERROR_LOG(ret);
1434         exit_status = ret;
1435         goto cleanup;
1436     }
1437 
1438     if( !app_info->ckpt_skipped ) {
1439         count = 1;
1440         if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
1441             ORTE_ERROR_LOG(ret);
1442             exit_status = ret;
1443             goto cleanup;
1444         }
1445     }
1446 
1447     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1448                          "sstore:stage:(local): app_push(%s, skip=%s, %s)",
1449                          ORTE_NAME_PRINT(&(app_info->name)),
1450                          (app_info->ckpt_skipped ? "T" : "F"),
1451                          app_info->crs_comp));
1452 
1453     /* Compression started on sync() */
1454 
1455  cleanup:
1456     return exit_status;
1457 }
1458 
wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t * handle_info)1459 static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
1460 {
1461     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1462     opal_list_item_t *item = NULL;
1463     bool is_done = true;
1464 
1465     do {
1466         is_done = true;
1467         for(item  = opal_list_get_first(handle_info->app_info_handle);
1468             item != opal_list_get_end(handle_info->app_info_handle);
1469             item  = opal_list_get_next(item) ) {
1470             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1471 
1472             if( NULL == app_info->crs_comp && !app_info->ckpt_skipped ) {
1473                 is_done = false;
1474                 break;
1475             }
1476         }
1477 
1478         if( !is_done ) {
1479             OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1480                                  "sstore:stage:(local): Waiting for appliccation %s",
1481                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1482             opal_progress();
1483         }
1484     } while(!is_done);
1485 
1486     return ORTE_SUCCESS;
1487 }
1488 
start_compression(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_sstore_stage_local_app_snapshot_info_t * app_info)1489 static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
1490                              orte_sstore_stage_local_app_snapshot_info_t *app_info)
1491 {
1492     int ret, exit_status = ORTE_SUCCESS;
1493     char * postfix = NULL;
1494     orte_proc_t *proc;
1495 
1496     /* Sanity Check */
1497     if( !orte_sstore_stage_enabled_compression ) {
1498         goto cleanup;
1499     }
1500 
1501     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1502                          "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
1503                          ORTE_NAME_PRINT(&(app_info->name)),
1504                          app_info->local_location ));
1505 
1506     /*
1507      * Start compression (nonblocking)
1508      */
1509     if( ORTE_SUCCESS != (ret = opal_compress.compress_nb(app_info->local_location,
1510                                                          &(app_info->compressed_local_location),
1511                                                          &(postfix),
1512                                                          &(app_info->compress_pid))) ) {
1513         ORTE_ERROR_LOG(ret);
1514         exit_status = ret;
1515         goto cleanup;
1516     }
1517     if( app_info->compress_pid <= 0 ) {
1518         ORTE_ERROR_LOG(ORTE_ERROR);
1519         exit_status = ORTE_ERROR;
1520         goto cleanup;
1521     }
1522 
1523     if( NULL == handle_info->compress_comp ) {
1524         handle_info->compress_comp = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
1525         handle_info->compress_postfix = strdup(postfix);
1526     }
1527 
1528     /*
1529      * Setup a callback for when it is finished
1530      */
1531     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1532                          "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
1533                          app_info->compress_pid,
1534                          ORTE_NAME_PRINT(&(app_info->name)) ));
1535 
1536     proc = OBJ_NEW(orte_proc_t);
1537     proc->pid = app_info->compress_pid;
1538     /* be sure to mark it as alive so we don't instantly fire */
1539     ORTE_FLAG_SET(proc, ORTE_PROC_FLAG_ALIVE);
1540 
1541     orte_wait_cb(proc, sstore_stage_local_compress_waitpid_cb, app_info);
1542 
1543  cleanup:
1544     if( NULL != postfix ) {
1545         free(postfix);
1546         postfix = NULL;
1547     }
1548 
1549     return exit_status;
1550 }
1551 
sstore_stage_local_compress_waitpid_cb(orte_proc_t * proc,void * cbdata)1552 static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
1553 {
1554     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1555     orte_wait_tracker_t *t2 = (orte_wait_tracker_t *)cbdata;
1556 
1557     app_info = (orte_sstore_stage_local_app_snapshot_info_t*)t2->cbdata;
1558 
1559     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1560                          "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
1561                          (int)proc->pid,
1562                          ORTE_NAME_PRINT(&(app_info->name)) ));
1563 
1564     app_info->compress_pid = 0;
1565     OBJ_RELEASE(proc);
1566     OBJ_RELEASE(t2);
1567 }
1568 
wait_all_compressed(orte_sstore_stage_local_snapshot_info_t * handle_info)1569 static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
1570 {
1571     int ret, exit_status = ORTE_SUCCESS;
1572     orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1573     opal_list_item_t *item = NULL;
1574     bool is_done = true;
1575     int usleep_time = 1000;
1576     int s_time = 0, max_wait_time;
1577 
1578     /* Sanity Check */
1579     if( !orte_sstore_stage_enabled_compression ) {
1580         return ORTE_SUCCESS;
1581     }
1582 
1583     /*
1584      * Start all compression
1585      */
1586     if( orte_sstore_stage_compress_delay > 0 ) {
1587         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1588                              "sstore:stage:(local): Delaying %d second before starting compression...",
1589                              orte_sstore_stage_compress_delay));
1590         max_wait_time = orte_sstore_stage_compress_delay * (1000000/usleep_time);
1591         for( s_time = 0; s_time < max_wait_time; ++s_time) {
1592             opal_progress();
1593             usleep(1000);
1594         }
1595     }
1596 
1597     for(item  = opal_list_get_first(handle_info->app_info_handle);
1598         item != opal_list_get_end(handle_info->app_info_handle);
1599         item  = opal_list_get_next(item) ) {
1600         app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1601 
1602         if( ORTE_SUCCESS != (ret = start_compression(handle_info, app_info)) ) {
1603             ORTE_ERROR_LOG(ret);
1604             exit_status = ret;
1605             goto cleanup;
1606         }
1607     }
1608 
1609     /*
1610      * Wait for compression to finish
1611      */
1612     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1613                          "sstore:stage:(local): Waiting for compression to finish..."));
1614     do {
1615         is_done = true;
1616         for(item  = opal_list_get_first(handle_info->app_info_handle);
1617             item != opal_list_get_end(handle_info->app_info_handle);
1618             item  = opal_list_get_next(item) ) {
1619             app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1620 
1621             if( 0 < app_info->compress_pid ) {
1622                 is_done = false;
1623                 break;
1624             }
1625         }
1626 
1627         if( !is_done ) {
1628             OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
1629                                  "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
1630                                  ORTE_NAME_PRINT(&(app_info->name)) ));
1631             opal_progress();
1632         }
1633     } while(!is_done);
1634 
1635     OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1636                          "sstore:stage:(local): Compression finished!"));
1637  cleanup:
1638     return exit_status;
1639 }
1640 
pull_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info)1641 static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
1642 {
1643     int ret, exit_status = ORTE_SUCCESS;
1644     opal_buffer_t *buffer = NULL;
1645     orte_sstore_stage_cmd_flag_t command;
1646 
1647     /*
1648      * Check to see if this is necessary
1649      * (Did we get all of the info from the handle unpack?)
1650      */
1651     if( 0 <= handle_info->seq_num &&
1652         NULL != handle_info->global_ref_name &&
1653         NULL != handle_info->location_fmt ) {
1654         handle_info->status = SSTORE_LOCAL_READY;
1655         return ORTE_SUCCESS;
1656     }
1657 
1658     buffer = OBJ_NEW(opal_buffer_t);
1659 
1660     /*
1661      * Ask the daemon to send us the info that we need
1662      */
1663     command = ORTE_SSTORE_STAGE_PULL;
1664     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1665         ORTE_ERROR_LOG(ret);
1666         exit_status = ret;
1667         goto cleanup;
1668     }
1669 
1670     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1671         ORTE_ERROR_LOG(ret);
1672         exit_status = ret;
1673         goto cleanup;
1674     }
1675 
1676     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
1677                                                        ORTE_RML_TAG_SSTORE_INTERNAL,
1678                                                        orte_rml_send_callback, NULL))) {
1679         ORTE_ERROR_LOG(ret);
1680         exit_status = ret;
1681         goto cleanup;
1682     }
1683 
1684     /* buffer should not be released here; the callback releases it */
1685     buffer = NULL;
1686 
1687  cleanup:
1688     if (NULL != buffer) {
1689         OBJ_RELEASE(buffer);
1690         buffer = NULL;
1691     }
1692 
1693     return exit_status;
1694 }
1695 
/*
 * Report this node's checkpoint results for a snapshot handle to the HNP.
 *
 * Message layout, in pack order:
 *   ORTE_SSTORE_STAGE_PUSH, handle id, number of local processes (OPAL_SIZE),
 *   then for each process:
 *     - its name (ORTE_NAME),
 *     - its ckpt_skipped flag (OPAL_BOOL),
 *     - only if not skipped: its CRS component name (OPAL_STRING),
 *       followed, only if compression is enabled, by the handle's
 *       compress_comp and compress_postfix strings.
 *
 * The buffer is sent non-blocking on ORTE_RML_TAG_SSTORE_INTERNAL; after a
 * successful send orte_rml_send_callback releases it, otherwise it is
 * released here.
 *
 * Returns ORTE_SUCCESS or the first packing/sending error code.
 */
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
{
    int rc;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);
    orte_sstore_stage_cmd_flag_t cmd_flag = ORTE_SSTORE_STAGE_PUSH;
    opal_list_item_t *cur = NULL;
    size_t num_procs;

    /* Header: command, handle id, process count */
    rc = opal_dss.pack(msg, &cmd_flag, 1, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        goto error;
    }

    rc = opal_dss.pack(msg, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        goto error;
    }

    num_procs = opal_list_get_size(handle_info->app_info_handle);
    rc = opal_dss.pack(msg, &num_procs, 1, OPAL_SIZE);
    if( ORTE_SUCCESS != rc ) {
        goto error;
    }

    /* One record per local process */
    for( cur  = opal_list_get_first(handle_info->app_info_handle);
         cur != opal_list_get_end(handle_info->app_info_handle);
         cur  = opal_list_get_next(cur) ) {
        orte_sstore_stage_local_app_snapshot_info_t *proc =
            (orte_sstore_stage_local_app_snapshot_info_t*)cur;

        rc = opal_dss.pack(msg, &(proc->name), 1, ORTE_NAME);
        if( ORTE_SUCCESS != rc ) {
            goto error;
        }

        rc = opal_dss.pack(msg, &(proc->ckpt_skipped), 1, OPAL_BOOL);
        if( ORTE_SUCCESS != rc ) {
            goto error;
        }

        /* A skipped checkpoint carries no component information */
        if( proc->ckpt_skipped ) {
            continue;
        }

        rc = opal_dss.pack(msg, &(proc->crs_comp), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            goto error;
        }

        if( !orte_sstore_stage_enabled_compression ) {
            continue;
        }

        rc = opal_dss.pack(msg, &(handle_info->compress_comp), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            goto error;
        }

        rc = opal_dss.pack(msg, &(handle_info->compress_postfix), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            goto error;
        }
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, msg, ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if( ORTE_SUCCESS != rc ) {
        goto error;
    }

    /* Ownership of msg now rests with the send callback */
    return ORTE_SUCCESS;

 error:
    ORTE_ERROR_LOG(rc);
    if( NULL != msg ) {
        OBJ_RELEASE(msg);
    }
    return rc;
}
1787 
sstore_stage_create_local_dir(void)1788 static int sstore_stage_create_local_dir(void)
1789 {
1790     int ret, exit_status = ORTE_SUCCESS;
1791     mode_t my_mode = S_IRWXU;
1792 
1793     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1794         ORTE_ERROR_LOG(ret);
1795         exit_status = ret;
1796         goto cleanup;
1797     }
1798 
1799  cleanup:
1800     return exit_status;
1801 }
1802 
sstore_stage_destroy_local_dir(void)1803 static int sstore_stage_destroy_local_dir(void)
1804 {
1805     int ret, exit_status = ORTE_SUCCESS;
1806     char * basedir_root = NULL;
1807 
1808     asprintf(&basedir_root, "%s/%s",
1809              orte_sstore_stage_local_snapshot_dir,
1810              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1811 
1812     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1813         ORTE_ERROR_LOG(ret);
1814         exit_status = ret;
1815         goto cleanup;
1816     }
1817 
1818  cleanup:
1819     if( NULL != basedir_root ) {
1820         free(basedir_root);
1821         basedir_root = NULL;
1822     }
1823 
1824     return exit_status;
1825 }
1826 
sstore_stage_create_cache(void)1827 static int sstore_stage_create_cache(void)
1828 {
1829     int ret, exit_status = ORTE_SUCCESS;
1830     mode_t my_mode = S_IRWXU;
1831 
1832     /* Sanity check */
1833     if( !orte_sstore_stage_enabled_caching ) {
1834         goto cleanup;
1835     }
1836 
1837     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1838         ORTE_ERROR_LOG(ret);
1839         exit_status = ret;
1840         goto cleanup;
1841     }
1842 
1843  cleanup:
1844     return exit_status;
1845 }
1846 
sstore_stage_destroy_cache(void)1847 static int sstore_stage_destroy_cache(void)
1848 {
1849     int ret, exit_status = ORTE_SUCCESS;
1850 
1851     /* Sanity check */
1852     if( !orte_sstore_stage_enabled_caching ) {
1853         goto cleanup;
1854     }
1855 
1856     if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1857         ORTE_ERROR_LOG(ret);
1858         exit_status = ret;
1859         goto cleanup;
1860     }
1861 
1862  cleanup:
1863     return exit_status;
1864 }
1865 
/*
 * Move this node's latest snapshots into the local restart cache and rotate
 * the 'current'/'last' cache directory pointers.
 *
 * Returns ORTE_SUCCESS without doing anything when caching is disabled, when
 * the handle is migrating, or when no local processes are attached to it.
 * Otherwise:
 *   1. creates the directory containing the first process' local_cache_location
 *      (owner rwx); every process' snapshot is moved into that one directory,
 *   2. for each process runs "mv <local_location> <cache dir>" and, when
 *      compression is enabled and a compressed copy exists, "rm -rf" on the
 *      compressed copy (cached snapshots are kept uncompressed),
 *   3. runs "rm -rf" on the previous 'last' cache directory, if any,
 *   4. the old 'current' becomes 'last' and the new cache directory becomes
 *      'current'.
 *
 * Shell commands are best effort: a non-zero system() status is logged but
 * does not fail the call.
 * NOTE(review): paths are spliced unquoted into shell command lines, so
 * whitespace or shell metacharacters in them will break the command.
 *
 * Returns ORTE_SUCCESS; ORTE_ERROR if the first list entry is unexpectedly
 * NULL; ORTE_ERR_OUT_OF_RESOURCE if a path/command string cannot be
 * allocated; or the error reported by opal_os_dirpath_create().
 */
static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char *cmd = NULL;
    mode_t my_mode = S_IRWXU;
    char *cache_dirname = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
        goto cleanup;
    }

    if( 0 == opal_list_get_size(handle_info->app_info_handle) ) {
        /* No processes on this node, skip */
        goto cleanup;
    }

    app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Create the base cache directory.
     * opal_dirname() returns a string the caller must free; it is released in
     * cleanup unless ownership moves to sstore_stage_cache_current_dir below.
     */
    cache_dirname = opal_dirname(app_info->local_cache_location);
    if( NULL == cache_dirname ) {
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * For each process, move the current checkpoint to the cache directory
     * Cached snapshots are always stored uncompressed.
     */
    for(item  = opal_list_get_first(handle_info->app_info_handle);
        item != opal_list_get_end(handle_info->app_info_handle);
        item  = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        /* Release the previous command before building the next one */
        free(cmd);
        cmd = NULL;
        if( 0 > asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname) ) {
            cmd = NULL; /* undefined after a failed asprintf() */
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(exit_status);
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
                             ORTE_NAME_PRINT(&app_info->name),
                             cmd));
        if( 0 != (ret = system(cmd)) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Command (%s) returned status %d",
                                 cmd, ret));
        }

        /* The cache keeps the uncompressed snapshot: drop the compressed copy */
        if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
            free(cmd);
            cmd = NULL;
            if( 0 > asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location) ) {
                cmd = NULL;
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                ORTE_ERROR_LOG(exit_status);
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                 cmd));
            if( 0 != (ret = system(cmd)) ) {
                OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                     "sstore:stage:(local): update_cache(): Command (%s) returned status %d",
                                     cmd, ret));
            }
        }
    }

    /*
     * Remove the previous cached checkpoint
     */
    if( NULL != sstore_stage_cache_last_dir ) {
        free(cmd);
        cmd = NULL;
        if( 0 > asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir) ) {
            cmd = NULL;
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(exit_status);
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
                             cmd));
        if( 0 != (ret = system(cmd)) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Command (%s) returned status %d",
                                 cmd, ret));
        }
    }

    /*
     * Rotate the cache pointers: the old 'current' becomes 'last', and the
     * new cache directory becomes 'current' (taking ownership of
     * cache_dirname, so it must not be freed in cleanup).
     */
    free(sstore_stage_cache_last_dir);
    sstore_stage_cache_last_dir    = sstore_stage_cache_current_dir;
    sstore_stage_cache_current_dir = cache_dirname;
    cache_dirname = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
                         sstore_stage_cache_current_dir, sstore_stage_cache_last_dir));

 cleanup:
    free(cmd);
    cmd = NULL;
    free(cache_dirname);
    cache_dirname = NULL;

    return exit_status;
}
1977 
orte_sstore_stage_local_preload_files(char ** local_location,bool * skip_xfer,char * global_loc,char * ref,char * postfix,int seq)1978 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1979                                                  char *global_loc, char *ref, char *postfix, int seq)
1980 {
1981     int ret, exit_status = ORTE_SUCCESS;
1982     mode_t my_mode = S_IRWXU;
1983     orte_filem_base_request_t *filem_request;
1984     orte_filem_base_process_set_t *p_set = NULL;
1985     orte_filem_base_file_set_t * f_set = NULL;
1986     char * full_local_location = NULL;
1987 
1988     *skip_xfer = false;
1989 
1990     if( NULL != *local_location) {
1991         free(*local_location);
1992         *local_location = NULL;
1993     }
1994 
1995     /*
1996      * If the global directory is shared, then just reference directly
1997      *
1998      * Skip this optimization if compressing. Since decompressing on the
1999      * central storage would typically require a transfer to the local
2000      * disk to decompress, then transfer back. Eliminating all benefits
2001      * of the optimization.
2002      */
2003     /* (JJH) If we are going to use the preloaded restart files for subsequent
2004      *       restarts then we actually always want to preload the files. This
2005      *       way if we need to restart from the same checkpoint again, then
2006      *       we can from the local restart cache.
2007      */
2008 #if 0
2009     if( orte_sstore_stage_global_is_shared &&
2010         (NULL == postfix || 0 >= strlen(postfix) ) ) {
2011         *local_location = strdup(global_loc);
2012         *skip_xfer = true;
2013         goto cleanup;
2014     }
2015 #endif
2016 
2017     asprintf(local_location, "%s/%s/%s/%d",
2018              orte_sstore_stage_local_snapshot_dir,
2019              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2020              ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2021              seq);
2022     asprintf(&full_local_location, "%s/%s",
2023              *local_location,
2024              ref);
2025 
2026     /*
2027      * If the snapshot already exists locally, just reuse it instead of
2028      * transfering it again.
2029      */
2030     if( 0 == (ret = access(full_local_location, F_OK)) ) {
2031         OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2032                              "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2033                              full_local_location));
2034         *skip_xfer = true;
2035         goto cleanup;
2036     }
2037 
2038     /*
2039      * Create the local restart directory
2040      */
2041     if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2042         ORTE_ERROR_LOG(ret);
2043         exit_status = ret;
2044         goto cleanup;
2045     }
2046 
2047     /*
2048      * FileM request to move the checkpoint to the local directory
2049      */
2050     filem_request = OBJ_NEW(orte_filem_base_request_t);
2051 
2052     /* Define the process set */
2053     p_set = OBJ_NEW(orte_filem_base_process_set_t);
2054     if( ORTE_PROC_IS_HNP ) {
2055         /* if I am the HNP, then use me as the source */
2056         p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2057         p_set->source.vpid  = ORTE_PROC_MY_NAME->vpid;
2058     }
2059     else {
2060         /* otherwise, set the HNP as the source */
2061         p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2062         p_set->source.vpid  = ORTE_PROC_MY_HNP->vpid;
2063     }
2064     p_set->sink.jobid   = ORTE_PROC_MY_NAME->jobid;
2065     p_set->sink.vpid    = ORTE_PROC_MY_NAME->vpid;
2066     opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2067 
2068     /* Define the file set */
2069     f_set = OBJ_NEW(orte_filem_base_file_set_t);
2070 
2071     f_set->local_target = strdup(*local_location);
2072     if( NULL != postfix && 0 < strlen(postfix) ) {
2073         asprintf(&(f_set->remote_target), "%s/%s%s",
2074                  global_loc,
2075                  ref,
2076                  postfix);
2077     } else {
2078         asprintf(&(f_set->remote_target), "%s/%s",
2079                  global_loc,
2080                  ref);
2081     }
2082     if( NULL != postfix && 0 < strlen(postfix) ) {
2083         f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2084     } else {
2085         f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2086     }
2087 
2088     opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2089 
2090     /* Start getting the files */
2091     opal_list_append(preload_filem_requests, &(filem_request->super));
2092     if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2093         ORTE_ERROR_LOG(ret);
2094         exit_status = ret;
2095         goto cleanup;
2096     }
2097 
2098  cleanup:
2099     if( NULL != full_local_location ) {
2100         free(full_local_location);
2101         full_local_location = NULL;
2102     }
2103 
2104     return exit_status;
2105 }
2106