1 /*
2  * Copyright (c)      2011 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * Copyright (c) 2004-2011 The University of Tennessee and The University
5  *                         of Tennessee Research Foundation.  All rights
6  *                         reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
14 /*
15  *
16  */
17 
18 #include "orte_config.h"
19 
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif  /* HAVE_UNISTD_H */
28 
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31 
32 #include "opal/mca/event/event.h"
33 
34 #include "orte/constants.h"
35 #include "opal/util/argv.h"
36 #include "opal/util/output.h"
37 #include "opal/util/show_help.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41 #include "opal/util/opal_getcwd.h"
42 
43 #include "opal/threads/mutex.h"
44 #include "opal/threads/condition.h"
45 
46 #include "orte/util/show_help.h"
47 #include "orte/util/name_fns.h"
48 #include "orte/util/proc_info.h"
49 #include "orte/runtime/orte_globals.h"
50 #include "orte/runtime/orte_wait.h"
51 
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/errmgr/base/base.h"
54 #include "orte/mca/errmgr/base/errmgr_private.h"
55 #include "orte/mca/ess/ess.h"
56 #include "orte/mca/rml/rml.h"
57 #include "orte/mca/rml/rml_types.h"
58 #include "orte/mca/filem/filem.h"
59 #include "orte/mca/grpcomm/grpcomm.h"
60 #include "orte/mca/snapc/snapc.h"
61 #include "orte/mca/snapc/base/base.h"
62 
63 #include "orte/mca/sstore/sstore.h"
64 #include "orte/mca/sstore/base/base.h"
65 
66 #include "sstore_stage.h"
67 
/* Values for orte_sstore_stage_global_snapshot_info_t::handle_type */
#define SSTORE_HANDLE_TYPE_NONE    0
#define SSTORE_HANDLE_TYPE_CKPT    1
#define SSTORE_HANDLE_TYPE_RESTART 2

/*
 * Values for orte_sstore_stage_global_snapshot_info_t::state.
 * NONE is set by the constructor, INIT when a handle is created, REG by
 * register(), SYNCING/SYNCED by sync().  module_finalize() waits for every
 * handle to reach SYNCED (or still be NONE) before tearing down.
 */
#define SSTORE_GLOBAL_NONE    0
#define SSTORE_GLOBAL_ERROR   1
#define SSTORE_GLOBAL_INIT    2
#define SSTORE_GLOBAL_REG     3
#define SSTORE_GLOBAL_SYNCING 4
#define SSTORE_GLOBAL_SYNCED  5
78 
79 /**********
80  * Object Stuff
81  **********/
/*
 * Per-snapshot bookkeeping kept by the global (HNP-side) stage module.
 * Instances live on the file-level active_handles list and are looked up
 * by their handle id.
 */
struct  orte_sstore_stage_global_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle ID (taken from next_handle_id by the constructor) */
    orte_sstore_base_handle_t id;

    /** Job ID */
    orte_jobid_t jobid;

    /** State (one of the SSTORE_GLOBAL_* values) */
    int state;

    /** Handle type (one of the SSTORE_HANDLE_TYPE_* values) */
    int handle_type;

    /** Sequence Number */
    int seq_num;

    /** Reference Name */
    char * ref_name;

    /** Local Location (Relative Path to base_location) */
    char * local_location;

    /** Application location format (Global) */
    char * app_global_location_fmt;

    /** Application location format (Local) */
    char * app_local_location_fmt;

    /** Application cache location format (Local); only set when caching is enabled */
    char * app_local_cache_location_fmt;

    /** Base location (copy of orte_sstore_base_global_snapshot_dir) */
    char * base_location;

    /** Metadata File Name */
    char *metadata_filename;

    /** Metadata file stream (closed by the destructor if still open) */
    FILE *metadata;

    /** Num procs reported as locally synced */
    int num_procs_synced;

    /** Num procs reported as done */
    int num_procs_done;

    /** Num procs total in job */
    int num_procs_total;

    /** List of FileM Requests to wait upon */
    opal_list_t *filem_requests;

    /** Is this checkpoint representing a migration? */
    bool migrating;

    /** JJH: Assume all processes are compressed the same way */
    char * compress_comp;
    char * compress_postfix;

    /** Progress Meter */
    double last_progress_report;
};
typedef struct orte_sstore_stage_global_snapshot_info_t orte_sstore_stage_global_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_global_snapshot_info_t);

/* Constructor / destructor registered with the OPAL object system below */
void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info);
void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_stage_global_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_global_snapshot_info_construct,
                   orte_sstore_stage_global_snapshot_info_destruct);
157 
158 
/**********
 * Local Function and Variable Declarations
 **********/
/* NOTE(review): presumably toggled by the start/stop listener helpers below - confirm */
static bool is_global_listener_active = false;
static int sstore_stage_global_start_listener(void);
static int sstore_stage_global_stop_listener(void);
static void sstore_stage_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata);

/* Handlers for messages arriving from local (orted/app) peers */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info);

/* Handle creation and lookup */
static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);

/* Global metadata file helpers */
static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info);
static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info);
static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, int value);
static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, char *value);
static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info);

static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info);
static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
                                          opal_list_item_t **b);
static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
                                                     orte_sstore_base_global_snapshot_info_t *global_snapshot);

static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info);
static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info);

/* Next id assigned by the snapshot-info constructor (post-incremented per object) */
static int next_handle_id = 1;
/* Live snapshot handles; created in module_init(), released in module_finalize() */
static opal_list_t *active_handles = NULL;
196 
197 /*
198  * Progress
199  */
200 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info);
201 
202 #define SSTORE_STAGE_REPORT_PROGRESS(handle_info)                       \
203     {                                                                   \
204         if(OPAL_UNLIKELY(orte_sstore_stage_progress_meter > 0)) {       \
205             sstore_stage_report_progress(handle_info);                  \
206         }                                                               \
207     }
208 
209 /**********
210  * Object stuff
211  **********/
/*
 * Constructor for orte_sstore_stage_global_snapshot_info_t.
 *
 * Assigns the next handle id from the file-level counter and places every
 * field in its "empty" state.  Allocates a copy of the global snapshot
 * directory (if one is configured) and an empty FileM request list; both
 * are released by the destructor.
 */
void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info)
{
    /* Ids come from a file-level counter that is bumped for every object */
    info->id = next_handle_id;
    next_handle_id++;

    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;
    info->seq_num     = -1;

    /* strdup(NULL) is undefined behavior: leave the base location unset
     * when no global snapshot directory has been configured. */
    if( NULL != orte_sstore_base_global_snapshot_dir ) {
        info->base_location = strdup(orte_sstore_base_global_snapshot_dir);
    } else {
        info->base_location = NULL;
    }

    info->ref_name                     = NULL;
    info->local_location               = NULL;
    info->app_global_location_fmt      = NULL;
    info->app_local_location_fmt       = NULL;
    info->app_local_cache_location_fmt = NULL;

    info->metadata_filename = NULL;
    info->metadata          = NULL;

    info->filem_requests = OBJ_NEW(opal_list_t);

    info->num_procs_synced = 0;
    info->num_procs_done   = 0;
    info->num_procs_total  = 0;

    info->migrating = false;

    info->compress_comp    = NULL;
    info->compress_postfix = NULL;

    info->last_progress_report = 0.0;
}
249 
/*
 * Destructor for orte_sstore_stage_global_snapshot_info_t.
 *
 * Frees every owned string, closes the metadata stream if it is still
 * open, drops the FileM request list and resets all scalar fields.
 */
void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info)
{
    /* Scalar fields back to their "invalid/empty" values */
    info->id                   = 0;
    info->seq_num              = -1;
    info->jobid                = ORTE_JOBID_INVALID;
    info->state                = SSTORE_GLOBAL_NONE;
    info->handle_type          = SSTORE_HANDLE_TYPE_NONE;
    info->num_procs_synced     = 0;
    info->num_procs_done       = 0;
    info->num_procs_total      = 0;
    info->migrating            = false;
    info->last_progress_report = 0.0;

    /* Owned strings: free(NULL) is a no-op, so no guards are required */
    free(info->ref_name);
    info->ref_name = NULL;

    free(info->local_location);
    info->local_location = NULL;

    free(info->app_global_location_fmt);
    info->app_global_location_fmt = NULL;

    free(info->app_local_location_fmt);
    info->app_local_location_fmt = NULL;

    free(info->app_local_cache_location_fmt);
    info->app_local_cache_location_fmt = NULL;

    free(info->base_location);
    info->base_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    free(info->compress_comp);
    info->compress_comp = NULL;

    free(info->compress_postfix);
    info->compress_postfix = NULL;

    /* fclose(NULL) and OBJ_RELEASE(NULL) are not safe, so these stay guarded */
    if( info->metadata != NULL ) {
        fclose(info->metadata);
        info->metadata = NULL;
    }

    if( info->filem_requests != NULL ) {
        OBJ_RELEASE(info->filem_requests);
        info->filem_requests = NULL;
    }
}
324 
/******************
 * Module functions (non-static entry points of the global stage module)
 ******************/
orte_sstore_stage_global_module_init(void)328 int orte_sstore_stage_global_module_init(void)
329 {
330     int ret, exit_status = ORTE_SUCCESS;
331 
332     if( NULL == active_handles ) {
333         active_handles = OBJ_NEW(opal_list_t);
334     }
335 
336     /*
337      * If user has not enabled recovery, but enabled Caching  then caching does
338      * not benefit the job. Continue using it, but warn the user.
339      */
340     if( orte_sstore_stage_enabled_caching && !orte_enable_recovery ) {
341         opal_show_help("help-orte-sstore-stage.txt", "caching_no_recovery", true);
342     }
343 
344     /*
345      * Setup a listener for the HNP/Apps
346      */
347     if( ORTE_SUCCESS != (ret = sstore_stage_global_start_listener()) ) {
348         ORTE_ERROR_LOG(ret);
349         exit_status = ret;
350         goto cleanup;
351     }
352 
353     exit_status = orte_sstore_stage_local_module_init();
354 
355  cleanup:
356     return exit_status;
357 }
358 
orte_sstore_stage_global_module_finalize(void)359 int orte_sstore_stage_global_module_finalize(void)
360 {
361     int ret, exit_status = ORTE_SUCCESS;
362     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
363     opal_list_item_t* item = NULL;
364     bool done = false;
365     int cur_time = 0, max_time = 120;
366 
367     /*
368      * Wait for all active transfers to finish
369      */
370     done = false;
371     while( 0 < opal_list_get_size(active_handles) && !done ) {
372         done = true;
373         for(item  = opal_list_get_first(active_handles);
374             item != opal_list_get_end(active_handles);
375             item  = opal_list_get_next(item) ) {
376             handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
377             if( SSTORE_GLOBAL_SYNCED != handle_info->state &&
378                 SSTORE_GLOBAL_NONE   != handle_info->state ) {
379                 done = false;
380                 break;
381             }
382         }
383         if( done ) {
384             break;
385         }
386         else {
387             if( cur_time != 0 && cur_time % 30 == 0 ) {
388                 opal_output(0, "---> Waiting for sync(): %3d / %3d\n",
389                             cur_time, max_time);
390             }
391 
392             opal_progress();
393             if( cur_time >= max_time ) {
394                 break;
395             } else {
396                 sleep(1);
397             }
398             cur_time++;
399         }
400     }
401 
402     exit_status = orte_sstore_stage_local_module_finalize();
403 
404     if( NULL != active_handles ) {
405         OBJ_RELEASE(active_handles);
406     }
407 
408     /*
409      * Shutdown the listener for the HNP/Apps
410      */
411     if( ORTE_SUCCESS != (ret = sstore_stage_global_stop_listener()) ) {
412         ORTE_ERROR_LOG(ret);
413         exit_status = ret;
414         goto cleanup;
415     }
416 
417  cleanup:
418     return exit_status;
419 }
420 
/*
 * Create a new checkpoint handle for sequence number `seq` of job `jobid`.
 *
 * Builds the handle bookkeeping, creates the global snapshot directory,
 * and on success stores the new handle id in *handle.  On failure *handle
 * is left untouched and the directory-creation error is returned.
 */
int orte_sstore_stage_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    orte_sstore_stage_global_snapshot_info_t *info = NULL;
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): request_checkpoint_handle()"));

    info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);

    rc = init_global_snapshot_directory(info);
    if( rc != ORTE_SUCCESS ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    *handle = info->id;
    return ORTE_SUCCESS;
}
452 
/*
 * Fill `snapshot` with the description of a global snapshot.
 *
 * handle   - handle to describe; when NULL the last stable checkpoint
 *            (orte_sstore_handle_last_stable) is used.
 * snapshot - output; receives copies of the reference name, base dir and
 *            metadata filename plus the local snapshot list, sorted with
 *            stage_snapshot_sort_compare_fn.
 *
 * Data for the current checkpoint comes from this module's in-memory
 * state; any other checkpoint is read back from its metadata file.
 *
 * Returns ORTE_ERR_NOT_FOUND if the handle is unknown (e.g. no stable
 * checkpoint exists yet), ORTE_ERR_OUT_OF_RESOURCE if a string copy
 * fails, or the error from metadata extraction.
 */
int orte_sstore_stage_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
                                                          orte_sstore_base_global_snapshot_info_t *snapshot)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): request_global_snapshot_data()"));

    /*
     * Lookup the handle (if NULL, use last stable)
     */
    if( NULL != handle ) {
        handle_info = find_handle_info(*handle);
        snapshot->ss_handle = *handle;
    } else {
        handle_info = find_handle_info(orte_sstore_handle_last_stable);
        snapshot->ss_handle = orte_sstore_handle_last_stable;
    }

    /* Unknown handle: report it instead of dereferencing NULL */
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Construct the snapshot from local data, and metadata file.
     * strdup(NULL) is undefined, so only copy fields that are set.
     */
    snapshot->seq_num   = handle_info->seq_num;
    snapshot->reference = (NULL != handle_info->ref_name) ?
        strdup(handle_info->ref_name) : NULL;
    snapshot->basedir   = (NULL != handle_info->base_location) ?
        strdup(handle_info->base_location) : NULL;
    snapshot->metadata_filename = (NULL != handle_info->metadata_filename) ?
        strdup(handle_info->metadata_filename) : NULL;

    if( (NULL != handle_info->ref_name          && NULL == snapshot->reference) ||
        (NULL != handle_info->base_location     && NULL == snapshot->basedir)   ||
        (NULL != handle_info->metadata_filename && NULL == snapshot->metadata_filename) ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /* If this is the current checkpoint, pull data from local cache */
    if( orte_sstore_handle_current == snapshot->ss_handle ) {
        if( ORTE_SUCCESS != (ret = orte_sstore_stage_extract_global_metadata(handle_info, snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }
    /* Otherwise, pull from metadata */
    else {
        if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    opal_list_sort(&snapshot->local_snapshots, stage_snapshot_sort_compare_fn);

 cleanup:
    return exit_status;
}
503 
/*
 * Register a checkpoint handle with the global module.
 *
 * The first call for a handle marks it REG, makes it the current handle
 * and writes the sequence number, local snapshot format and a timestamp
 * into the global metadata.  The sequence number is stored under the
 * migration key when the handle represents a migration.  A repeated call
 * (handle already REG) is forwarded to the local module.
 *
 * Returns ORTE_ERR_NOT_FOUND for an unknown handle, or the first metadata
 * write error.
 */
int orte_sstore_stage_global_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): register(%d) - Global", handle));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }

    /* Already registered globally: hand the request to the local module */
    if( SSTORE_GLOBAL_REG == handle_info->state ) {
        return orte_sstore_stage_local_register(handle);
    }
    handle_info->state = SSTORE_GLOBAL_REG;

    orte_sstore_handle_current = handle;

    /*
     * Associate the metadata
     */
    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                  SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
                                                  orte_sstore_base_local_snapshot_fmt)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
562 
/*
 * Look up a metadata attribute for a snapshot handle.
 *
 * On success *value receives a newly allocated string that the caller
 * must free.  Supported keys:
 *   SSTORE_METADATA_GLOBAL_SNAP_REF        - reference name        (snapc)
 *   SSTORE_METADATA_GLOBAL_SNAP_SEQ        - sequence number       (snapc)
 *   SSTORE_METADATA_LOCAL_SNAP_LOC         - base/ref/seq          (orte-restart, RecoS, snapc)
 *   SSTORE_METADATA_LOCAL_SNAP_REF_FMT     - local snapshot format (orte-restart, RecoS)
 *   SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT - base/ref/seq/format   (orte-restart, RecoS)
 *
 * Returns ORTE_ERR_NOT_SUPPORTED for any other key (leaving *value
 * untouched), ORTE_ERR_NOT_FOUND when a handle-dependent key is requested
 * for an unknown handle, and ORTE_ERR_OUT_OF_RESOURCE (with *value set to
 * NULL) when the string cannot be allocated.
 */
int orte_sstore_stage_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
    int len = 0;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): get_attr()"));

    /* Used by orte-restart and RecoS; the only key independent of the handle */
    if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
        *value = strdup(orte_sstore_base_local_snapshot_fmt);
        return (NULL == *value) ? ORTE_ERR_OUT_OF_RESOURCE : ORTE_SUCCESS;
    }

    if( SSTORE_METADATA_GLOBAL_SNAP_REF        != key &&
        SSTORE_METADATA_GLOBAL_SNAP_SEQ        != key &&
        SSTORE_METADATA_LOCAL_SNAP_LOC         != key &&
        SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT != key ) {
        return ORTE_ERR_NOT_SUPPORTED;
    }

    /* Every remaining key is derived from the handle's data */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }

    if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
        *value = strdup(handle_info->ref_name);
        len = (NULL == *value) ? -1 : 0;
    }
    else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        len = asprintf(value, "%d", handle_info->seq_num);
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
        len = asprintf(value, "%s/%s/%d",
                       handle_info->base_location,
                       handle_info->ref_name,
                       handle_info->seq_num);
    }
    else { /* SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT */
        len = asprintf(value, "%s/%s/%d/%s",
                       handle_info->base_location,
                       handle_info->ref_name,
                       handle_info->seq_num,
                       orte_sstore_base_local_snapshot_fmt);
    }

    if( len < 0 ) {
        /* asprintf leaves *value undefined on failure; give callers a clean NULL */
        *value = NULL;
        return ORTE_ERR_OUT_OF_RESOURCE;
    }

    return ORTE_SUCCESS;
}
612 
/*
 * Set a metadata attribute on a snapshot handle.
 *
 * SSTORE_METADATA_GLOBAL_MIGRATING only flags the handle as a migration
 * (value is ignored).  Every other key is converted to its string name and
 * written, with `value`, to the global metadata file.
 *
 * Returns ORTE_ERR_NOT_FOUND for an unknown handle, ORTE_ERROR if the key
 * cannot be converted, or the metadata write error.
 */
int orte_sstore_stage_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): set_attr()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Process key (Access metadata)
     */
    if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
        handle_info->migrating = true;
    }
    else {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }

        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:
    free(key_str);
    key_str = NULL;

    return exit_status;
}
656 
/*
 * Synchronize a checkpoint handle.
 *
 * On the first call the handle is marked SYNCING.  If this process is
 * also a local coordinator, the request is then handed to the local
 * module.  Otherwise (and on subsequent calls) the function:
 *  - spins in opal_progress() until every process has reported as synced,
 *  - waits on outstanding FileM transfers (unless skip_filem is set),
 *  - writes the final timestamp and "done" sequence key, then closes the
 *    metadata file,
 *  - for non-migration checkpoints, publishes the handle as the last
 *    stable checkpoint,
 *  - marks the handle SYNCED.
 *
 * Returns ORTE_ERR_NOT_FOUND for an unknown handle, or the first error hit.
 */
int orte_sstore_stage_global_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): sync()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }

    if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
        handle_info->state = SSTORE_GLOBAL_SYNCING;
        if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
            return orte_sstore_stage_local_sync(handle);
        }
    }

    /*
     * Wait for all the processes to report in before waiting on all the requests
     */
    while(handle_info->num_procs_synced < handle_info->num_procs_total) {
        opal_progress();
    }

    /*
     * Synchronize all of the files
     * Wait on FileM operations
     */
    if( !orte_sstore_stage_skip_filem ) {
        if( ORTE_SUCCESS != (ret = wait_all_filem(handle_info))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* JJH: We should lock this var! */
    if( !handle_info->migrating ) {
        orte_sstore_base_is_checkpoint_available = true;
        orte_sstore_handle_last_stable = orte_sstore_handle_current;
    }

    handle_info->state = SSTORE_GLOBAL_SYNCED;

 cleanup:
    return exit_status;
}
739 
/*
 * Best-effort flush of a checkpoint's files to stable storage.
 *
 * Temporarily changes into <base>/<ref>/<seq> and runs "sync" (plus an
 * "ls" of the directory).  When compression is disabled, it also changes
 * into each non-file FileM target directory and runs "sync" there.  The
 * original working directory is restored afterwards.  Failures are
 * logged, never propagated.
 */
static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    opal_list_item_t *item = NULL, *f_item = NULL;
    orte_filem_base_request_t *filem_request = NULL;
    orte_filem_base_file_set_t *f_set = NULL;
    char *fs_str = NULL;
    char cwd[OPAL_PATH_MAX];

    /*
     * Without a saved cwd we could not restore it afterwards; changing the
     * daemon's working directory permanently is worse than skipping the sync.
     */
    if( OPAL_SUCCESS != opal_getcwd(cwd, OPAL_PATH_MAX) ) {
        opal_output(0, "sstore:stage:(global): sync_dir(): Failed to get the current working directory");
        return;
    }

    /*
     * Sync the Sequence num dir
     */
    if( 0 > asprintf(&fs_str, "%s/%s/%d",
                     handle_info->base_location,
                     handle_info->ref_name,
                     handle_info->seq_num) ) {
        /* asprintf leaves fs_str undefined on failure */
        fs_str = NULL;
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        return;
    }
    OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): sync_dir(): Sync'ing on %s",
                         fs_str));
    if( 0 != chdir(fs_str) ) {
        opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
                    fs_str);
        goto cleanup;
    }
    /* "ls" reads the directory back after the sync (best effort) */
    system("sync ; sync ; ls > /dev/null");

    /*
     * Sync each of the local snapshots
     * if compressing, then this is already covered above
     */
    if( orte_sstore_stage_enabled_compression ) {
        goto cleanup;
    }

    for(f_item  = opal_list_get_first(handle_info->filem_requests);
        f_item != opal_list_get_end(handle_info->filem_requests);
        f_item  = opal_list_get_next(f_item) ) {
        filem_request = (orte_filem_base_request_t *)f_item;

        for(item  = opal_list_get_first(&(filem_request->file_sets));
            item != opal_list_get_end(&(filem_request->file_sets));
            item  = opal_list_get_next(item) ) {
            f_set = (orte_filem_base_file_set_t *) item;

            /* Plain files have no directory of their own to sync */
            if( ORTE_FILEM_TYPE_FILE == f_set->target_flag ) {
                continue;
            }

            OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(global): sync_dir(): Sync'ing on %s",
                                 f_set->local_target));
            if( 0 != chdir(f_set->local_target) ) {
                opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
                            f_set->local_target);
            } else {
                system("sync ; sync ");
            }
        }
    }

 cleanup:
    /* Restore the caller's working directory */
    if( 0 != chdir(cwd) ) {
        opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
                    cwd);
    }

    free(fs_str);
    fs_str = NULL;

    return;
}
814 
/*
 * Remove a stored snapshot.
 *
 * Currently a no-op in the global stage module: the handle is not looked
 * up, nothing on disk is touched, and ORTE_SUCCESS is always returned.
 */
int orte_sstore_stage_global_remove(orte_sstore_base_handle_t handle)
{
    (void)handle;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): remove()"));

    return ORTE_SUCCESS;
}
826 
/*
 * Pack a handle and its metadata into `buffer` for a peer.
 *
 * Packing order: handle id, sequence number, reference name, app local
 * location format, [app local cache location format, only when caching is
 * enabled], migrating flag.  The receiving unpack must follow the same order.
 *
 * Returns ORTE_ERR_NOT_FOUND for an unknown handle, or the first DSS
 * pack error.
 */
int orte_sstore_stage_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): pack()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Pack the handle ID
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Pack any metadata
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* Only present on the wire when caching is enabled */
    if( orte_sstore_stage_enabled_caching ) {
        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING )) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
887 
/*
 * Unpack a handle sent by `peer`.
 *
 * Only peers that belong to this process's own job are handled: their
 * buffer is passed to the local (orted) unpack, so that applications are
 * updated too.  For any other peer nothing is unpacked and ORTE_SUCCESS
 * is returned.
 */
int orte_sstore_stage_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): unpack()"));

    /* Peers outside our own job are not unpacked here */
    if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                    ORTE_PROC_MY_NAME,
                                                    peer) ) {
        return ORTE_SUCCESS;
    }

    /* Defer to the orted version, so if we have applications they get updated too */
    rc = orte_sstore_stage_local_unpack(peer, buffer, handle);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}
914 
915 /**************************
916  * Local functions
917  **************************/
create_new_handle_info(int seq,int type,orte_jobid_t jobid)918 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
919 {
920     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
921     orte_job_t *jdata = NULL;
922 
923     handle_info = OBJ_NEW(orte_sstore_stage_global_snapshot_info_t);
924 
925     handle_info->jobid = jobid;
926 
927     handle_info->state = SSTORE_GLOBAL_INIT;
928 
929     handle_info->handle_type = type;
930 
931     handle_info->seq_num = seq;
932 
933     orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
934 
935     asprintf(&(handle_info->local_location), "%s/%d",
936              handle_info->ref_name, handle_info->seq_num);
937 
938     /* This is used by the application to establish the local directory */
939     asprintf(&(handle_info->app_local_location_fmt), "%s/%s/%s/%s",
940              orte_sstore_stage_local_snapshot_dir,
941              ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
942              ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME,
943              orte_sstore_base_local_snapshot_fmt);
944 
945     if( orte_sstore_stage_enabled_caching ) {
946         asprintf(&(handle_info->app_local_cache_location_fmt), "%s/%s/%s/%d/%s",
947                  orte_sstore_stage_local_snapshot_dir,
948                  ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
949                  ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME,
950                  handle_info->seq_num,
951                  orte_sstore_base_local_snapshot_fmt);
952     }
953 
954     /* This is used by the HNP to remember where it should place each process */
955     asprintf(&(handle_info->app_global_location_fmt), "%s/%s/%s",
956              handle_info->base_location,
957              handle_info->local_location,
958              orte_sstore_base_local_snapshot_fmt);
959 
960     asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
961              handle_info->base_location,
962              handle_info->ref_name,
963              orte_sstore_base_global_metadata_filename);
964 
965     jdata = orte_get_job_data_object(handle_info->jobid);
966     handle_info->num_procs_total = (int)jdata->num_procs;
967 
968     opal_list_append(active_handles, &(handle_info->super));
969 
970     return handle_info;
971 }
972 
find_handle_info(orte_sstore_base_handle_t handle)973 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
974 {
975     orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
976     opal_list_item_t* item = NULL;
977 
978     for(item  = opal_list_get_first(active_handles);
979         item != opal_list_get_end(active_handles);
980         item  = opal_list_get_next(item) ) {
981         handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
982 
983         if( handle_info->id == handle ) {
984             return handle_info;
985         }
986     }
987 
988     return NULL;
989 }
990 
sstore_stage_global_start_listener(void)991 static int sstore_stage_global_start_listener(void)
992 {
993     if( is_global_listener_active ) {
994         return ORTE_SUCCESS;
995     }
996 
997     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
998                             ORTE_RML_PERSISTENT, sstore_stage_global_recv, NULL);
999 
1000     is_global_listener_active = true;
1001     return ORTE_SUCCESS;
1002 }
1003 
sstore_stage_global_stop_listener(void)1004 static int sstore_stage_global_stop_listener(void)
1005 {
1006     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1007 
1008     is_global_listener_active = false;
1009     return ORTE_SUCCESS;
1010 }
1011 
/*
 * Persistent RML callback for ORTE_RML_TAG_SSTORE_INTERNAL messages.
 *
 * Every message starts with a command flag and a handle id.
 *
 * A sender in a different job than ours (per the original note, an
 * application process) is served by the local (orted-side) command handler.
 *
 * Messages from our own job are dispatched on the command:
 *   PULL   -> reply with the handle's metadata       (process_local_pull)
 *   PUSH   -> queue transfers of the peer's files    (process_local_push)
 *   REMOVE -> forwarded to the local coordinator
 *   DONE   -> account for completed transfers        (process_local_done)
 *
 * PULL, PUSH and DONE dereference the handle. A message naming an unknown
 * handle id is therefore logged and dropped for them. REMOVE does not use the
 * global handle.
 */
static void sstore_stage_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata)
{
    int ret;
    orte_sstore_stage_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    /*
     * If this was an application process contacting us, then act like an orted
     * instead of an HNP
     */
    if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                   ORTE_PROC_MY_NAME,
                                                   sender)) {

        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): process_cmd(%s)",
                             ORTE_NAME_PRINT(sender)));

        orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
        return;
    }

    /*
     * Find the referenced handle
     */
    handle_info = find_handle_info(loc_id);

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): process_cmd(%s) - Command = %s",
                         ORTE_NAME_PRINT(sender),
                         (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
                          (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
                           (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
                            (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));

    /* Every handler except REMOVE dereferences handle_info */
    if( NULL == handle_info && ORTE_SSTORE_STAGE_REMOVE != command ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto cleanup;
    }

    /*
     * Process the command
     */
    if( ORTE_SSTORE_STAGE_PULL == command ) {
        process_local_pull(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_STAGE_PUSH == command ) {
        process_local_push(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
        /* This is actually intended for the local coordinator */
        orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
    }
    else if( ORTE_SSTORE_STAGE_DONE == command ) {
        process_local_done(sender, buffer, handle_info);
    }
    else {
        /* Unrecognized command: report it instead of silently dropping it */
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
    }

 cleanup:
    return;
}
1095 
/*
 * Answer a PULL request from 'peer' with a PUSH message carrying the handle's
 * metadata.
 *
 * Fields are sent in this order:
 *   - handle id
 *   - sequence number
 *   - reference name
 *   - application-local location format
 *   - cache location format (only when caching is enabled)
 *   - migrating flag
 *
 * After the handle id, the order matches the global pack routine, so the
 * receiver can unpack both messages the same way.
 *
 * 'buffer' is the incoming request. Nothing beyond the header already
 * consumed by the caller is read from it, and nothing is written to it.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for a NULL handle, or the
 * pack/send error.
 */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;

    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * Push back the requested information
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_PUSH;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( orte_sstore_stage_enabled_caching ) {
        if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* The flag belongs in the reply, not in the incoming request buffer */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->migrating), 1, OPAL_BOOL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    /* loc_buffer should not be released here; the send callback releases it */
    loc_buffer = NULL;

 cleanup:
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
1169 
/*
 * Handle a PUSH message from 'peer' describing the local snapshots it holds
 * for 'handle_info'.
 *
 * Layout read from 'buffer':
 *   size_t num_entries, then for each entry:
 *     ORTE_NAME  process name
 *     bool       ckpt_skipped
 *     if !ckpt_skipped:
 *       string   CRS component
 *       if compression is enabled:
 *         string compress component
 *         string compress postfix
 *
 * For every non-skipped entry, the process/CRS/compression info is written to
 * the global metadata. Unless FileM is bypassed, a file set that pulls the
 * remote snapshot into global storage is also added to a FileM request.
 *
 * If that request has any file sets, it is appended to
 * handle_info->filem_requests and started with orte_filem.get_nb(). A request
 * that is never appended (error, or nothing to move) is released here.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for a NULL handle, or the error
 * from unpacking or starting the transfer.
 */
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    size_t num_entries, i;
    orte_process_name_t name;
    bool ckpt_skipped = false;
    char * crs_comp = NULL;
    char * compress_comp = NULL;
    char * compress_postfix = NULL;
    char * proc_name = NULL;
    char * tmp_str = NULL;
    orte_filem_base_request_t *filem_request = NULL;
    orte_filem_base_process_set_t *p_set = NULL;
    orte_filem_base_file_set_t * f_set = NULL;

    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    if( !orte_sstore_stage_skip_filem ) {
        filem_request = OBJ_NEW(orte_filem_base_request_t);
        /*
         * Define the process set:
         * Source (daemon) -> Sink (HNP)
         */
        p_set = OBJ_NEW(orte_filem_base_process_set_t);
        p_set->source.jobid = peer->jobid;
        p_set->source.vpid  = peer->vpid;
        p_set->sink.jobid   = ORTE_PROC_MY_NAME->jobid;
        p_set->sink.vpid    = ORTE_PROC_MY_NAME->vpid;
        opal_list_append(&(filem_request->process_sets), &(p_set->super) );
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    for(i = 0; i < num_entries; ++i ) {
        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        if( !ckpt_skipped ) {
            count = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
                goto cleanup;
            }

            if( orte_sstore_stage_enabled_compression ) {
                count = 1;
                if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_comp, &count, OPAL_STRING))) {
                    ORTE_ERROR_LOG(ret);
                    exit_status = ret;
                    goto cleanup;
                }
                /* Remember the first component reported; strdup(NULL) is undefined */
                if( NULL == handle_info->compress_comp && NULL != compress_comp ) {
                    handle_info->compress_comp = strdup(compress_comp);
                }

                count = 1;
                if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_postfix, &count, OPAL_STRING))) {
                    ORTE_ERROR_LOG(ret);
                    exit_status = ret;
                    goto cleanup;
                }
                if( NULL == handle_info->compress_postfix && NULL != compress_postfix ) {
                    handle_info->compress_postfix = strdup(compress_postfix);
                }
            }

            if( !orte_sstore_stage_skip_filem ) {
                /*
                 * Append to the file set for movement
                 */
                f_set = OBJ_NEW(orte_filem_base_file_set_t);

                if( orte_sstore_stage_enabled_compression ) {
                    /* Compressed snapshots are single files: "<location><postfix>" */
                    f_set->target_flag = ORTE_FILEM_TYPE_FILE;

                    asprintf(&tmp_str,
                             handle_info->app_global_location_fmt,
                             name.vpid);
                    asprintf(&(f_set->local_target), "%s%s",
                             tmp_str,
                             compress_postfix);
                    /* Free before reusing tmp_str so the first string is not leaked */
                    free(tmp_str);
                    tmp_str = NULL;

                    asprintf(&tmp_str,
                             handle_info->app_local_location_fmt,
                             name.vpid);
                    asprintf(&(f_set->remote_target), "%s%s",
                             tmp_str,
                             compress_postfix);
                    free(tmp_str);
                    tmp_str = NULL;
                } else {
                    /* Uncompressed snapshots are moved as whole directories */
                    f_set->target_flag = ORTE_FILEM_TYPE_DIR;

                    asprintf(&(f_set->local_target),
                             handle_info->app_global_location_fmt,
                             name.vpid);
                    asprintf(&(f_set->remote_target),
                             handle_info->app_local_location_fmt,
                             name.vpid);
                }

                if( orte_sstore_stage_global_is_shared ) {
                    f_set->local_hint = ORTE_FILEM_HINT_SHARED;
                }

                opal_list_append(&(filem_request->file_sets), &(f_set->super) );

                OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                     "sstore:stage:(global): push(): Pulling remote file <%s> to <%s>",
                                     f_set->remote_target,
                                     f_set->local_target));
            }

            /*
             * Write this information to the global metadata
             */
            orte_util_convert_process_name_to_string(&proc_name, &name);

            metadata_write_str(handle_info,
                               SSTORE_METADATA_INTERNAL_PROCESS_STR,
                               proc_name);
            metadata_write_str(handle_info,
                               SSTORE_METADATA_LOCAL_CRS_COMP_STR,
                               crs_comp);
            if( orte_sstore_stage_enabled_compression ) {
                metadata_write_str(handle_info,
                                   SSTORE_METADATA_LOCAL_COMPRESS_COMP_STR,
                                   compress_comp);
                metadata_write_str(handle_info,
                                   SSTORE_METADATA_LOCAL_COMPRESS_POSTFIX_STR,
                                   compress_postfix);
            }
        }

        /* Per-entry strings are re-unpacked on the next iteration */
        free(crs_comp);
        crs_comp = NULL;
        free(compress_comp);
        compress_comp = NULL;
        free(compress_postfix);
        compress_postfix = NULL;
        free(proc_name);
        proc_name = NULL;

        (handle_info->num_procs_synced)++;
    }

    if( !orte_sstore_stage_skip_filem && 0 < opal_list_get_size(&(filem_request->file_sets)) ) {
        /*
         * Start to pull the files to global storage
         */
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(global): push(): Pulling remote files from %s (%3d of %3d done)",
                             ORTE_NAME_PRINT(peer),
                             handle_info->num_procs_synced,
                             handle_info->num_procs_total));
        opal_list_append(handle_info->filem_requests, &(filem_request->super));
        ret = orte_filem.get_nb(filem_request);
        /* The request is now tracked by handle_info->filem_requests */
        filem_request = NULL;
        if( ORTE_SUCCESS != ret ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:
    /* Release a request that was never handed to handle_info->filem_requests */
    if( NULL != filem_request ) {
        OBJ_RELEASE(filem_request);
    }
    free(crs_comp);
    free(compress_comp);
    free(compress_postfix);
    free(proc_name);
    free(tmp_str);

    return exit_status;
}
1390 
/*
 * Handle a DONE message from 'peer'.
 *
 * Unpacks a size_t count from 'buffer' and adds it to
 * handle_info->num_procs_done, then reports progress.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for a NULL handle (which the
 * dispatcher can produce for an unknown handle id), or the unpack error.
 */
static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    size_t num_entries;

    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    (handle_info->num_procs_done) += (int)num_entries;

    SSTORE_STAGE_REPORT_PROGRESS(handle_info);

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): done(): [Peer %s] Moved %d files (%3d of %3d reported as done)",
                         ORTE_NAME_PRINT(peer),
                         (int)num_entries,
                         handle_info->num_procs_done,
                         handle_info->num_procs_total));

 cleanup:
    return exit_status;
}
1421 
/*
 * Create the global snapshot directory "<base_location>/<local_location>"
 * with owner-only rwx permissions (S_IRWXU). On success, open the metadata
 * file.
 *
 * Returns ORTE_SUCCESS, the opal_os_dirpath_create() error, or the
 * metadata_open() error.
 */
static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int rc;
    char *snapshot_dir = NULL;

    asprintf(&snapshot_dir, "%s/%s",
             handle_info->base_location,
             handle_info->local_location);

    rc = opal_os_dirpath_create(snapshot_dir, S_IRWXU);
    if( OPAL_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    } else {
        rc = metadata_open(handle_info);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
        }
    }

    free(snapshot_dir);
    return rc;
}
1457 
/*
 * Complete the global-side transfer for 'handle_info'.
 *
 * Steps:
 *   1. Wait for all queued FileM requests.
 *   2. Broadcast a REMOVE so the local copies are deleted.
 *   3. Sync the global directory.
 *   4. Spin in opal_progress() until num_procs_done reaches num_procs_total.
 *
 * Whatever the outcome, the request list is drained and destructed.
 * Returns immediately with ORTE_SUCCESS when FileM is bypassed.
 */
static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int rc;
    opal_list_item_t *req = NULL;

    if( orte_sstore_stage_skip_filem ) {
        return ORTE_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): wait_all_filem(): Waiting on all outstanding FileM requests (%d)",
                         (int)opal_list_get_size(handle_info->filem_requests) ));

    rc = orte_filem.wait_all(handle_info->filem_requests);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    } else {
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(global): wait_all_filem(): Removing all local files"));

        rc = xcast_remove_all(handle_info);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
        } else {
            /* Touch all local checkpoints */
            sync_global_dir(handle_info);

            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(global): wait_all_filem(): Waiting for remove to finish..."));
            /* xcast_remove_all() reset num_procs_done; DONE messages advance it */
            while( handle_info->num_procs_done < handle_info->num_procs_total ) {
                opal_progress();
            }

            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(global): wait_all_filem(): All files have been transfered"));
        }
    }

    /* Drop every tracked request, then tear down the list itself */
    for( req = opal_list_remove_first(handle_info->filem_requests);
         NULL != req;
         req = opal_list_remove_first(handle_info->filem_requests) ) {
        OBJ_RELEASE(req);
    }
    OBJ_DESTRUCT(handle_info->filem_requests);

    return rc;
}
1516 
/*
 * Broadcast a REMOVE command for 'handle_info' to all daemons of our job
 * (signature: our jobid, wildcard vpid).
 *
 * Resets handle_info->num_procs_done to 0 first, so the caller can count the
 * DONE replies that follow.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE, or the pack/xcast error.
 */
static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    /* Initialized so the cleanup path is safe when we bail out before creating it */
    orte_grpcomm_signature_t *sig = NULL;

    handle_info->num_procs_done = 0;

    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_REMOVE;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* goes to all daemons */
    sig = OBJ_NEW(orte_grpcomm_signature_t);
    sig->signature = malloc(sizeof(*sig->signature));
    if( NULL == sig->signature ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
    sig->signature[0].vpid = ORTE_VPID_WILDCARD;
    /* Number of entries in sig->signature */
    sig->sz = 1;
    if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SSTORE_INTERNAL, loc_buffer))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    /*
     * No completion callback is given to xcast, so nothing else releases the
     * payload.
     * NOTE(review): assumes grpcomm xcast copies the payload and leaves
     * ownership with the caller, as other ORTE xcast callers do -- confirm.
     */
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
    }
    if (NULL != sig) {
        OBJ_RELEASE(sig);
    }

    return exit_status;
}
1565 
1566 /**************************
1567  * Metadata functions
1568  **************************/
/*
 * Open handle_info->metadata_filename in append mode and store the stream in
 * handle_info->metadata. Does nothing if the stream is already open.
 *
 * Returns ORTE_SUCCESS, or ORTE_ERROR if fopen() fails (the failure is
 * logged).
 */
static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info)
{
    if( NULL == handle_info->metadata ) {
        handle_info->metadata = fopen(handle_info->metadata_filename, "a");
        if( NULL == handle_info->metadata ) {
            opal_output(orte_sstore_base_framework.framework_output,
                        "sstore:stage:(global):init_dir() Unable to open the file (%s)\n",
                        handle_info->metadata_filename);
            ORTE_ERROR_LOG(ORTE_ERROR);
            return ORTE_ERROR;
        }
    }

    return ORTE_SUCCESS;
}
1586 
/*
 * Close the metadata stream, if open, and clear handle_info->metadata so a
 * later write reopens it. Always returns ORTE_SUCCESS.
 */
static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info)
{
    if( NULL != handle_info->metadata ) {
        fclose(handle_info->metadata);
        handle_info->metadata = NULL;
    }

    return ORTE_SUCCESS;
}
1599 
/*
 * Append one "<key><value>\n" line (integer value) to the metadata file,
 * opening the file first if needed.
 *
 * Returns ORTE_SUCCESS or the metadata_open() error.
 */
static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, int value)
{
    int rc;

    if( NULL == handle_info->metadata &&
        ORTE_SUCCESS != (rc = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    fprintf(handle_info->metadata, "%s%d\n", key, value);
    return ORTE_SUCCESS;
}
1618 
/*
 * Append one "<key><value>\n" line (string value) to the metadata file,
 * opening the file first if needed.
 *
 * NOTE(review): 'value' is passed straight to "%s"; callers must not pass NULL.
 *
 * Returns ORTE_SUCCESS or the metadata_open() error.
 */
static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, char *value)
{
    int rc;

    if( NULL == handle_info->metadata &&
        ORTE_SUCCESS != (rc = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    fprintf(handle_info->metadata, "%s%s\n", key, value);
    return ORTE_SUCCESS;
}
1637 
/*
 * Append a timestamp line (SSTORE_METADATA_INTERNAL_TIME_STR followed by the
 * ctime() text for the current time) to the metadata file, opening it first
 * if needed.
 *
 * Returns ORTE_SUCCESS or the metadata_open() error.
 */
static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info)
{
    int rc;
    time_t now;

    if( NULL == handle_info->metadata &&
        ORTE_SUCCESS != (rc = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* ctime() output already ends in '\n', so no newline is added here */
    now = time(NULL);
    fprintf(handle_info->metadata, "%s%s",
            SSTORE_METADATA_INTERNAL_TIME_STR,
            ctime(&now));

    return ORTE_SUCCESS;
}
1660 
/*
 * Rebuild 'global_snapshot' from 'handle_info'.
 *
 * Existing local snapshot entries and the start/end times are discarded.
 * Then one local snapshot record is appended per vpid 0..num_procs_total-1,
 * tagged with the handle id and jobid and carrying copies of the handle's
 * compression component/postfix (when set).
 *
 * Always returns ORTE_SUCCESS.
 */
static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
                                                     orte_sstore_base_global_snapshot_info_t *global_snapshot)
{
    opal_list_item_t *stale = NULL;
    int vpid;

    while( NULL != (stale = opal_list_remove_first(&global_snapshot->local_snapshots)) ) {
        OBJ_RELEASE(stale);
    }

    free(global_snapshot->start_time);
    global_snapshot->start_time = NULL;

    free(global_snapshot->end_time);
    global_snapshot->end_time = NULL;

    for( vpid = 0; vpid < handle_info->num_procs_total; vpid++ ) {
        orte_sstore_base_local_snapshot_info_t *snap =
            OBJ_NEW(orte_sstore_base_local_snapshot_info_t);

        snap->ss_handle          = handle_info->id;
        snap->process_name.jobid = handle_info->jobid;
        snap->process_name.vpid  = vpid;

        /*
         * JJH: Per-vpid CRS component and timestamps are not kept in the
         * global SStore (they are only in the metadata file), so they stay NULL.
         */
        snap->crs_comp         = NULL;
        snap->compress_comp    = (NULL == handle_info->compress_comp) ?
                                 NULL : strdup(handle_info->compress_comp);
        snap->compress_postfix = (NULL == handle_info->compress_postfix) ?
                                 NULL : strdup(handle_info->compress_postfix);
        snap->start_time       = NULL;
        snap->end_time         = NULL;

        opal_list_append(&global_snapshot->local_snapshots, &(snap->super));
    }

    return ORTE_SUCCESS;
}
1719 
/*
 * List-sort comparator: orders local snapshot records by ascending vpid.
 * Returns 1, 0 or -1.
 */
static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
                                          opal_list_item_t **b)
{
    const orte_sstore_base_local_snapshot_info_t *lhs =
        (const orte_sstore_base_local_snapshot_info_t*)(*a);
    const orte_sstore_base_local_snapshot_info_t *rhs =
        (const orte_sstore_base_local_snapshot_info_t*)(*b);

    /* Two comparisons instead of a subtraction, which could wrap for unsigned vpids */
    return (lhs->process_name.vpid > rhs->process_name.vpid) -
           (lhs->process_name.vpid < rhs->process_name.vpid);
}
1738 
/*
 * Print a progress line ("<pct> % Finished") for 'handle_info'.
 *
 * The percentage is num_procs_done / num_procs_total. A line is printed when
 * last_progress_report is 0.0, or when the percentage has advanced by at
 * least orte_sstore_stage_progress_meter since the last printed value.
 *
 * Nothing is printed when num_procs_total is not positive, because the ratio
 * would be undefined (0/0 yields NaN).
 */
static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    double perc_done;

    if( handle_info->num_procs_total <= 0 ) {
        return;
    }

    perc_done = (100.0 * handle_info->num_procs_done) / handle_info->num_procs_total;

    if( perc_done >= (handle_info->last_progress_report + orte_sstore_stage_progress_meter ) ||
        handle_info->last_progress_report == 0.0 ) {
        handle_info->last_progress_report = perc_done;
        opal_output(0, "sstore:stage: progress: %10.2f %c Finished\n",
                    perc_done, '%');
    }
}
1756