1 /*
2  * Copyright (c)      2010 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * $COPYRIGHT$
5  *
6  * Additional copyrights may follow
7  *
8  * $HEADER$
9  */
10 
11 /*
12  *
13  */
14 
15 #include "orte_config.h"
16 
17 #include <string.h>
18 #include <stdlib.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/wait.h>
22 #ifdef HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif  /* HAVE_UNISTD_H */
25 
26 #include "orte/mca/mca.h"
27 #include "opal/mca/base/base.h"
28 
29 #include "opal/mca/event/event.h"
30 
31 #include "orte/constants.h"
32 #include "orte/util/show_help.h"
33 #include "opal/util/argv.h"
34 #include "opal/util/output.h"
35 #include "opal/util/show_help.h"
36 #include "opal/util/opal_environ.h"
37 #include "opal/util/basename.h"
38 #include "opal/util/os_dirpath.h"
39 
40 #include "opal/threads/mutex.h"
41 #include "opal/threads/condition.h"
42 
43 #include "orte/util/name_fns.h"
44 #include "orte/util/proc_info.h"
45 #include "orte/runtime/orte_globals.h"
46 #include "orte/runtime/orte_wait.h"
47 #include "orte/mca/errmgr/errmgr.h"
48 #include "orte/mca/rml/rml.h"
49 #include "orte/mca/rml/rml_types.h"
50 
51 #include "orte/mca/sstore/sstore.h"
52 #include "orte/mca/sstore/base/base.h"
53 
54 #include "sstore_stage.h"
55 
56 /**********
57  * Object stuff
58  **********/
/**
 * Per-handle bookkeeping record kept by the application-side stage module.
 *
 * One record is created for each registered handle and stored on the
 * file-scope active_handles list (see create_new_handle_info()).
 */
struct  orte_sstore_stage_app_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle this record describes; the key find_handle_info() matches on */
    orte_sstore_base_handle_t id;

    /** Global Sequence Number (-1 until unpacked in pull_handle_info()) */
    int seq_num;

    /** Global Reference Name (heap string, freed by the destructor) */
    char * global_ref_name;

    /** Local Location (Absolute Path) (heap string, freed by the destructor) */
    char * local_location;

    /** Metadata File Name (Absolute Path) (heap string, freed by the destructor) */
    char *metadata_filename;

    /** Metadata file stream; NULL whenever the file is not currently open */
    FILE *metadata;

    /** CRS Component used (set via set_attr(), sent to the daemon on sync()) */
    char * crs_comp;

    /** Did this process skip the checkpoint? */
    bool ckpt_skipped;
};
typedef struct orte_sstore_stage_app_snapshot_info_t orte_sstore_stage_app_snapshot_info_t;
/* Class declaration for the handle record type */
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_app_snapshot_info_t);

/* Constructor / destructor for the handle record (defined further down) */
void orte_sstore_stage_app_snapshot_info_construct(orte_sstore_stage_app_snapshot_info_t *info);
void orte_sstore_stage_app_snapshot_info_destruct( orte_sstore_stage_app_snapshot_info_t *info);

/* Class instance: the record derives from opal_list_item_t so that it can
 * be linked onto an opal_list_t */
OBJ_CLASS_INSTANCE(orte_sstore_stage_app_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_app_snapshot_info_construct,
                   orte_sstore_stage_app_snapshot_info_destruct);
97 
98 
99 /**********
100  * Local Function and Variable Declarations
101  **********/
/* Handle bookkeeping: allocate a record for a handle / look one up by id */
static orte_sstore_stage_app_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_stage_app_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);

/* Snapshot directory setup and the pull/push exchange with the local daemon */
static int init_local_snapshot_directory(orte_sstore_stage_app_snapshot_info_t *handle_info);
static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info );
static int push_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info );

/* Helpers operating on the per-handle metadata file */
static int metadata_open(orte_sstore_stage_app_snapshot_info_t * handle_info);
static int metadata_close(orte_sstore_stage_app_snapshot_info_t * handle_info);
static int metadata_write_str(orte_sstore_stage_app_snapshot_info_t * handle_info, char * key, char *value);
static int metadata_write_int(orte_sstore_stage_app_snapshot_info_t * handle_info, char *key, int value);
static int metadata_write_timestamp(orte_sstore_stage_app_snapshot_info_t * handle_info);

/* List of handle records; allocated in module_init(), released in
 * module_finalize() */
static opal_list_t *active_handles = NULL;
116 
117 /**********
118  * Object stuff
119  **********/
/**
 * Constructor for a handle record: puts every field into its "empty" state.
 *
 * @param info  Record being constructed.
 */
void orte_sstore_stage_app_snapshot_info_construct(orte_sstore_stage_app_snapshot_info_t *info)
{
    /* Identification: no handle, no sequence number yet */
    info->id           = 0;
    info->seq_num      = -1;
    info->ckpt_skipped = false;

    /* No strings are owned yet */
    info->crs_comp          = NULL;
    info->metadata_filename = NULL;
    info->local_location    = NULL;
    info->global_ref_name   = NULL;

    /* Metadata file is not open */
    info->metadata = NULL;
}
136 
/**
 * Destructor for a handle record: releases every owned string, closes the
 * metadata stream if it is still open, and resets all fields.
 *
 * @param info  Record being destroyed.
 */
void orte_sstore_stage_app_snapshot_info_destruct( orte_sstore_stage_app_snapshot_info_t *info)
{
    /* free(NULL) is a no-op, so the strings need no individual guards */
    free(info->global_ref_name);
    info->global_ref_name = NULL;

    free(info->local_location);
    info->local_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    /* fclose(), unlike free(), must not be handed a NULL pointer */
    if( NULL != info->metadata ) {
        fclose(info->metadata);
    }
    info->metadata = NULL;

    free(info->crs_comp);
    info->crs_comp = NULL;

    /* Back to the same scalar state the constructor establishes */
    info->id           = 0;
    info->seq_num      = -1;
    info->ckpt_skipped = false;
}
169 
170 /******************
 * Module interface functions
172  ******************/
orte_sstore_stage_app_module_init(void)173 int orte_sstore_stage_app_module_init(void)
174 {
175     if( NULL == active_handles ) {
176         active_handles = OBJ_NEW(opal_list_t);
177     }
178 
179     return ORTE_SUCCESS;
180 }
181 
orte_sstore_stage_app_module_finalize(void)182 int orte_sstore_stage_app_module_finalize(void)
183 {
184     if( NULL != active_handles ) {
185         OBJ_RELEASE(active_handles);
186     }
187 
188     return ORTE_SUCCESS;
189 }
190 
/**
 * Not implemented by the app-side module: prints a notice and fails.
 *
 * @param handle  Unused.
 * @param seq     Unused.
 * @param jobid   Unused.
 * @return ORTE_ERR_NOT_IMPLEMENTED always.
 */
int orte_sstore_stage_app_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    const int status = ORTE_ERR_NOT_IMPLEMENTED;

    /* Parameters are intentionally ignored */
    (void)handle;
    (void)seq;
    (void)jobid;

    opal_output(0, "sstore:stage:(app): request_checkpoint_handle() Not implemented!");

    return status;
}
196 
/**
 * Register a handle with the app-side module.
 *
 * Makes the handle the current one, replaces any existing record for the
 * same handle with a fresh one, pulls the snapshot details from the local
 * daemon, and prepares the local snapshot directory and metadata file.
 *
 * @param handle  Handle to register.
 * @return ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE if no record could be
 *         allocated, or the error reported by pull_handle_info() /
 *         init_local_snapshot_directory().
 */
int orte_sstore_stage_app_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(app): register(%d)", (int)handle));

    /*
     * Create a handle
     */
    orte_sstore_handle_current = handle;
    handle_info = find_handle_info(handle);
    if( NULL != handle_info ) {
        /* Remove the old, stale handle. Once unlinked, nothing else refers
         * to the record, so it must be released here or it is leaked. */
        opal_list_remove_item(active_handles, &(handle_info->super));
        OBJ_RELEASE(handle_info);
    }
    handle_info = create_new_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /*
     * Get basic information from Local SStore
     */
    if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Setup the storage directory
     */
    if( ORTE_SUCCESS != (ret = init_local_snapshot_directory(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
237 
/**
 * Fetch one attribute of a registered handle as a newly allocated string.
 *
 * Supported keys: SSTORE_METADATA_GLOBAL_SNAP_SEQ,
 * SSTORE_METADATA_LOCAL_SNAP_LOC, SSTORE_METADATA_LOCAL_SNAP_META and
 * SSTORE_METADATA_GLOBAL_SNAP_REF.
 *
 * @param handle  Handle to query.
 * @param key     Attribute to fetch.
 * @param value   [out] On success receives a heap string that the caller
 *                must free(). Left untouched on failure.
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM if value is NULL;
 *         ORTE_ERR_NOT_FOUND if the handle is unknown or the attribute has
 *         not been filled in; ORTE_ERR_OUT_OF_RESOURCE on allocation
 *         failure; ORTE_ERR_NOT_SUPPORTED for any other key.
 */
int orte_sstore_stage_app_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    int exit_status = ORTE_SUCCESS;
    orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;
    const char *source = NULL;
    char *result = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(app): get_attr(%d)", key));

    if( NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* find_handle_info() returns NULL for a handle that was never
         * registered; dereferencing it would crash */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Access metadata
     */
    if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        /* asprintf() returns a negative value on failure and then leaves
         * its output pointer unspecified */
        if( 0 > asprintf(&result, "%d", handle_info->seq_num) ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        *value = result;
        goto cleanup;
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key) {
        source = handle_info->local_location;
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_META == key ) {
        source = handle_info->metadata_filename;
    }
    else if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
        source = handle_info->global_ref_name;
    }
    else {
        exit_status = ORTE_ERR_NOT_SUPPORTED;
        goto cleanup;
    }

    /* The string fields stay NULL until pull_handle_info() fills them in;
     * strdup(NULL) is undefined behavior */
    if( NULL == source ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    result = strdup(source);
    if( NULL == result ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    *value = result;

 cleanup:
    return exit_status;
}
274 
/**
 * Set one attribute on a registered handle.
 *
 * - SSTORE_METADATA_LOCAL_CRS_COMP: stores a copy of value as the CRS
 *   component name.
 * - SSTORE_METADATA_LOCAL_SKIP_CKPT: marks the checkpoint as skipped.
 * - SSTORE_METADATA_LOCAL_MKDIR / SSTORE_METADATA_LOCAL_TOUCH: appends a
 *   "<key string><value>" line to the metadata file.
 *
 * @param handle  Handle to modify.
 * @param key     Attribute to set.
 * @param value   Attribute value; must not be NULL.
 * @return ORTE_SUCCESS; ORTE_ERR_NOT_FOUND if the handle is unknown;
 *         ORTE_ERR_OUT_OF_RESOURCE if the value cannot be copied;
 *         ORTE_ERROR for a NULL value, an out-of-range or unsupported key,
 *         or a key with no string form; otherwise the error from
 *         metadata_write_str().
 */
int orte_sstore_stage_app_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;
    char *copy = NULL;

    /* value has not been validated yet, so never hand a NULL to "%s" */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(app): set_attr(%d = %s)", key,
                         (NULL == value ? "(null)" : value)));

    if( NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    if( key >= SSTORE_METADATA_MAX ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* Unknown handle: every branch below would dereference NULL */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Access metadata
     */
    if( SSTORE_METADATA_LOCAL_CRS_COMP == key ) {
        /* Copy first so the old name survives an allocation failure */
        copy = strdup(value);
        if( NULL == copy ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        free(handle_info->crs_comp);
        handle_info->crs_comp = copy;
    }
    else if(SSTORE_METADATA_LOCAL_SKIP_CKPT == key ) {
        handle_info->ckpt_skipped = true;
    }
    else if( SSTORE_METADATA_LOCAL_MKDIR == key ||
             SSTORE_METADATA_LOCAL_TOUCH == key ) {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            /* No string form available: nothing sensible to write */
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }
        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }
    else {
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    /* NOTE(review): assumes orte_sstore_base_convert_key_to_string() hands
     * back a heap-allocated copy that the caller owns - confirm against
     * the sstore base implementation. */
    if( NULL != key_str ) {
        free(key_str);
        key_str = NULL;
    }

    return exit_status;
}
330 
/**
 * Finish work on a handle: append a closing timestamp to the metadata
 * file, close it, and push the handle's state to the local daemon.
 *
 * The current handle is reset to ORTE_SSTORE_HANDLE_INVALID on every path.
 *
 * @param handle  Handle to synchronize.
 * @return ORTE_SUCCESS; ORTE_ERR_NOT_FOUND if the handle is unknown;
 *         otherwise the error of the first step that failed.
 */
int orte_sstore_stage_app_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(app): sync()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* Unknown handle: the metadata helpers would dereference NULL */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        /* Do not leave the stream open behind a failed write; the first
         * error is the one that gets reported */
        (void) metadata_close(handle_info);
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Push information to the Local coordinator
     */
    if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    orte_sstore_handle_current = ORTE_SSTORE_HANDLE_INVALID;

    return exit_status;
}
373 
/**
 * Not implemented by the app-side module: prints a notice and fails.
 *
 * @param handle  Unused.
 * @return ORTE_ERR_NOT_IMPLEMENTED always.
 */
int orte_sstore_stage_app_remove(orte_sstore_base_handle_t handle)
{
    const int status = ORTE_ERR_NOT_IMPLEMENTED;

    (void)handle;   /* intentionally ignored */

    opal_output(0, "sstore:stage:(app): remove() Not implemented!");

    return status;
}
379 
/**
 * Not implemented by the app-side module: prints a notice and fails.
 *
 * @param peer    Unused.
 * @param buffer  Unused.
 * @param handle  Unused.
 * @return ORTE_ERR_NOT_IMPLEMENTED always.
 */
int orte_sstore_stage_app_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    const int status = ORTE_ERR_NOT_IMPLEMENTED;

    /* Parameters are intentionally ignored */
    (void)peer;
    (void)buffer;
    (void)handle;

    opal_output(0, "sstore:stage:(app): pack() Not implemented!");

    return status;
}
385 
/**
 * Not implemented by the app-side module: prints a notice and fails.
 *
 * @param peer    Unused.
 * @param buffer  Unused.
 * @param handle  Unused.
 * @return ORTE_ERR_NOT_IMPLEMENTED always.
 */
int orte_sstore_stage_app_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    const int status = ORTE_ERR_NOT_IMPLEMENTED;

    /* Parameters are intentionally ignored */
    (void)peer;
    (void)buffer;
    (void)handle;

    opal_output(0, "sstore:stage:(app): unpack() Not implemented!");

    return status;
}
391 
392 /**************************
393  * Local functions
394  **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)395 static orte_sstore_stage_app_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
396 {
397     orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;
398 
399     handle_info = OBJ_NEW(orte_sstore_stage_app_snapshot_info_t);
400 
401     handle_info->id = handle;
402 
403     opal_list_append(active_handles, &(handle_info->super));
404 
405     return handle_info;
406 }
407 
find_handle_info(orte_sstore_base_handle_t handle)408 static orte_sstore_stage_app_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
409 {
410     orte_sstore_stage_app_snapshot_info_t *handle_info = NULL;
411     opal_list_item_t* item = NULL;
412 
413     for(item  = opal_list_get_first(active_handles);
414         item != opal_list_get_end(active_handles);
415         item  = opal_list_get_next(item) ) {
416         handle_info = (orte_sstore_stage_app_snapshot_info_t*)item;
417 
418         if( handle_info->id == handle ) {
419             return handle_info;
420         }
421     }
422 
423     return NULL;
424 }
425 
/**
 * Ask the local daemon for the details of a handle and store them in the
 * record.
 *
 * Sends an ORTE_SSTORE_STAGE_PULL command carrying the handle id, waits for
 * the reply on ORTE_RML_TAG_SSTORE_INTERNAL, and unpacks, in order: the
 * command, the handle id, the sequence number, the global reference name,
 * the local location and the metadata file name.
 *
 * @param handle_info  Record to fill in; its id selects what is requested.
 * @return ORTE_SUCCESS; the pack/send/unpack error on failure; ORTE_ERROR
 *         if the reply refers to a different handle than the one requested.
 */
static int pull_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_rml_recv_cb_t *rb = NULL;

    buffer = OBJ_NEW(opal_buffer_t);

    /*
     * Ask the daemon to send us the info that we need
     */
    command = ORTE_SSTORE_STAGE_PULL;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
                                                       ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* buffer should not be released here; the callback releases it */
    buffer = NULL;

    /*
     * Receive the response
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(app): pull() from %s -> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));

    rb = OBJ_NEW(orte_rml_recv_cb_t);
    rb->active = true;
    orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
                            0, orte_rml_recv_callback, rb);
    ORTE_WAIT_FOR_COMPLETION(rb->active);

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    if( loc_id != handle_info->id ) {
        /* The reply describes some other handle; storing its contents in
         * this record would silently mix up two snapshots. */
        opal_output(0, "sstore:stage:(app): pull() Error: requested handle (%d) but received handle (%d)",
                    (int)handle_info->id, (int)loc_id);
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    /* buffer is non-NULL here only if it was never handed to the send */
    if (NULL != buffer) {
        OBJ_RELEASE(buffer);
        buffer = NULL;
    }
    if (NULL != rb) {
        OBJ_RELEASE(rb);
        rb = NULL;
    }

    return exit_status;
}
535 
/**
 * Send the state of a handle to the local daemon.
 *
 * The message carries, in order: the ORTE_SSTORE_STAGE_PUSH command, the
 * handle id, the "checkpoint skipped" flag and, only when the checkpoint
 * was not skipped, the CRS component name.
 *
 * @param handle_info  Record to send.
 * @return ORTE_SUCCESS, or the error of the pack/send call that failed.
 */
static int push_handle_info(orte_sstore_stage_app_snapshot_info_t *handle_info )
{
    int rc;
    int status = ORTE_SUCCESS;
    orte_sstore_stage_cmd_flag_t cmd = ORTE_SSTORE_STAGE_PUSH;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);

    /* Command */
    rc = opal_dss.pack(msg, &cmd, 1, ORTE_SSTORE_STAGE_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto cleanup;
    }

    /* Handle id */
    rc = opal_dss.pack(msg, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto cleanup;
    }

    /* Skipped flag */
    rc = opal_dss.pack(msg, &(handle_info->ckpt_skipped), 1, OPAL_BOOL);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto cleanup;
    }

    /* The CRS component is only sent for a checkpoint that was taken */
    if( !handle_info->ckpt_skipped ) {
        rc = opal_dss.pack(msg, &(handle_info->crs_comp), 1, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            status = rc;
            goto cleanup;
        }
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, msg,
                                 ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto cleanup;
    }

    /* msg should not be released here; the callback releases it */
    msg = NULL;

 cleanup:
    if (NULL != msg) {
        OBJ_RELEASE(msg);
        msg = NULL;
    }

    return status;
}
589 
/**
 * Create the local snapshot directory and start the metadata file.
 *
 * Creates handle_info->local_location with owner-only permissions
 * (S_IRWXU), opens the metadata file, writes a timestamp line and this
 * process's PID, and closes the file again. The metadata stream is closed
 * on every path, including failures.
 *
 * @param handle_info  Record holding the directory and metadata file names.
 * @return ORTE_SUCCESS, or the error of the first step that failed.
 */
static int init_local_snapshot_directory(orte_sstore_stage_app_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    mode_t my_mode = S_IRWXU;

    /*
     * Make the snapshot directory from the uniq_global_snapshot_name
     */
    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(handle_info->local_location, my_mode)) ) {
        opal_show_help("help-orte-sstore-stage.txt", "fail_path_create", true,
                       ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                       orte_process_info.nodename,
                       handle_info->local_location);
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Open up the metadata file
     */
    if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Add a timestamp and the PID of this process
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info, SSTORE_METADATA_LOCAL_PID_STR, (int)getpid())) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    /* Close on every path so a failed write cannot leave the stream open.
     * metadata_close() returns immediately when the file is not open, so
     * this is also safe when the failure happened before the open. */
    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        if( ORTE_SUCCESS == exit_status ) {
            /* Keep the first error if one was already recorded */
            exit_status = ret;
        }
    }

    return exit_status;
}
641 
642 
643 /**************************
644  * Metadata functions
645  **************************/
/**
 * Open the handle's metadata file for appending, unless it is already open.
 *
 * @param handle_info  Record whose metadata_filename names the file; on
 *                     success its metadata field holds the open stream.
 * @return ORTE_SUCCESS if the file is (now) open; ORTE_ERROR if no file
 *         name is set or the file cannot be opened.
 */
static int metadata_open(orte_sstore_stage_app_snapshot_info_t * handle_info)
{
    /* If already open, then just return */
    if( NULL != handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    /* The name stays NULL until pull_handle_info() has filled it in;
     * fopen() must not be called with a NULL path */
    if( NULL == handle_info->metadata_filename ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:stage:(app):metadata_open() No metadata file name set\n");
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:stage:(app):metadata_open() Unable to open the file (%s)\n",
                    handle_info->metadata_filename);
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
663 
/**
 * Close the handle's metadata file if it is open.
 *
 * @param handle_info  Record whose metadata stream is closed; the field is
 *                     NULL afterwards.
 * @return ORTE_SUCCESS if the file was already closed or closed cleanly;
 *         ORTE_ERROR if fclose() reported a failure (buffered metadata may
 *         not have reached the file).
 */
static int metadata_close(orte_sstore_stage_app_snapshot_info_t * handle_info)
{
    int close_rc;

    /* If already closed, then just return */
    if( NULL == handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    close_rc = fclose(handle_info->metadata);
    /* The stream is unusable after fclose() whether or not it succeeded */
    handle_info->metadata = NULL;

    /* fclose() flushes pending output, so a failure here means data loss */
    if( 0 != close_rc ) {
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
676 
/**
 * Append a "<key><value>" line to the metadata file, opening the file
 * first if needed, and close the file afterwards.
 *
 * @param handle_info  Record owning the metadata file.
 * @param key          Key text written verbatim in front of the value.
 * @param value        String value.
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM if key or value is NULL; the
 *         error from metadata_open(); ORTE_ERROR if writing or closing
 *         the file fails.
 */
static int metadata_write_str(orte_sstore_stage_app_snapshot_info_t * handle_info, char *key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Passing NULL for a "%s" conversion is undefined behavior */
    if( NULL == key || NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( 0 > fprintf(handle_info->metadata, "%s%s\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
    }

 cleanup:
    /* Must close the metadata each time, since if we try to checkpoint the
     * CRS might want to restore the FD, and will likely fail if the snapshot
     * moved */
    if( NULL != handle_info->metadata ) {
        /* fclose() performs the final flush; report a failure unless an
         * earlier error is already being returned */
        if( 0 != fclose(handle_info->metadata) && ORTE_SUCCESS == exit_status ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
        }
        handle_info->metadata = NULL;
    }

    return exit_status;
}
703 
/**
 * Append a "<key><value>" line with an integer value to the metadata file,
 * opening the file first if needed. The file is left open.
 *
 * @param handle_info  Record owning the metadata file.
 * @param key          Key text written verbatim in front of the value.
 * @param value        Integer value, written in decimal.
 * @return ORTE_SUCCESS; ORTE_ERR_BAD_PARAM if key is NULL; the error from
 *         metadata_open(); ORTE_ERROR if the write fails.
 */
static int metadata_write_int(orte_sstore_stage_app_snapshot_info_t * handle_info, char *key, int value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Passing NULL for a "%s" conversion is undefined behavior */
    if( NULL == key ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( 0 > fprintf(handle_info->metadata, "%s%d\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
722 
/**
 * Append a timestamp line (SSTORE_METADATA_INTERNAL_TIME_STR followed by
 * the ctime() text of the current time) to the metadata file, opening the
 * file first if needed. The file is left open.
 *
 * @param handle_info  Record owning the metadata file.
 * @return ORTE_SUCCESS; the error from metadata_open(); ORTE_ERROR if the
 *         time cannot be formatted or the write fails.
 */
static int metadata_write_timestamp(orte_sstore_stage_app_snapshot_info_t * handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    time_t timestamp;
    const char *time_text = NULL;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    timestamp = time(NULL);
    /* ctime() may return NULL, which must not reach "%s" */
    time_text = ctime(&timestamp);
    if( NULL == time_text ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /* No "\n" in the format: the ctime() text already ends with one */
    if( 0 > fprintf(handle_info->metadata, "%s%s", SSTORE_METADATA_INTERNAL_TIME_STR, time_text) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
743