1 /*
2  * Copyright (c)      2010 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * $COPYRIGHT$
5  *
6  * Additional copyrights may follow
7  *
8  * $HEADER$
9  */
10 
11 /*
12  *
13  */
14 
15 #include "orte_config.h"
16 
17 #include <string.h>
18 #include <stdlib.h>
19 #include <sys/types.h>
20 #include <sys/stat.h>
21 #include <sys/wait.h>
22 #ifdef HAVE_UNISTD_H
23 #include <unistd.h>
24 #endif  /* HAVE_UNISTD_H */
25 
26 #include "orte/mca/mca.h"
27 #include "opal/mca/base/base.h"
28 
29 #include "opal/mca/event/event.h"
30 
31 #include "orte/constants.h"
32 #include "orte/util/show_help.h"
33 #include "opal/util/argv.h"
34 #include "opal/util/output.h"
35 #include "opal/util/show_help.h"
36 #include "opal/util/opal_environ.h"
37 #include "opal/util/basename.h"
38 #include "opal/util/os_dirpath.h"
39 
40 #include "opal/threads/mutex.h"
41 #include "opal/threads/condition.h"
42 
43 #include "orte/util/name_fns.h"
44 #include "orte/util/proc_info.h"
45 #include "orte/runtime/orte_globals.h"
46 #include "orte/runtime/orte_wait.h"
47 #include "orte/mca/errmgr/errmgr.h"
48 #include "orte/mca/rml/rml.h"
49 #include "orte/mca/rml/rml_types.h"
50 
51 #include "orte/mca/sstore/sstore.h"
52 #include "orte/mca/sstore/base/base.h"
53 
54 #include "sstore_central.h"
55 
56 /**********
57  * Object stuff
58  **********/
59 struct  orte_sstore_central_app_snapshot_info_t {
60     /** List super object */
61     opal_list_item_t super;
62 
63     /** */
64     orte_sstore_base_handle_t id;
65 
66     /** Global Sequence Number */
67     int seq_num;
68 
69     /** Global Reference Name */
70     char * global_ref_name;
71 
72     /** Local Location (Absolute Path) */
73     char * local_location;
74 
75     /** Metadata File Name (Absolute Path) */
76     char *metadata_filename;
77 
78     /** Metadata File Descriptor */
79     FILE *metadata;
80 
81     /** CRS Component used */
82     char * crs_comp;
83 
84     /** Did this process skip the checkpoint? */
85     bool ckpt_skipped;
86 };
87 typedef struct orte_sstore_central_app_snapshot_info_t orte_sstore_central_app_snapshot_info_t;
88 ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_app_snapshot_info_t);
89 
90 void orte_sstore_central_app_snapshot_info_construct(orte_sstore_central_app_snapshot_info_t *info);
91 void orte_sstore_central_app_snapshot_info_destruct( orte_sstore_central_app_snapshot_info_t *info);
92 
93 OBJ_CLASS_INSTANCE(orte_sstore_central_app_snapshot_info_t,
94                    opal_list_item_t,
95                    orte_sstore_central_app_snapshot_info_construct,
96                    orte_sstore_central_app_snapshot_info_destruct);
97 
98 
99 /**********
100  * Local Function and Variable Declarations
101  **********/
102 static orte_sstore_central_app_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
103 static orte_sstore_central_app_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
104 
105 static int init_local_snapshot_directory(orte_sstore_central_app_snapshot_info_t *handle_info);
106 static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info );
107 static int push_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info );
108 
109 static int metadata_open(orte_sstore_central_app_snapshot_info_t * handle_info);
110 static int metadata_close(orte_sstore_central_app_snapshot_info_t * handle_info);
111 static int metadata_write_str(orte_sstore_central_app_snapshot_info_t * handle_info, char * key, char *value);
112 static int metadata_write_int(orte_sstore_central_app_snapshot_info_t * handle_info, char *key, int value);
113 static int metadata_write_timestamp(orte_sstore_central_app_snapshot_info_t * handle_info);
114 
115 static opal_list_t *active_handles = NULL;
116 
117 /**********
118  * Object stuff
119  **********/
/*
 * Class constructor: put a freshly allocated snapshot record into its
 * "nothing known yet" state.
 */
void orte_sstore_central_app_snapshot_info_construct(orte_sstore_central_app_snapshot_info_t *info)
{
    /* Identification: no handle assigned, sequence number unknown */
    info->id      = 0;
    info->seq_num = -1;

    /* None of the strings has been received or set yet */
    info->global_ref_name   = NULL;
    info->local_location    = NULL;
    info->metadata_filename = NULL;
    info->crs_comp          = NULL;

    /* The metadata file starts out closed */
    info->metadata = NULL;

    /* Assume the checkpoint is taken until told otherwise */
    info->ckpt_skipped = false;
}
136 
/*
 * Class destructor: give back everything the record owns and return it to
 * the same state the constructor produces.
 */
void orte_sstore_central_app_snapshot_info_destruct( orte_sstore_central_app_snapshot_info_t *info)
{
    /* free(NULL) is a no-op, so the owned strings need no individual guard */
    free(info->global_ref_name);
    info->global_ref_name = NULL;

    free(info->local_location);
    info->local_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    free(info->crs_comp);
    info->crs_comp = NULL;

    /* fclose(NULL) is not allowed, so the stream does need its guard */
    if( NULL != info->metadata ) {
        fclose(info->metadata);
        info->metadata = NULL;
    }

    info->id           = 0;
    info->seq_num      = -1;
    info->ckpt_skipped = false;
}
169 
/******************
 * Module API functions
 ******************/
orte_sstore_central_app_module_init(void)173 int orte_sstore_central_app_module_init(void)
174 {
175     if( NULL == active_handles ) {
176         active_handles = OBJ_NEW(opal_list_t);
177     }
178 
179     return ORTE_SUCCESS;
180 }
181 
orte_sstore_central_app_module_finalize(void)182 int orte_sstore_central_app_module_finalize(void)
183 {
184     if( NULL != active_handles ) {
185         OBJ_RELEASE(active_handles);
186     }
187 
188     return ORTE_SUCCESS;
189 }
190 
/*
 * Request a new checkpoint handle.
 *
 * This operation is not provided by the application-side module: the call
 * prints a notice and fails without touching any of its arguments.
 *
 * Returns ORTE_ERR_NOT_IMPLEMENTED.
 */
int orte_sstore_central_app_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    /* The arguments are intentionally unused */
    (void)handle;
    (void)seq;
    (void)jobid;

    opal_output(0, "sstore:central:(app): request_checkpoint_handle() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
196 
/*
 * Register a snapshot handle with this application process.
 *
 * Makes 'handle' the current handle, replaces any record already stored for
 * it with a fresh one, pulls the snapshot description from the local daemon
 * and prepares the local snapshot directory and metadata file.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE when no record could be
 * created, or the error reported by the pull / directory setup step.
 */
int orte_sstore_central_app_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_app_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): register(%d)", (int)handle));

    /*
     * Create a handle
     */
    orte_sstore_handle_current = handle;
    handle_info = find_handle_info(handle);
    if( NULL != handle_info ) {
        /* Remove the old, stale handle.  Unlinking alone would leak the
         * record (and possibly an open metadata stream), so release it too. */
        opal_list_remove_item(active_handles, &(handle_info->super));
        OBJ_RELEASE(handle_info);
    }
    handle_info = create_new_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /*
     * Get basic information from Local SStore
     */
    if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Setup the storage directory
     */
    if( ORTE_SUCCESS != (ret = init_local_snapshot_directory(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
237 
/*
 * Read one attribute of a registered snapshot handle.
 *
 * On success '*value' receives a newly allocated string that the caller must
 * free.  Supported keys are SSTORE_METADATA_GLOBAL_SNAP_SEQ,
 * SSTORE_METADATA_LOCAL_SNAP_LOC, SSTORE_METADATA_LOCAL_SNAP_META and
 * SSTORE_METADATA_GLOBAL_SNAP_REF.
 *
 * Returns:
 *   ORTE_SUCCESS              the attribute was copied into '*value'
 *   ORTE_ERR_BAD_PARAM        'value' is NULL
 *   ORTE_ERR_NOT_FOUND        the handle is not registered, or the requested
 *                             attribute has not been populated yet
 *   ORTE_ERR_NOT_SUPPORTED    the key is not one of the supported keys
 *                             ('*value' is left untouched)
 *   ORTE_ERR_OUT_OF_RESOURCE  the copy could not be allocated
 */
int orte_sstore_central_app_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    int exit_status = ORTE_SUCCESS;
    orte_sstore_central_app_snapshot_info_t *handle_info = NULL;
    const char *source = NULL;
    char *copy = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): get_attr(%d)", (int)key));

    if( NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        exit_status = ORTE_ERR_BAD_PARAM;
        goto cleanup;
    }

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* Unknown handle: nothing to read from */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Access metadata
     */
    if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        if( 0 > asprintf(value, "%d", handle_info->seq_num) ) {
            /* The content of *value is unspecified after a failed asprintf */
            *value = NULL;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                             "sstore:central:(app): get_attr(%d, %d) Seq = <%s>", (int)key, (int)handle_info->id, *value));
    }
    else {
        /* The remaining keys are plain copies of a stored string */
        if( SSTORE_METADATA_LOCAL_SNAP_LOC == key) {
            source = handle_info->local_location;
        }
        else if( SSTORE_METADATA_LOCAL_SNAP_META == key ) {
            source = handle_info->metadata_filename;
        }
        else if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
            source = handle_info->global_ref_name;
        }
        else {
            exit_status = ORTE_ERR_NOT_SUPPORTED;
            goto cleanup;
        }

        if( NULL == source ) {
            /* strdup(NULL) is undefined; the field was never filled in */
            ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
            exit_status = ORTE_ERR_NOT_FOUND;
            goto cleanup;
        }

        copy = strdup(source);
        if( NULL == copy ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        *value = copy;

        if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                                 "sstore:central:(app): get_attr(%d, %d) Ref = <%s>", (int)key, (int)handle_info->id, *value));
        }
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): get_attr(%d, %d) <%s>", (int)key, (int)handle_info->id, *value));
 cleanup:
    return exit_status;
}
280 
/*
 * Set one attribute of a registered snapshot handle.
 *
 *   SSTORE_METADATA_LOCAL_CRS_COMP   stores a private copy of 'value' as the
 *                                    CRS component name
 *   SSTORE_METADATA_LOCAL_SKIP_CKPT  marks the checkpoint as skipped
 *   SSTORE_METADATA_LOCAL_MKDIR,
 *   SSTORE_METADATA_LOCAL_TOUCH      appends "<key string><value>" to the
 *                                    metadata file
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR for a NULL value / out-of-range or
 * unsupported key / key without a string form, ORTE_ERR_NOT_FOUND when the
 * handle is not registered, ORTE_ERR_OUT_OF_RESOURCE when the value cannot be
 * copied, or the error of the metadata write.
 */
int orte_sstore_central_app_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_app_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;
    char *value_copy = NULL;

    /* 'value' has not been validated yet; never hand NULL to a %s conversion */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): set_attr(%d = %s)", (int)key,
                         (NULL != value) ? value : "(null)"));

    if( NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    if( key >= SSTORE_METADATA_MAX ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* Unknown handle: nothing to update */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Access metadata
     */
    if( SSTORE_METADATA_LOCAL_CRS_COMP == key ) {
        /* Copy first so that a failed allocation keeps the old name intact */
        value_copy = strdup(value);
        if( NULL == value_copy ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        free(handle_info->crs_comp);
        handle_info->crs_comp = value_copy;
    }
    else if(SSTORE_METADATA_LOCAL_SKIP_CKPT == key ) {
        handle_info->ckpt_skipped = true;
    }
    else if( SSTORE_METADATA_LOCAL_MKDIR == key ||
             SSTORE_METADATA_LOCAL_TOUCH == key ) {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            /* Without a key string the metadata line cannot be formed */
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }
        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }
    else {
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    if( NULL != key_str ) {
        free(key_str);
        key_str = NULL;
    }

    return exit_status;
}
341 
/*
 * Finish the local part of a snapshot.
 *
 * Appends a closing timestamp to the metadata file, closes the file and
 * pushes the per-process results to the local daemon.  Whatever the outcome,
 * the current handle is reset to ORTE_SSTORE_HANDLE_INVALID on return.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when the handle is not
 * registered, or the error of the first step that failed.
 */
int orte_sstore_central_app_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_app_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): sync()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        /* Unknown handle: there is no metadata to finalize or push */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Push information to the Local coordinator
     */
    if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    orte_sstore_handle_current = ORTE_SSTORE_HANDLE_INVALID;

    return exit_status;
}
384 
/*
 * Remove a snapshot.
 *
 * This operation is not provided by the application-side module: the call
 * prints a notice and fails.
 *
 * Returns ORTE_ERR_NOT_IMPLEMENTED.
 */
int orte_sstore_central_app_remove(orte_sstore_base_handle_t handle)
{
    /* The argument is intentionally unused */
    (void)handle;

    opal_output(0, "sstore:central:(app): remove() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
390 
/*
 * Pack a handle into a buffer for a peer.
 *
 * This operation is not provided by the application-side module: the call
 * prints a notice and fails without touching the buffer.
 *
 * Returns ORTE_ERR_NOT_IMPLEMENTED.
 */
int orte_sstore_central_app_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    /* The arguments are intentionally unused */
    (void)peer;
    (void)buffer;
    (void)handle;

    opal_output(0, "sstore:central:(app): pack() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
396 
/*
 * Unpack a handle from a buffer received from a peer.
 *
 * This operation is not provided by the application-side module: the call
 * prints a notice and fails without touching the buffer or '*handle'.
 *
 * Returns ORTE_ERR_NOT_IMPLEMENTED.
 */
int orte_sstore_central_app_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    /* The arguments are intentionally unused */
    (void)peer;
    (void)buffer;
    (void)handle;

    opal_output(0, "sstore:central:(app): unpack() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
402 
403 /**************************
404  * Local functions
405  **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)406 static orte_sstore_central_app_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
407 {
408     orte_sstore_central_app_snapshot_info_t *handle_info = NULL;
409 
410     handle_info = OBJ_NEW(orte_sstore_central_app_snapshot_info_t);
411 
412     handle_info->id = handle;
413 
414     opal_list_append(active_handles, &(handle_info->super));
415 
416     return handle_info;
417 }
418 
find_handle_info(orte_sstore_base_handle_t handle)419 static orte_sstore_central_app_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
420 {
421     orte_sstore_central_app_snapshot_info_t *handle_info = NULL;
422     opal_list_item_t* item = NULL;
423 
424     for(item  = opal_list_get_first(active_handles);
425         item != opal_list_get_end(active_handles);
426         item  = opal_list_get_next(item) ) {
427         handle_info = (orte_sstore_central_app_snapshot_info_t*)item;
428 
429         if( handle_info->id == handle ) {
430             return handle_info;
431         }
432     }
433 
434     return NULL;
435 }
436 
/*
 * Ask the local daemon for the description of a snapshot handle and store
 * the answer in 'handle_info'.
 *
 * Sends an ORTE_SSTORE_CENTRAL_PULL command carrying the handle id, waits
 * for the reply and unpacks, in order: the command, the handle id, the
 * sequence number, the global reference name, the local location and the
 * metadata file name.  The unpacked strings become owned by 'handle_info'.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE when a message object
 * cannot be allocated, ORTE_ERROR when the reply describes a different
 * handle than the one requested, or the pack / send / unpack error.
 */
static int pull_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info )
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *buffer = NULL;
    orte_sstore_central_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_rml_recv_cb_t* rb = NULL;

    buffer = OBJ_NEW(opal_buffer_t);
    if( NULL == buffer ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }

    /*
     * Ask the daemon to send us the info that we need
     */
    command = ORTE_SSTORE_CENTRAL_PULL;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
                                                       ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* buffer should not be released here; the callback releases it */
    buffer = NULL;

    /*
     * Receive the response
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): pull() from %s -> %s",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON)));

    rb = OBJ_NEW(orte_rml_recv_cb_t);
    if( NULL == rb ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    rb->active = true;
    orte_rml.recv_buffer_nb(ORTE_PROC_MY_DAEMON, ORTE_RML_TAG_SSTORE_INTERNAL,
                            0, orte_rml_recv_callback, rb);
    /* Wait here until the receive has completed and rb->data holds the reply */
    ORTE_WAIT_FOR_COMPLETION(rb->active);

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &loc_id, &count, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    if( loc_id != handle_info->id ) {
        /* The reply describes some other handle; storing its fields in this
         * record would silently mix up two snapshots. */
        opal_output(0, "sstore:central:(app): pull() Error: requested handle (%d) but received handle (%d)",
                    (int)handle_info->id, (int)loc_id);
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->seq_num), &count, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->local_location), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &(handle_info->metadata_filename), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(app): pull() from %s -> %s (%d, %d, %s)",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                         ORTE_NAME_PRINT(ORTE_PROC_MY_DAEMON),
                         handle_info->id,
                         handle_info->seq_num,
                         handle_info->global_ref_name
                         ));
 cleanup:
    if (NULL != buffer) {
        OBJ_RELEASE(buffer);
        buffer = NULL;
    }
    if (NULL != rb) {
        OBJ_RELEASE(rb);
        rb = NULL;
    }

    return exit_status;
}
554 
/*
 * Report the local result of a snapshot to the local daemon.
 *
 * The message carries, in order: the ORTE_SSTORE_CENTRAL_PUSH command, the
 * handle id, the "checkpoint skipped" flag and, only when the checkpoint
 * was not skipped, the CRS component name.
 *
 * Returns ORTE_SUCCESS or the error of the first pack / send that failed.
 */
static int push_handle_info(orte_sstore_central_app_snapshot_info_t *handle_info )
{
    int rc;
    orte_sstore_central_cmd_flag_t cmd = ORTE_SSTORE_CENTRAL_PUSH;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);

    rc = opal_dss.pack(msg, &cmd, 1, ORTE_SSTORE_CENTRAL_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    rc = opal_dss.pack(msg, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    rc = opal_dss.pack(msg, &(handle_info->ckpt_skipped), 1, OPAL_BOOL);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* The component name is only part of the message for a real checkpoint */
    if( !handle_info->ckpt_skipped ) {
        rc = opal_dss.pack(msg, &(handle_info->crs_comp), 1, OPAL_STRING);
        if (ORTE_SUCCESS != rc) {
            ORTE_ERROR_LOG(rc);
            goto cleanup;
        }
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, msg,
                                 ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /* The send callback releases the message from here on; forget it */
    msg = NULL;

 cleanup:
    if (NULL != msg) {
        OBJ_RELEASE(msg);
        msg = NULL;
    }

    return rc;
}
609 
/*
 * Create the local snapshot directory and start its metadata file.
 *
 * The directory is created with owner-only permissions (S_IRWXU).  The
 * metadata file then receives a timestamp line and a line with the PID of
 * this process, and is closed again.
 *
 * Returns ORTE_SUCCESS or the error of the first step that failed.
 */
static int init_local_snapshot_directory(orte_sstore_central_app_snapshot_info_t *handle_info)
{
    int rc;
    const mode_t dir_mode = S_IRWXU;

    /*
     * Create the directory that will hold the local snapshot
     */
    rc = opal_os_dirpath_create(handle_info->local_location, dir_mode);
    if( OPAL_SUCCESS != rc ) {
        opal_show_help("help-orte-sstore-central.txt", "fail_path_create", true,
                       ORTE_NAME_PRINT(ORTE_PROC_MY_NAME),
                       orte_process_info.nodename,
                       handle_info->local_location);
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /*
     * Open the metadata file
     */
    rc = metadata_open(handle_info);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /*
     * Record when the snapshot was started and which process owns it
     */
    rc = metadata_write_timestamp(handle_info);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    rc = metadata_write_int(handle_info, SSTORE_METADATA_LOCAL_PID_STR, (int)getpid());
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

    /*
     * Close the file again until the next write
     */
    rc = metadata_close(handle_info);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        goto cleanup;
    }

 cleanup:
    return rc;
}
661 
662 
663 /**************************
664  * Metadata functions
665  **************************/
/*
 * Open the metadata file of a snapshot record for appending.
 *
 * Does nothing when the file is already open.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM when the record has no metadata
 * file name, or ORTE_ERROR when the file cannot be opened.
 */
static int metadata_open(orte_sstore_central_app_snapshot_info_t * handle_info)
{
    /* If already open, then just return */
    if( NULL != handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    /* fopen() must not be handed a NULL path */
    if( NULL == handle_info->metadata_filename ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(app):metadata_open() No metadata file name is known for handle (%d)\n",
                    (int)handle_info->id);
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /* Append mode: earlier entries of the same snapshot are kept */
    if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(app):metadata_open() Unable to open the file (%s)\n",
                    handle_info->metadata_filename);
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
683 
/*
 * Close the metadata file of a snapshot record.
 *
 * Does nothing when the file is already closed.
 *
 * Returns ORTE_SUCCESS, or ORTE_ERROR when closing the stream fails, which
 * means buffered metadata may not have reached the file.
 */
static int metadata_close(orte_sstore_central_app_snapshot_info_t * handle_info)
{
    int close_rc;

    /* If already closed, then just return */
    if( NULL == handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    close_rc = fclose(handle_info->metadata);
    /* The stream must not be used again, whether or not fclose succeeded */
    handle_info->metadata = NULL;

    if( 0 != close_rc ) {
        /* fclose flushes pending output; a failure here is a failed write */
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
696 
/*
 * Append the line "<key><value>" to the metadata file and close the file.
 *
 * The file is opened first when necessary, and is always closed before
 * returning.
 *
 * Returns ORTE_SUCCESS, the error of metadata_open(), or ORTE_ERROR when
 * writing or closing the file fails.
 */
static int metadata_write_str(orte_sstore_central_app_snapshot_info_t * handle_info, char *key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( 0 > fprintf(handle_info->metadata, "%s%s\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    /* Must close the metadata each time, since if we try to checkpoint the
     * CRS might want to restore the FD, and will likely fail if the snapshot
     * moved */
    if( NULL != handle_info->metadata ) {
        ret = fclose(handle_info->metadata);
        handle_info->metadata = NULL;
        /* A failed close means the line may not have been flushed; keep an
         * earlier error if there already is one. */
        if( 0 != ret && ORTE_SUCCESS == exit_status ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
        }
    }

    return exit_status;
}
723 
/*
 * Append the line "<key><value>" (value printed as a decimal integer) to the
 * metadata file.
 *
 * The file is opened first when necessary and is left open on return.
 *
 * Returns ORTE_SUCCESS, the error of metadata_open(), or ORTE_ERROR when the
 * write fails.
 */
static int metadata_write_int(orte_sstore_central_app_snapshot_info_t * handle_info, char *key, int value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* fprintf reports an output error with a negative return value */
    if( 0 > fprintf(handle_info->metadata, "%s%d\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
742 
/*
 * Append a timestamp line (SSTORE_METADATA_INTERNAL_TIME_STR followed by the
 * current time as formatted by ctime()) to the metadata file.
 *
 * The file is opened first when necessary and is left open on return.
 *
 * Returns ORTE_SUCCESS, the error of metadata_open(), or ORTE_ERROR when the
 * time cannot be formatted or the write fails.
 */
static int metadata_write_timestamp(orte_sstore_central_app_snapshot_info_t * handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    time_t timestamp;
    const char *stamp = NULL;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    timestamp = time(NULL);
    /* ctime() may return NULL, which must not reach a %s conversion */
    stamp = ctime(&timestamp);
    if( NULL == stamp ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /* The string from ctime() already ends in a newline */
    if( 0 > fprintf(handle_info->metadata, "%s%s", SSTORE_METADATA_INTERNAL_TIME_STR, stamp) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
763