1 /*
2  * Copyright (c)      2011 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * Copyright (c) 2004-2011 The University of Tennessee and The University
5  *                         of Tennessee Research Foundation.  All rights
6  *                         reserved.
7  * $COPYRIGHT$
8  *
9  * Additional copyrights may follow
10  *
11  * $HEADER$
12  */
13 
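/*
 * Global (HNP-side) half of the 'central' sstore component: hands out
 * snapshot handles, writes the global metadata file, and services
 * pull/push requests received on ORTE_RML_TAG_SSTORE_INTERNAL.
 */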
17 
18 #include "orte_config.h"
19 
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif  /* HAVE_UNISTD_H */
28 
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31 
32 #include "opal/mca/event/event.h"
33 
34 #include "orte/constants.h"
35 #include "orte/util/show_help.h"
36 #include "opal/util/argv.h"
37 #include "opal/util/output.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41 
42 #include "opal/threads/mutex.h"
43 #include "opal/threads/condition.h"
44 
45 #include "orte/util/name_fns.h"
46 #include "orte/util/proc_info.h"
47 #include "orte/runtime/orte_globals.h"
48 #include "orte/runtime/orte_wait.h"
49 #include "orte/mca/errmgr/errmgr.h"
50 #include "orte/mca/ess/ess.h"
51 #include "orte/mca/rml/rml.h"
52 #include "orte/mca/rml/rml_types.h"
53 #include "orte/mca/snapc/snapc.h"
54 #include "orte/mca/snapc/base/base.h"
55 
56 #include "orte/mca/sstore/sstore.h"
57 #include "orte/mca/sstore/base/base.h"
58 
59 #include "sstore_central.h"
60 
/* Kind of request a handle was created for (stored in the handle_type field) */
#define SSTORE_HANDLE_TYPE_NONE    0
#define SSTORE_HANDLE_TYPE_CKPT    1
#define SSTORE_HANDLE_TYPE_RESTART 2

/*
 * Lifecycle state of a global snapshot handle (stored in the state field).
 * In this file: NONE is set by the constructor, INIT by create_new_handle_info(),
 * REG by register(), SYNCING by sync().
 */
#define SSTORE_GLOBAL_NONE    0
#define SSTORE_GLOBAL_ERROR   1
#define SSTORE_GLOBAL_INIT    2
#define SSTORE_GLOBAL_REG     3
#define SSTORE_GLOBAL_SYNCING 4
#define SSTORE_GLOBAL_SYNCED  5
71 
72 /**********
73  * Object Stuff
74  **********/
/**
 * Bookkeeping for one global snapshot handle. Instances are OPAL objects
 * (created with OBJ_NEW) and are linked onto the file-level active_handles
 * list by create_new_handle_info().
 */
struct  orte_sstore_central_global_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle id returned to callers; assigned from next_handle_id by the constructor */
    orte_sstore_base_handle_t id;

    /** Job ID */
    orte_jobid_t jobid;

    /** State (one of the SSTORE_GLOBAL_* values) */
    int state;

    /** Handle type (one of the SSTORE_HANDLE_TYPE_* values) */
    int handle_type;

    /** Sequence Number (-1 until assigned) */
    int seq_num;

    /** Reference Name */
    char * ref_name;

    /** Local Location (Relative Path to base_location): "<ref_name>/<seq_num>" */
    char * local_location;

    /** Application location format: "<base_location>/<local_location>/<local snapshot fmt>" */
    char * app_location_fmt;

    /** Base location (copy of orte_sstore_base_global_snapshot_dir) */
    char * base_location;

    /** Metadata File Name: "<base_location>/<ref_name>/<global metadata filename>" */
    char *metadata_filename;

    /** Metadata file stream (NULL when not open; closed by the destructor if still open) */
    FILE *metadata;

    /** Num procs in job */
    int num_procs_total;

    /** Num procs synced; sync() waits until this reaches num_procs_total */
    int num_procs_synced;

    /** Is this checkpoint representing a migration? */
    bool migrating;
};
typedef struct orte_sstore_central_global_snapshot_info_t orte_sstore_central_global_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_global_snapshot_info_t);

/* Object constructor/destructor, registered with the class instance below */
void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info);
void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_central_global_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_central_global_snapshot_info_construct,
                   orte_sstore_central_global_snapshot_info_destruct);
131 
132 
133 /**********
134  * Local Function and Variable Declarations
135  **********/
/* True while the persistent RML receive for ORTE_RML_TAG_SSTORE_INTERNAL is posted */
static bool is_global_listener_active = false;
static int sstore_central_global_start_listener(void);
static int sstore_central_global_stop_listener(void);
static void sstore_central_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata);
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);

/* Handle creation and lookup over active_handles */
static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq);

/* Global metadata file helpers */
static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info);
static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info);
static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, int value);
static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, char *value);
static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info);

static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info);
static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
                                            opal_list_item_t **b);
static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
                                                       orte_sstore_base_global_snapshot_info_t *global_snapshot);

/* Source of handle ids; read and advanced by the object constructor */
static int next_handle_id = 1;

/* Handles created by this process; allocated in module_init, released in module_finalize */
static opal_list_t *active_handles = NULL;
166 
167 /**********
168  * Object stuff
169  **********/
/*
 * OPAL object constructor: claim the next handle id, mark every field as
 * unset, and copy the configured global snapshot directory into base_location.
 */
void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info)
{
    /* Take the current counter value, then advance it for the next handle */
    info->id = next_handle_id++;

    /* No job, lifecycle state, type or sequence number is known yet */
    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;
    info->seq_num     = -1;

    /* Every handle is rooted at the configured global snapshot directory */
    info->base_location = strdup(orte_sstore_base_global_snapshot_dir);

    /* Derived names/paths are filled in later (e.g. by create_new_handle_info()) */
    info->ref_name          = NULL;
    info->local_location    = NULL;
    info->app_location_fmt  = NULL;
    info->metadata_filename = NULL;
    info->metadata          = NULL;

    /* Sync accounting and migration flag */
    info->num_procs_total  = 0;
    info->num_procs_synced = 0;
    info->migrating        = false;
}
197 
/*
 * OPAL object destructor: free every owned string, close the metadata stream
 * if it is still open, and reset all fields to their "unset" values.
 */
void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info)
{
    /* Owned strings; free(NULL) is a no-op, so no guards are needed */
    free(info->ref_name);
    info->ref_name = NULL;

    free(info->local_location);
    info->local_location = NULL;

    free(info->app_location_fmt);
    info->app_location_fmt = NULL;

    free(info->base_location);
    info->base_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    /* fclose(NULL) is not safe, so this one stays guarded */
    if( NULL != info->metadata ) {
        fclose(info->metadata);
        info->metadata = NULL;
    }

    /* Scalar bookkeeping back to defaults */
    info->id               = 0;
    info->seq_num          = -1;
    info->jobid            = ORTE_JOBID_INVALID;
    info->state            = SSTORE_GLOBAL_NONE;
    info->handle_type      = SSTORE_HANDLE_TYPE_NONE;
    info->num_procs_total  = 0;
    info->num_procs_synced = 0;
    info->migrating        = false;
}
244 
245 /******************
246  * Local functions
247  ******************/
orte_sstore_central_global_module_init(void)248 int orte_sstore_central_global_module_init(void)
249 {
250     int ret, exit_status = ORTE_SUCCESS;
251 
252     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
253                          "sstore:central:(global): init()"));
254 
255     if( NULL == active_handles ) {
256         active_handles = OBJ_NEW(opal_list_t);
257     }
258 
259     /*
260      * Setup a listener for the HNP/Apps
261      */
262     if( ORTE_SUCCESS != (ret = sstore_central_global_start_listener()) ) {
263         ORTE_ERROR_LOG(ret);
264         exit_status = ret;
265         goto cleanup;
266     }
267 
268     exit_status = orte_sstore_central_local_module_init();
269 
270  cleanup:
271     return exit_status;
272 }
273 
orte_sstore_central_global_module_finalize(void)274 int orte_sstore_central_global_module_finalize(void)
275 {
276     int ret, exit_status = ORTE_SUCCESS;
277 
278     OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
279                          "sstore:central:(global): finalize()"));
280 
281     exit_status = orte_sstore_central_local_module_finalize();
282 
283     if( NULL != active_handles ) {
284         OBJ_RELEASE(active_handles);
285     }
286 
287     /*
288      * Shutdown the listener for the HNP/Apps
289      */
290     if( ORTE_SUCCESS != (ret = sstore_central_global_stop_listener()) ) {
291         ORTE_ERROR_LOG(ret);
292         exit_status = ret;
293         goto cleanup;
294     }
295 
296  cleanup:
297     return exit_status;
298 }
299 
/*
 * Create a checkpoint handle for sequence 'seq' of job 'jobid', create its
 * global snapshot directory, and return the new handle id through *handle.
 *
 * *handle is only written on success. Returns ORTE_SUCCESS, or an error if
 * the handle could not be created or the directory could not be initialized.
 */
int orte_sstore_central_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): request_checkpoint_handle()"));

    /*
     * Construct a handle
     *  - Associate all of the necessary information
     */
    handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
    if( NULL == handle_info ) {
        /* Creation failed (allocation or unknown job); nothing to hand back */
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Create the global checkpoint directory
     */
    if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Return the handle
     */
    *handle = handle_info->id;

 cleanup:
    return exit_status;
}
331 
/*
 * Look up an existing handle by reference name and sequence number and
 * return its id through *handle.
 *
 * 'basedir' and 'snapshot' are not used. Returns ORTE_SUCCESS, or ORTE_ERROR
 * (after logging) when no matching handle exists, leaving *handle untouched.
 */
int orte_sstore_central_global_request_restart_handle(orte_sstore_base_handle_t *handle, char *basedir,
                                                      char *ref, int seq,
                                                      orte_sstore_base_global_snapshot_info_t *snapshot)
{
    orte_sstore_central_global_snapshot_info_t *match = find_handle_info_from_ref(ref, seq);

    if( NULL == match ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    *handle = match->id;
    return ORTE_SUCCESS;
}
352 
/*
 * Fill 'snapshot' with the description of a global snapshot.
 *
 * handle   - handle to describe; when NULL, orte_sstore_handle_last_stable is used.
 * snapshot - output. Receives the handle id, sequence number and copies of the
 *            reference name, base directory and metadata filename. For the
 *            current handle the remaining data comes from this process's
 *            in-memory state; otherwise it is read from the metadata file.
 *            Local snapshots are sorted with central_snapshot_sort_compare_fn.
 *
 * Returns ORTE_SUCCESS, or an error if the handle is unknown or metadata
 * extraction fails.
 */
int orte_sstore_central_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
                                                            orte_sstore_base_global_snapshot_info_t *snapshot)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): request_global_snapshot_data()"));

    /*
     * Lookup the handle (if NULL, use last stable)
     */
    if( NULL != handle ) {
        handle_info = find_handle_info(*handle);
        snapshot->ss_handle = *handle;
    } else {
        handle_info = find_handle_info(orte_sstore_handle_last_stable);
        snapshot->ss_handle = orte_sstore_handle_last_stable;
    }

    /* An unknown handle must not be dereferenced below */
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Construct the snapshot from local data, and metadata file
     */
    snapshot->seq_num   = handle_info->seq_num;
    snapshot->reference = strdup(handle_info->ref_name);
    snapshot->basedir   = strdup(handle_info->base_location);
    snapshot->metadata_filename = strdup(handle_info->metadata_filename);

    /* If this is the current checkpoint, pull data from local cache */
    if( orte_sstore_handle_current == snapshot->ss_handle ) {
        if( ORTE_SUCCESS != (ret = orte_sstore_central_extract_global_metadata(handle_info, snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }
    /* Otherwise, pull from metadata */
    else {
        if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    opal_list_sort(&snapshot->local_snapshots, central_snapshot_sort_compare_fn);

 cleanup:
    return exit_status;
}
403 
/*
 * Register a checkpoint handle.
 *
 * The first call for a handle marks it SSTORE_GLOBAL_REG, makes it the
 * current handle, and writes the initial metadata: the sequence number (under
 * the migration key when migrating), the local snapshot reference format and
 * a timestamp. A repeated call for an already registered handle is forwarded
 * to orte_sstore_central_local_register().
 *
 * Returns ORTE_SUCCESS, or an error if the handle is unknown or a metadata
 * write fails.
 */
int orte_sstore_central_global_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): register(%d) - Global", handle));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_REG == handle_info->state ) {
        /* Already registered globally: hand the call to the local side */
        return orte_sstore_central_local_register(handle);
    }
    handle_info->state = SSTORE_GLOBAL_REG;

    orte_sstore_handle_current = handle;

    /*
     * Associate the metadata
     */
    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                  SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
                                                  orte_sstore_base_local_snapshot_fmt)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
462 
/*
 * Return attribute 'key' of 'handle' as a newly allocated string in *value
 * (caller frees).
 *
 * Supported keys: global snapshot reference, global sequence number, local
 * snapshot reference format, local snapshot location and local snapshot
 * reference location format. 'central' does not cache, so the local locations
 * are built from base_location/ref_name/seq_num.
 *
 * Returns ORTE_SUCCESS; ORTE_ERR_NOT_SUPPORTED for other keys (*value
 * untouched); ORTE_ERROR if the handle is unknown or allocation fails
 * (*value is NULL in the latter case).
 */
int orte_sstore_central_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    int exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): get_attr()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    /*
     * Access metadata. asprintf() leaves its target undefined on failure,
     * so it is reset to NULL explicitly in that case.
     */
    if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
        *value = strdup(handle_info->ref_name);
    }
    else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        if( 0 > asprintf(value, "%d", handle_info->seq_num) ) {
            *value = NULL;
        }
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
        *value = strdup(orte_sstore_base_local_snapshot_fmt);
    }
    /* 'central' does not cache, so these are the same */
    else if( SSTORE_METADATA_LOCAL_SNAP_LOC       == key ) {
        if( 0 > asprintf(value, "%s/%s/%d",
                         handle_info->base_location,
                         handle_info->ref_name,
                         handle_info->seq_num) ) {
            *value = NULL;
        }
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
        if( 0 > asprintf(value, "%s/%s/%d/%s",
                         handle_info->base_location,
                         handle_info->ref_name,
                         handle_info->seq_num,
                         orte_sstore_base_local_snapshot_fmt) ) {
            *value = NULL;
        }
    }
    else {
        /* Unknown key: *value is left untouched */
        return ORTE_ERR_NOT_SUPPORTED;
    }

    /* strdup()/asprintf() failed */
    if( NULL == *value ) {
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
    }

    return exit_status;
}
508 
/*
 * Set attribute 'key' of 'handle' to 'value'.
 *
 * SSTORE_METADATA_GLOBAL_MIGRATING only flags the handle as a migration
 * ('value' is ignored). Every other key is converted to its string name and
 * written to the handle's metadata via metadata_write_str().
 *
 * Returns ORTE_SUCCESS, or an error if the handle is unknown, the key has no
 * string name, or the metadata write fails.
 */
int orte_sstore_central_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): set_attr()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Process key (Access metadata)
     */
    if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
        handle_info->migrating = true;
    }
    else {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }

        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:
    free(key_str);
    key_str = NULL;

    return exit_status;
}
552 
/*
 * Finish the checkpoint identified by 'handle'.
 *
 * On the first call (state not yet SSTORE_GLOBAL_SYNCING) the handle is moved
 * to SSTORE_GLOBAL_SYNCING. If the snapc coordinator type includes the local
 * coordinator, the call is then delegated to orte_sstore_central_local_sync().
 * Otherwise, or on a repeated call, this blocks (driving opal_progress())
 * until num_procs_synced reaches num_procs_total. It then writes the closing
 * timestamp and "done" sequence marker and closes the metadata file. For
 * non-migration checkpoints it also records the current handle as the last
 * stable one.
 *
 * Returns ORTE_SUCCESS, or an error if the handle is unknown or a metadata
 * operation fails.
 */
int orte_sstore_central_global_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): sync()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
        handle_info->state = SSTORE_GLOBAL_SYNCING;
        if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
            return orte_sstore_central_local_sync(handle);
        }
    }

    /*
     * Synchronize all of the files.
     * Spin the progress engine so incoming sstore messages get handled;
     * num_procs_synced is presumably advanced by those handlers (not visible
     * here) -- TODO confirm.
     */
    while(handle_info->num_procs_synced < handle_info->num_procs_total) {
        opal_progress();
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* JJH: We should lock this var! */
    if( !handle_info->migrating ) {
        orte_sstore_base_is_checkpoint_available = true;
        orte_sstore_handle_last_stable = orte_sstore_handle_current;
    }

 cleanup:
    return exit_status;
}
621 
/*
 * Remove a snapshot handle. Currently a stub: it only emits a verbose trace,
 * does not look the handle up or modify anything, and always reports success.
 */
int orte_sstore_central_global_remove(orte_sstore_base_handle_t handle)
{
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): remove()"));

    /* TODO: look up and release the handle; it is intentionally unused for now */
    (void)handle;

    return ORTE_SUCCESS;
}
633 
/*
 * Serialize 'handle' into 'buffer'. Packed in order: handle id
 * (ORTE_SSTORE_HANDLE), sequence number (OPAL_INT), reference name
 * (OPAL_STRING) and application location format (OPAL_STRING).
 *
 * 'peer' is not used. Returns ORTE_SUCCESS, or an error (nothing further is
 * packed) if the handle is unknown or a pack call fails.
 */
int orte_sstore_central_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): pack()"));

    /*
     * Lookup the handle; its fields are read below, so an unknown
     * handle is an error rather than a NULL dereference.
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Pack the handle ID
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): pack(%d, %d, %s)",
                         handle_info->id,
                         handle_info->seq_num,
                         handle_info->ref_name));

    /*
     * Pack any metadata
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
686 
/*
 * Unpack a handle sent by 'peer'.
 *
 * Only peers whose job ID matches ours are handled; their message is
 * deferred to orte_sstore_central_local_unpack() so the local side (and any
 * applications it serves) is updated as well. Messages from other jobs are
 * ignored. Returns ORTE_SUCCESS or the local unpack's error.
 */
int orte_sstore_central_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): unpack()"));

    /* Peers from a different job: nothing to unpack here */
    if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                    ORTE_PROC_MY_NAME,
                                                    peer) ) {
        return ORTE_SUCCESS;
    }

    /* Defer to the orted version */
    rc = orte_sstore_central_local_unpack(peer, buffer, handle);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}
713 
714 /**************************
715  * Local functions
716  **************************/
create_new_handle_info(int seq,int type,orte_jobid_t jobid)717 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
718 {
719     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
720     orte_job_t *jdata = NULL;
721 
722     handle_info = OBJ_NEW(orte_sstore_central_global_snapshot_info_t);
723 
724     handle_info->jobid = jobid;
725 
726     handle_info->state = SSTORE_GLOBAL_INIT;
727 
728     handle_info->handle_type = type;
729 
730     handle_info->seq_num = seq;
731 
732     orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
733 
734     asprintf(&(handle_info->local_location), "%s/%d",
735              handle_info->ref_name, handle_info->seq_num);
736 
737     asprintf(&(handle_info->app_location_fmt), "%s/%s/%s",
738              handle_info->base_location,
739              handle_info->local_location,
740              orte_sstore_base_local_snapshot_fmt);
741 
742     asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
743              handle_info->base_location,
744              handle_info->ref_name,
745              orte_sstore_base_global_metadata_filename);
746 
747     jdata = orte_get_job_data_object(handle_info->jobid);
748     handle_info->num_procs_total = (int)jdata->num_procs;
749 
750     opal_list_append(active_handles, &(handle_info->super));
751 
752     return handle_info;
753 }
754 
find_handle_info(orte_sstore_base_handle_t handle)755 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
756 {
757     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
758     opal_list_item_t* item = NULL;
759 
760     for(item  = opal_list_get_first(active_handles);
761         item != opal_list_get_end(active_handles);
762         item  = opal_list_get_next(item) ) {
763         handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
764 
765         if( handle_info->id == handle ) {
766             return handle_info;
767         }
768     }
769 
770     return NULL;
771 }
772 
find_handle_info_from_ref(char * ref,int seq)773 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq)
774 {
775     orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
776     opal_list_item_t* item = NULL;
777 
778     for(item  = opal_list_get_first(active_handles);
779         item != opal_list_get_end(active_handles);
780         item  = opal_list_get_next(item) ) {
781         handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
782 
783         if( handle_info->seq_num == seq ) {
784             if( NULL != ref &&
785                 strncmp(handle_info->ref_name, ref, strlen(ref)) ) {
786                 return handle_info;
787             } else {
788                 return handle_info;
789             }
790         }
791     }
792 
793     return NULL;
794 }
795 
sstore_central_global_start_listener(void)796 static int sstore_central_global_start_listener(void)
797 {
798     if( is_global_listener_active ) {
799         return ORTE_SUCCESS;
800     }
801 
802     orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
803                             ORTE_RML_PERSISTENT, sstore_central_global_recv, NULL);
804 
805     is_global_listener_active = true;
806     return ORTE_SUCCESS;
807 }
808 
sstore_central_global_stop_listener(void)809 static int sstore_central_global_stop_listener(void)
810 {
811     orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
812 
813     is_global_listener_active = false;
814     return ORTE_SUCCESS;
815 }
816 
/*
 * RML callback for ORTE_RML_TAG_SSTORE_INTERNAL.
 *
 * Messages from processes outside our job are handed to
 * orte_sstore_central_local_recv(). For peers in our job, the message holds a
 * command (ORTE_SSTORE_CENTRAL_CMD) followed by a handle id
 * (ORTE_SSTORE_HANDLE). The command is dispatched to process_local_pull() or
 * process_local_push(). Unpack failures, unknown handles and unknown commands
 * are logged and the message is dropped.
 */
static void sstore_central_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata)
{
    int ret;
    orte_sstore_central_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    /*
     * If this was an application process contacting us, then act like an orted
     * instead of an HNP
     */
    if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                   ORTE_PROC_MY_NAME,
                                                   sender)) {
        orte_sstore_central_local_recv(status, sender, buffer, tag, cbdata);
        return;
    }


    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    /*
     * Find the referenced handle. process_local_pull() reads handle_info's
     * fields unconditionally, so an unknown id must stop processing here.
     */
    if(NULL == (handle_info = find_handle_info(loc_id)) ) {
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                             "sstore:central:(global): process_cmd(%s) unknown handle %d",
                             ORTE_NAME_PRINT(sender), loc_id));
        ORTE_ERROR_LOG(ORTE_ERROR);
        goto cleanup;
    }

    /*
     * Process the command
     */
    if( ORTE_SSTORE_CENTRAL_PULL == command ) {
        process_local_pull(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
        process_local_push(sender, buffer, handle_info);
    }
    else {
        /* Unrecognized command: report it instead of silently dropping it */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_SUPPORTED);
    }

 cleanup:
    return;
}
881 
/*
 * Answer a pull request from 'peer' by sending back an ORTE_SSTORE_CENTRAL_PUSH
 * message with the handle id, sequence number, reference name and
 * application location format of 'handle_info'.
 *
 * 'buffer' (the rest of the request) is not read. After a successful send the
 * reply buffer belongs to orte_rml_send_callback; on any failure it is
 * released here. Returns ORTE_SUCCESS or the first error encountered.
 */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_central_cmd_flag_t command;

    /* Every field packed below comes from handle_info */
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Push back the requested information
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);
    if( NULL == loc_buffer ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    command = ORTE_SSTORE_CENTRAL_PUSH;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
941 
/*
 * Handle a PUSH message from a peer: unpack per-process checkpoint results
 * and record them in the global metadata file.
 *
 * The caller has already consumed the command and the snapshot handle from
 * the buffer. The remaining layout is:
 *   OPAL_SIZE    number of entries
 *   for each entry:
 *     ORTE_NAME    process name
 *     OPAL_BOOL    "checkpoint skipped" flag
 *     OPAL_STRING  CRS component name (present only when not skipped)
 *
 * For each non-skipped entry, the process name and CRS component are written
 * to the metadata file. handle_info->num_procs_synced is incremented once per
 * entry, whether or not it was skipped.
 *
 * @param peer        Sender of the message (unused).
 * @param buffer      Incoming message, positioned after the handle.
 * @param handle_info Snapshot handle to update. The dispatcher still calls
 *                    this function when the handle lookup failed, so NULL is
 *                    rejected here.
 *
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM if handle_info is NULL; an
 *         unpack/name-conversion error (processing stops); or a metadata
 *         write error. Metadata write errors are recorded but do not stop the
 *         loop, so the synced-process count still advances.
 */
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    size_t num_entries, i;
    orte_process_name_t name;
    bool ckpt_skipped = false;
    char * crs_comp = NULL;
    char * proc_name = NULL;

    /* Unknown handle: nowhere to record the results */
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    for(i = 0; i < num_entries; ++i ) {
        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        if( !ckpt_skipped ) {
            count = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
                goto cleanup;
            }

            /* A failed conversion leaves proc_name unusable; never hand it to %s */
            if (ORTE_SUCCESS != (ret = orte_util_convert_process_name_to_string(&proc_name, &name))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
                goto cleanup;
            }

            /*
             * Write this information to the global metadata.
             * Failures are reported but do not abort the loop, so that
             * num_procs_synced keeps counting every entry received.
             */
            if (ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                          SSTORE_METADATA_INTERNAL_PROCESS_STR,
                                                          proc_name))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
            }
            if (ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                          SSTORE_METADATA_LOCAL_CRS_COMP_STR,
                                                          crs_comp))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
            }
        }

        free(crs_comp);
        crs_comp = NULL;
        free(proc_name);
        proc_name = NULL;

        (handle_info->num_procs_synced)++;
    }

 cleanup:
    free(crs_comp);
    crs_comp = NULL;
    free(proc_name);
    proc_name = NULL;

    return exit_status;
}
1022 
/*
 * Create the snapshot directory "<base_location>/<local_location>" with
 * owner-only permissions (S_IRWXU), then open the metadata file for the
 * handle.
 *
 * @param handle_info Snapshot handle that provides base_location,
 *                    local_location and the metadata file state.
 *
 * @return ORTE_SUCCESS; ORTE_ERR_OUT_OF_RESOURCE if the path string cannot
 *         be built; or the error from directory creation / metadata_open().
 */
static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char * dir_name = NULL;
    mode_t my_mode = S_IRWXU;

    /*
     * Make the snapshot directory from the uniq_global_snapshot_name
     */
    if( 0 > asprintf(&dir_name, "%s/%s",
                     handle_info->base_location,
                     handle_info->local_location) ) {
        /* On failure the contents of dir_name are undefined: do not use or free them */
        dir_name = NULL;
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Open up the metadata file
     */
    if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    free(dir_name);
    dir_name = NULL;

    return exit_status;
}
1058 
1059 /**************************
1060  * Metadata functions
1061  **************************/
/*
 * Open the handle's metadata file for appending, unless it is already open.
 *
 * @param handle_info Snapshot handle; handle_info->metadata receives the
 *                    FILE* opened on handle_info->metadata_filename.
 *
 * @return ORTE_SUCCESS if the file is (now) open; ORTE_ERR_BAD_PARAM if no
 *         metadata filename is set; ORTE_ERROR if fopen() fails.
 */
static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    /* If already open, then just return */
    if( NULL != handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    /* fopen(NULL, ...) is undefined behavior */
    if( NULL == handle_info->metadata_filename ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* Append mode: the file accumulates records across calls */
    if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(global):metadata_open() Unable to open the file (%s)\n",
                    handle_info->metadata_filename);
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
1079 
/*
 * Close the handle's metadata file if it is open.
 *
 * After this call handle_info->metadata is always NULL, even if fclose()
 * reported an error, because the stream is no longer usable either way.
 *
 * @param handle_info Snapshot handle whose metadata stream should be closed.
 *
 * @return ORTE_SUCCESS if the file was not open or was closed cleanly;
 *         ORTE_ERROR if fclose() failed (buffered metadata may be lost).
 */
static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    int rc;

    /* If already closed, then just return */
    if( NULL == handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    /* fclose() flushes pending writes, so its failure means data loss */
    rc = fclose(handle_info->metadata);
    handle_info->metadata = NULL;

    if( 0 != rc ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
1092 
/*
 * Append a "<key><value>\n" record with an integer value to the metadata
 * file, opening the file first if needed.
 *
 * The key is written verbatim with no separator added, so any delimiter
 * must already be part of the key string.
 *
 * @param handle_info Snapshot handle owning the metadata stream.
 * @param key         Record prefix.
 * @param value       Integer value to write in decimal.
 *
 * @return ORTE_SUCCESS; the error from metadata_open(); or ORTE_ERROR if the
 *         write fails.
 */
static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, int value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* fprintf() returns a negative value on an output error */
    if( 0 > fprintf(handle_info->metadata, "%s%d\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1111 
/*
 * Append a "<key><value>\n" record with a string value to the metadata file,
 * opening the file first if needed.
 *
 * The key is written verbatim with no separator added, so any delimiter
 * must already be part of the key string.
 *
 * @param handle_info Snapshot handle owning the metadata stream.
 * @param key         Record prefix; must not be NULL.
 * @param value       String value; must not be NULL.
 *
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM if key or value is NULL (nothing
 *         is written); the error from metadata_open(); or ORTE_ERROR if the
 *         write fails.
 */
static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Passing NULL to %s is undefined behavior; refuse rather than corrupt the file */
    if( NULL == key || NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* fprintf() returns a negative value on an output error */
    if( 0 > fprintf(handle_info->metadata, "%s%s\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1130 
/*
 * Append a timestamp record (SSTORE_METADATA_INTERNAL_TIME_STR followed by
 * the current local time in ctime() format) to the metadata file, opening
 * the file first if needed.
 *
 * No explicit newline is written because the ctime() string already ends
 * with one.
 *
 * @param handle_info Snapshot handle owning the metadata stream.
 *
 * @return ORTE_SUCCESS; the error from metadata_open(); or ORTE_ERROR if the
 *         current time cannot be obtained or formatted, or the write fails.
 */
static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    time_t timestamp;
    char *time_str = NULL;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* time() signals failure with (time_t)-1; ctime() may return NULL,
     * which must never reach %s */
    timestamp = time(NULL);
    if( (time_t)-1 == timestamp || NULL == (time_str = ctime(&timestamp)) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    if( 0 > fprintf(handle_info->metadata, "%s%s",
                    SSTORE_METADATA_INTERNAL_TIME_STR,
                    time_str) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1153 
/*
 * Rebuild global_snapshot from handle_info.
 *
 * Releases every existing entry in global_snapshot->local_snapshots and
 * frees the global start/end timestamps (leaving them NULL). It then appends
 * one local snapshot entry per process, for vpids 0 .. num_procs_total-1,
 * each tagged with the handle id and job id. Each entry's crs_comp is NULL.
 *
 * @param handle_info     Source handle (id, jobid, num_procs_total).
 * @param global_snapshot Structure to reset and repopulate.
 *
 * @return ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if an entry cannot be
 *         allocated. On that error, entries created so far remain in the
 *         list.
 */
static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
                                                       orte_sstore_base_global_snapshot_info_t *global_snapshot)
{
    orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
    opal_list_item_t* item = NULL;
    int i;

    /*
     * Cleanup the structure a bit, so we can refresh it below
     */
    while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
        OBJ_RELEASE(item);
    }

    free(global_snapshot->start_time);
    global_snapshot->start_time = NULL;

    free(global_snapshot->end_time);
    global_snapshot->end_time = NULL;

    /*
     * Create a structure for each application process.
     * The global timestamps were already cleared above and are not touched
     * again here.
     */
    for(i = 0; i < handle_info->num_procs_total; ++i) {
        vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
        if( NULL == vpid_snapshot ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            return ORTE_ERR_OUT_OF_RESOURCE;
        }

        vpid_snapshot->ss_handle = handle_info->id;

        vpid_snapshot->process_name.jobid  = handle_info->jobid;
        vpid_snapshot->process_name.vpid   = i;

        vpid_snapshot->crs_comp = NULL;

        opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
    }

    return ORTE_SUCCESS;
}
1198 
/*
 * Comparator for ordering local snapshot list items by ascending vpid
 * (presumably used with opal_list_sort(); confirm at the call site).
 *
 * @param a Pointer to the first list item (an
 *          orte_sstore_base_local_snapshot_info_t).
 * @param b Pointer to the second list item (same type).
 *
 * @return 1 if a's vpid is greater, 0 if the vpids are equal, -1 otherwise.
 */
static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
                                            opal_list_item_t **b)
{
    const orte_sstore_base_local_snapshot_info_t *lhs =
        (const orte_sstore_base_local_snapshot_info_t *)(*a);
    const orte_sstore_base_local_snapshot_info_t *rhs =
        (const orte_sstore_base_local_snapshot_info_t *)(*b);

    /* Each relational test yields 0 or 1, so the difference is exactly
     * 1, 0 or -1. This avoids subtracting the vpids directly. */
    return (lhs->process_name.vpid > rhs->process_name.vpid)
         - (lhs->process_name.vpid < rhs->process_name.vpid);
}
1217