1 /*
2 * Copyright (c) 2011 The Trustees of Indiana University.
3 * All rights reserved.
4 * Copyright (c) 2004-2011 The University of Tennessee and The University
5 * of Tennessee Research Foundation. All rights
6 * reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
/*
 * Global (HNP-side) portion of the "stage" SStore component.
 */
17
18 #include "orte_config.h"
19
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif /* HAVE_UNISTD_H */
28
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31
32 #include "opal/mca/event/event.h"
33
34 #include "orte/constants.h"
35 #include "opal/util/argv.h"
36 #include "opal/util/output.h"
37 #include "opal/util/show_help.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41 #include "opal/util/opal_getcwd.h"
42
43 #include "opal/threads/mutex.h"
44 #include "opal/threads/condition.h"
45
46 #include "orte/util/show_help.h"
47 #include "orte/util/name_fns.h"
48 #include "orte/util/proc_info.h"
49 #include "orte/runtime/orte_globals.h"
50 #include "orte/runtime/orte_wait.h"
51
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/errmgr/base/base.h"
54 #include "orte/mca/errmgr/base/errmgr_private.h"
55 #include "orte/mca/ess/ess.h"
56 #include "orte/mca/rml/rml.h"
57 #include "orte/mca/rml/rml_types.h"
58 #include "orte/mca/filem/filem.h"
59 #include "orte/mca/grpcomm/grpcomm.h"
60 #include "orte/mca/snapc/snapc.h"
61 #include "orte/mca/snapc/base/base.h"
62
63 #include "orte/mca/sstore/sstore.h"
64 #include "orte/mca/sstore/base/base.h"
65
66 #include "sstore_stage.h"
67
/*
 * Kind of request a global snapshot handle represents
 * (stored in the 'handle_type' field).
 */
enum {
    SSTORE_HANDLE_TYPE_NONE    = 0,
    SSTORE_HANDLE_TYPE_CKPT    = 1,
    SSTORE_HANDLE_TYPE_RESTART = 2
};

/*
 * Lifecycle state of a global snapshot handle (stored in the 'state' field).
 * The code in this file moves a handle NONE -> INIT (creation) -> REG
 * (register) -> SYNCING -> SYNCED (sync).
 * NOTE(review): SSTORE_GLOBAL_ERROR is not assigned in the code visible here.
 */
enum {
    SSTORE_GLOBAL_NONE    = 0,
    SSTORE_GLOBAL_ERROR   = 1,
    SSTORE_GLOBAL_INIT    = 2,
    SSTORE_GLOBAL_REG     = 3,
    SSTORE_GLOBAL_SYNCING = 4,
    SSTORE_GLOBAL_SYNCED  = 5
};
78
79 /**********
80 * Object Stuff
81 **********/
/*
 * Per-snapshot bookkeeping kept by the global side for each sstore handle.
 * Handles created by create_new_handle_info() are appended to the file-scope
 * active_handles list.
 */
struct orte_sstore_stage_global_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle id (assigned from next_handle_id by the constructor) */
    orte_sstore_base_handle_t id;

    /** Job ID */
    orte_jobid_t jobid;

    /** State (one of the SSTORE_GLOBAL_* values) */
    int state;

    /** Handle type (one of the SSTORE_HANDLE_TYPE_* values) */
    int handle_type;

    /** Sequence Number */
    int seq_num;

    /** Reference Name */
    char * ref_name;

    /** Local Location (Relative Path to base_location): "<ref_name>/<seq_num>" */
    char * local_location;

    /** Application location format (Global): "<base_location>/<local_location>/<local snapshot fmt>" */
    char * app_global_location_fmt;

    /** Application location format (Local) */
    char * app_local_location_fmt;

    /** Application local cache location format (only set when caching is enabled) */
    char * app_local_cache_location_fmt;

    /** Base location (private copy of orte_sstore_base_global_snapshot_dir) */
    char * base_location;

    /** Metadata File Name */
    char *metadata_filename;

    /** Metadata file stream (closed by the destructor if still open) */
    FILE *metadata;

    /** Num procs reported as locally synced */
    int num_procs_synced;

    /** Num procs reported as done */
    int num_procs_done;

    /** Num procs total in job */
    int num_procs_total;

    /** List of FileM Requests to wait upon */
    opal_list_t *filem_requests;

    /** Is this checkpoint representing a migration? */
    bool migrating;

    /** JJH: Assume all processes are compressed the same way */
    char * compress_comp;
    char * compress_postfix;

    /** Progress meter bookkeeping (see sstore_stage_report_progress()) */
    double last_progress_report;
};
typedef struct orte_sstore_stage_global_snapshot_info_t orte_sstore_stage_global_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_global_snapshot_info_t);

void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info);
void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info);

/* Register the constructor/destructor pair with the OPAL object system */
OBJ_CLASS_INSTANCE(orte_sstore_stage_global_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_global_snapshot_info_construct,
                   orte_sstore_stage_global_snapshot_info_destruct);
157
158
159 /**********
160 * Local Function and Variable Declarations
161 **********/
/** True once the persistent RML receive for ORTE_RML_TAG_SSTORE_INTERNAL is posted */
static bool is_global_listener_active = false;
static int sstore_stage_global_start_listener(void);
static int sstore_stage_global_stop_listener(void);
static void sstore_stage_global_recv(int status,
                                     orte_process_name_t* sender,
                                     opal_buffer_t* buffer,
                                     orte_rml_tag_t tag,
                                     void* cbdata);

/* Handlers for PULL / PUSH / DONE commands received from our own job */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info);
static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info);

/* Handle creation (appends to active_handles) and lookup by id */
static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);

/* Global metadata file helpers */
static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info);
static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info);
static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, int value);
static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char * key, char *value);
static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info);

static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info);
static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
                                          opal_list_item_t **b);
static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
                                                     orte_sstore_base_global_snapshot_info_t *global_snapshot);

static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info);
static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info);

/** Next id handed out by the handle constructor (ids start at 1) */
static int next_handle_id = 1;
/** Every global snapshot handle created by this process */
static opal_list_t *active_handles = NULL;

/*
 * Progress
 */
static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info);

/*
 * Report progress only when the progress meter is enabled
 * (orte_sstore_stage_progress_meter > 0).
 * NOTE(review): this expands to a bare { } block rather than
 * do { } while (0), so "if (c) SSTORE_STAGE_REPORT_PROGRESS(h); else ..."
 * will not compile.
 */
#define SSTORE_STAGE_REPORT_PROGRESS(handle_info)                   \
    {                                                               \
        if(OPAL_UNLIKELY(orte_sstore_stage_progress_meter > 0)) {   \
            sstore_stage_report_progress(handle_info);              \
        }                                                           \
    }
208
209 /**********
210 * Object stuff
211 **********/
/**
 * Object constructor.
 *
 * Assigns the next sequential handle id, resets every field to its "unset"
 * value, allocates an empty FileM request list, and takes a private copy of
 * the global snapshot directory as the base location.
 */
void orte_sstore_stage_global_snapshot_info_construct(orte_sstore_stage_global_snapshot_info_t *info)
{
    /* Ids are handed out in creation order */
    info->id = next_handle_id++;

    /* Identity / lifecycle */
    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;
    info->seq_num     = -1;

    /* Paths: only the base location has a default */
    info->base_location                = strdup(orte_sstore_base_global_snapshot_dir);
    info->ref_name                     = NULL;
    info->local_location               = NULL;
    info->app_global_location_fmt      = NULL;
    info->app_local_location_fmt       = NULL;
    info->app_local_cache_location_fmt = NULL;

    /* Metadata file (opened later) */
    info->metadata_filename = NULL;
    info->metadata          = NULL;

    /* Outstanding FileM transfers */
    info->filem_requests = OBJ_NEW(opal_list_t);

    /* Per-process counters */
    info->num_procs_synced = 0;
    info->num_procs_done   = 0;
    info->num_procs_total  = 0;

    info->migrating = false;

    /* Compression settings */
    info->compress_comp    = NULL;
    info->compress_postfix = NULL;

    info->last_progress_report = 0.0;
}
249
/**
 * Object destructor.
 *
 * Frees every owned string, closes the metadata stream if it is still open,
 * releases the FileM request list, and resets the scalar fields.
 */
void orte_sstore_stage_global_snapshot_info_destruct( orte_sstore_stage_global_snapshot_info_t *info)
{
    /* Scalars back to their "unset" values */
    info->id          = 0;
    info->seq_num     = -1;
    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;

    /* Owned strings: free(NULL) is a no-op, so no guards are needed */
    free(info->ref_name);
    free(info->local_location);
    free(info->app_global_location_fmt);
    free(info->app_local_location_fmt);
    free(info->app_local_cache_location_fmt);
    free(info->base_location);
    free(info->metadata_filename);
    free(info->compress_comp);
    free(info->compress_postfix);

    info->ref_name                     = NULL;
    info->local_location               = NULL;
    info->app_global_location_fmt      = NULL;
    info->app_local_location_fmt       = NULL;
    info->app_local_cache_location_fmt = NULL;
    info->base_location                = NULL;
    info->metadata_filename            = NULL;
    info->compress_comp                = NULL;
    info->compress_postfix             = NULL;

    /* Resources that may only be released when present */
    if( NULL != info->metadata ) {
        fclose(info->metadata);
        info->metadata = NULL;
    }

    if( NULL != info->filem_requests ) {
        OBJ_RELEASE(info->filem_requests);
        info->filem_requests = NULL;
    }

    /* Counters and flags */
    info->num_procs_synced = 0;
    info->num_procs_done   = 0;
    info->num_procs_total  = 0;
    info->migrating        = false;

    info->last_progress_report = 0.0;
}
324
/******************
 * Global module API functions
 ******************/
orte_sstore_stage_global_module_init(void)328 int orte_sstore_stage_global_module_init(void)
329 {
330 int ret, exit_status = ORTE_SUCCESS;
331
332 if( NULL == active_handles ) {
333 active_handles = OBJ_NEW(opal_list_t);
334 }
335
336 /*
337 * If user has not enabled recovery, but enabled Caching then caching does
338 * not benefit the job. Continue using it, but warn the user.
339 */
340 if( orte_sstore_stage_enabled_caching && !orte_enable_recovery ) {
341 opal_show_help("help-orte-sstore-stage.txt", "caching_no_recovery", true);
342 }
343
344 /*
345 * Setup a listener for the HNP/Apps
346 */
347 if( ORTE_SUCCESS != (ret = sstore_stage_global_start_listener()) ) {
348 ORTE_ERROR_LOG(ret);
349 exit_status = ret;
350 goto cleanup;
351 }
352
353 exit_status = orte_sstore_stage_local_module_init();
354
355 cleanup:
356 return exit_status;
357 }
358
orte_sstore_stage_global_module_finalize(void)359 int orte_sstore_stage_global_module_finalize(void)
360 {
361 int ret, exit_status = ORTE_SUCCESS;
362 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
363 opal_list_item_t* item = NULL;
364 bool done = false;
365 int cur_time = 0, max_time = 120;
366
367 /*
368 * Wait for all active transfers to finish
369 */
370 done = false;
371 while( 0 < opal_list_get_size(active_handles) && !done ) {
372 done = true;
373 for(item = opal_list_get_first(active_handles);
374 item != opal_list_get_end(active_handles);
375 item = opal_list_get_next(item) ) {
376 handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
377 if( SSTORE_GLOBAL_SYNCED != handle_info->state &&
378 SSTORE_GLOBAL_NONE != handle_info->state ) {
379 done = false;
380 break;
381 }
382 }
383 if( done ) {
384 break;
385 }
386 else {
387 if( cur_time != 0 && cur_time % 30 == 0 ) {
388 opal_output(0, "---> Waiting for sync(): %3d / %3d\n",
389 cur_time, max_time);
390 }
391
392 opal_progress();
393 if( cur_time >= max_time ) {
394 break;
395 } else {
396 sleep(1);
397 }
398 cur_time++;
399 }
400 }
401
402 exit_status = orte_sstore_stage_local_module_finalize();
403
404 if( NULL != active_handles ) {
405 OBJ_RELEASE(active_handles);
406 }
407
408 /*
409 * Shutdown the listener for the HNP/Apps
410 */
411 if( ORTE_SUCCESS != (ret = sstore_stage_global_stop_listener()) ) {
412 ORTE_ERROR_LOG(ret);
413 exit_status = ret;
414 goto cleanup;
415 }
416
417 cleanup:
418 return exit_status;
419 }
420
/**
 * Create a new checkpoint handle for (seq, jobid).
 *
 * The handle is built by create_new_handle_info() and its global snapshot
 * directory is initialized.
 *
 * @param handle [out] receives the new handle id (only on success)
 * @return ORTE_SUCCESS or the error from init_global_snapshot_directory()
 */
int orte_sstore_stage_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    int rc;
    orte_sstore_stage_global_snapshot_info_t *info;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): request_checkpoint_handle()"));

    /* Build the checkpoint handle with all of its associated information */
    info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);

    /* Make sure the global checkpoint directory exists */
    rc = init_global_snapshot_directory(info);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    *handle = info->id;
    return ORTE_SUCCESS;
}
452
/**
 * Fill in a global snapshot description for a handle.
 *
 * @param handle   handle to describe, or NULL for the last stable checkpoint
 * @param snapshot [out] receives handle id, sequence number, reference,
 *                 base directory, metadata file name and the sorted list of
 *                 local snapshots
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when the handle is unknown, or the
 *         metadata extraction error.
 */
int orte_sstore_stage_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
                                                          orte_sstore_base_global_snapshot_info_t *snapshot)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
    orte_sstore_base_handle_t lookup_id;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): request_global_snapshot_data()"));

    /*
     * Lookup the handle (if NULL, use last stable)
     */
    lookup_id = (NULL != handle) ? *handle : orte_sstore_handle_last_stable;
    snapshot->ss_handle = lookup_id;

    handle_info = find_handle_info(lookup_id);
    if( NULL == handle_info ) {
        /* Unknown (or not yet created) handle: nothing to describe */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Construct the snapshot from local data, and metadata file
     */
    snapshot->seq_num           = handle_info->seq_num;
    snapshot->reference         = strdup(handle_info->ref_name);
    snapshot->basedir           = strdup(handle_info->base_location);
    snapshot->metadata_filename = strdup(handle_info->metadata_filename);

    if( orte_sstore_handle_current == snapshot->ss_handle ) {
        /* The current checkpoint: pull data from the local cache */
        ret = orte_sstore_stage_extract_global_metadata(handle_info, snapshot);
    } else {
        /* Otherwise, pull from the metadata file */
        ret = orte_sstore_base_extract_global_metadata(snapshot);
    }
    if( ORTE_SUCCESS != ret ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    opal_list_sort(&snapshot->local_snapshots, stage_snapshot_sort_compare_fn);

 cleanup:
    return exit_status;
}
503
/**
 * Register a handle as the current checkpoint.
 *
 * On the first call for a handle:
 *  - its state becomes SSTORE_GLOBAL_REG and it becomes
 *    orte_sstore_handle_current;
 *  - the sequence number (migration or regular key), the local snapshot
 *    reference format and a timestamp are written to the metadata.
 * If the handle is already registered, the call is delegated to
 * orte_sstore_stage_local_register().
 *
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND for an unknown handle, or the
 *         first metadata write error.
 */
int orte_sstore_stage_global_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): register(%d) - Global", handle));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_REG == handle_info->state ) {
        /* Already registered globally: forward to the local side */
        return orte_sstore_stage_local_register(handle);
    }
    handle_info->state = SSTORE_GLOBAL_REG;

    orte_sstore_handle_current = handle;

    /*
     * Associate the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                  (handle_info->migrating ?
                                                   SSTORE_METADATA_INTERNAL_MIG_SEQ_STR :
                                                   SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR),
                                                  handle_info->seq_num)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                  SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
                                                  orte_sstore_base_local_snapshot_fmt)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
562
/**
 * Read an attribute of a handle as a newly allocated string.
 *
 * Supported keys: GLOBAL_SNAP_REF, GLOBAL_SNAP_SEQ, LOCAL_SNAP_LOC,
 * LOCAL_SNAP_REF_FMT and LOCAL_SNAP_REF_LOC_FMT.
 *
 * @param value [out] receives a heap string the caller must free(); set to
 *              NULL on allocation failure
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND (unknown handle),
 *         ORTE_ERR_NOT_SUPPORTED (unknown key) or ORTE_ERR_OUT_OF_RESOURCE.
 */
int orte_sstore_stage_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    int exit_status = ORTE_SUCCESS;
    int len = 0;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): get_attr()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        return ORTE_ERR_NOT_FOUND;
    }

    /*
     * Access metadata
     */
    /* Used by snapc */
    if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
        *value = strdup(handle_info->ref_name);
    }
    /* Used by snapc */
    else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        len = asprintf(value, "%d", handle_info->seq_num);
    }
    /* Used by orte-restart and RecoS and snapc (kinda) */
    else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
        len = asprintf(value, "%s/%s/%d",
                       handle_info->base_location,
                       handle_info->ref_name,
                       handle_info->seq_num);
    }
    /* Used by orte-restart and RecoS */
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
        *value = strdup(orte_sstore_base_local_snapshot_fmt);
    }
    /* Used by orte-restart and RecoS */
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
        len = asprintf(value, "%s/%s/%d/%s",
                       handle_info->base_location,
                       handle_info->ref_name,
                       handle_info->seq_num,
                       orte_sstore_base_local_snapshot_fmt);
    }
    else {
        exit_status = ORTE_ERR_NOT_SUPPORTED;
    }

    /*
     * asprintf() leaves *value undefined on failure and strdup() returns
     * NULL; never report success without a valid string.
     */
    if( ORTE_SUCCESS == exit_status && (len < 0 || NULL == *value) ) {
        *value = NULL;
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
    }

    return exit_status;
}
612
/**
 * Set an attribute on a handle.
 *
 * SSTORE_METADATA_GLOBAL_MIGRATING only marks the handle as a migration.
 * Every other key is converted to its string form and written, with value,
 * to the metadata file.
 *
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND (unknown handle), ORTE_ERROR
 *         (key could not be converted) or the metadata write error.
 */
int orte_sstore_stage_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): set_attr()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Process key (Access metadata)
     */
    if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
        handle_info->migrating = true;
    }
    else {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }

        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:
    free(key_str);
    key_str = NULL;

    return exit_status;
}
656
/**
 * Synchronize (complete) the global snapshot for a handle.
 *
 * First call for a handle:
 *  - moves it into SSTORE_GLOBAL_SYNCING;
 *  - if this process is also a local coordinator, returns the result of
 *    orte_sstore_stage_local_sync() without doing anything else.
 *
 * Otherwise (not a local coordinator, or a later call):
 *  - spins in opal_progress() until every process has reported as synced;
 *  - waits on the FileM requests (unless orte_sstore_stage_skip_filem);
 *  - writes the closing timestamp and "done" sequence, then closes the
 *    metadata file;
 *  - for non-migration checkpoints, publishes the handle as the last stable
 *    checkpoint;
 *  - leaves the handle in SSTORE_GLOBAL_SYNCED.
 *
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND for an unknown handle, or the
 *         first error encountered.
 */
int orte_sstore_stage_global_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): sync()"));

    /*
     * Lookup the handle
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
        handle_info->state = SSTORE_GLOBAL_SYNCING;
        if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
            return orte_sstore_stage_local_sync(handle);
        }
    }

    /*
     * Wait for all the processes to report in before waiting on all the requests
     */
    while(handle_info->num_procs_synced < handle_info->num_procs_total) {
        opal_progress();
    }

    /*
     * Synchronize all of the files
     * Wait on FileM operations
     */
    if( !orte_sstore_stage_skip_filem ) {
        if( ORTE_SUCCESS != (ret = wait_all_filem(handle_info))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                  (handle_info->migrating ?
                                                   SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR :
                                                   SSTORE_METADATA_INTERNAL_DONE_SEQ_STR),
                                                  handle_info->seq_num)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* JJH: We should lock this var! */
    if( !handle_info->migrating ) {
        orte_sstore_base_is_checkpoint_available = true;
        orte_sstore_handle_last_stable = orte_sstore_handle_current;
    }

    handle_info->state = SSTORE_GLOBAL_SYNCED;

 cleanup:
    return exit_status;
}
739
/**
 * Best-effort flush of a global snapshot directory to stable storage.
 *
 * Steps:
 *  - chdir() into <base_location>/<ref_name>/<seq_num> and run
 *    "sync ; sync ; ls > /dev/null";
 *  - unless compression is enabled, chdir() into each non-file target of
 *    every FileM request and run "sync ; sync ";
 *  - restore the original working directory.
 *
 * Failures are reported with opal_output() and otherwise ignored.
 */
static void sync_global_dir(orte_sstore_stage_global_snapshot_info_t *handle_info)
{
    opal_list_item_t* item = NULL, *f_item = NULL;
    orte_filem_base_request_t *filem_request = NULL;
    orte_filem_base_file_set_t * f_set = NULL;
    char * fs_str = NULL;
    char cwd[OPAL_PATH_MAX];

    /*
     * Without a known cwd we could not restore it afterwards (and chdir()
     * to the uninitialized buffer would be undefined), so do nothing.
     */
    if( OPAL_SUCCESS != opal_getcwd(cwd, OPAL_PATH_MAX) ) {
        opal_output(0, "sstore:stage:(global): sync_dir(): Failed to determine the current working directory");
        return;
    }

    /*
     * Sync the Sequence num dir
     */
    if( 0 > asprintf(&fs_str, "%s/%s/%d",
                     handle_info->base_location,
                     handle_info->ref_name,
                     handle_info->seq_num) ) {
        /* asprintf() leaves fs_str undefined on failure; nothing chdir'd yet */
        fs_str = NULL;
        opal_output(0, "sstore:stage:(global): sync_dir(): Out of memory");
        return;
    }
    OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): sync_dir(): Sync'ing on %s",
                         fs_str));
    if( 0 != chdir(fs_str) ) {
        opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
                    fs_str);
        goto cleanup;
    }
    system("sync ; sync ; ls > /dev/null");

    /*
     * Sync each of the local snapshots
     * if compressing, then this is already covered above
     */
    if( orte_sstore_stage_enabled_compression ) {
        goto cleanup;
    }

    for(f_item = opal_list_get_first(handle_info->filem_requests);
        f_item != opal_list_get_end(handle_info->filem_requests);
        f_item = opal_list_get_next(f_item) ) {
        filem_request = (orte_filem_base_request_t *)f_item;

        for(item = opal_list_get_first(&(filem_request->file_sets));
            item != opal_list_get_end(&(filem_request->file_sets));
            item = opal_list_get_next(item) ) {
            f_set = (orte_filem_base_file_set_t *) item;

            /* Plain files are covered by their directory's sync */
            if( ORTE_FILEM_TYPE_FILE == f_set->target_flag ) {
                continue;
            }

            OPAL_OUTPUT_VERBOSE((20, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(global): sync_dir(): Sync'ing on %s",
                                 f_set->local_target));
            if( 0 != chdir(f_set->local_target) ) {
                opal_output(0, "sstore:stage:(global): Failed to chdir(%s)",
                            f_set->local_target);
            } else {
                system("sync ; sync ");
            }
        }
    }

 cleanup:
    if( 0 != chdir(cwd) ) {
        opal_output(0, "sstore:stage:(global): Failed to chdir(%s)", cwd);
    }

    free(fs_str);
    fs_str = NULL;

    return;
}
814
/**
 * Remove a global snapshot handle.
 *
 * NOTE(review): currently a stub; the handle is neither looked up nor
 * released, and ORTE_SUCCESS is always returned.
 */
int orte_sstore_stage_global_remove(orte_sstore_base_handle_t handle)
{
    (void)handle; /* not used by the stub */

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): remove()"));

    return ORTE_SUCCESS;
}
826
/**
 * Pack a handle and its metadata into buffer.
 *
 * Packed, in order:
 *  - handle id;
 *  - seq_num;
 *  - ref_name;
 *  - app_local_location_fmt;
 *  - app_local_cache_location_fmt (only when caching is enabled);
 *  - migrating flag.
 *
 * @return ORTE_SUCCESS, ORTE_ERR_NOT_FOUND for an unknown handle (nothing is
 *         packed), or the first pack error.
 */
int orte_sstore_stage_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): pack()"));

    /*
     * Lookup the handle; fail before packing anything so the receiver never
     * sees a partially packed message.
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Pack the handle ID
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Pack any metadata
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( orte_sstore_stage_enabled_caching ) {
        if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING )) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
887
/**
 * Unpack a handle sent by peer.
 *
 * Only messages from peers in our own job are processed. They are handed to
 * orte_sstore_stage_local_unpack(); anything else is ignored and reported as
 * success.
 *
 * @return ORTE_SUCCESS or the error from orte_sstore_stage_local_unpack()
 */
int orte_sstore_stage_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): unpack()"));

    if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                    ORTE_PROC_MY_NAME,
                                                    peer) ) {
        return ORTE_SUCCESS;
    }

    /*
     * Defer to the orted version, so if we have applications then they
     * get updated too
     */
    rc = orte_sstore_stage_local_unpack(peer, buffer, handle);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}
914
915 /**************************
916 * Local functions
917 **************************/
create_new_handle_info(int seq,int type,orte_jobid_t jobid)918 static orte_sstore_stage_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
919 {
920 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
921 orte_job_t *jdata = NULL;
922
923 handle_info = OBJ_NEW(orte_sstore_stage_global_snapshot_info_t);
924
925 handle_info->jobid = jobid;
926
927 handle_info->state = SSTORE_GLOBAL_INIT;
928
929 handle_info->handle_type = type;
930
931 handle_info->seq_num = seq;
932
933 orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
934
935 asprintf(&(handle_info->local_location), "%s/%d",
936 handle_info->ref_name, handle_info->seq_num);
937
938 /* This is used by the application to establish the local directory */
939 asprintf(&(handle_info->app_local_location_fmt), "%s/%s/%s/%s",
940 orte_sstore_stage_local_snapshot_dir,
941 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
942 ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME,
943 orte_sstore_base_local_snapshot_fmt);
944
945 if( orte_sstore_stage_enabled_caching ) {
946 asprintf(&(handle_info->app_local_cache_location_fmt), "%s/%s/%s/%d/%s",
947 orte_sstore_stage_local_snapshot_dir,
948 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
949 ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME,
950 handle_info->seq_num,
951 orte_sstore_base_local_snapshot_fmt);
952 }
953
954 /* This is used by the HNP to remember where it should place each process */
955 asprintf(&(handle_info->app_global_location_fmt), "%s/%s/%s",
956 handle_info->base_location,
957 handle_info->local_location,
958 orte_sstore_base_local_snapshot_fmt);
959
960 asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
961 handle_info->base_location,
962 handle_info->ref_name,
963 orte_sstore_base_global_metadata_filename);
964
965 jdata = orte_get_job_data_object(handle_info->jobid);
966 handle_info->num_procs_total = (int)jdata->num_procs;
967
968 opal_list_append(active_handles, &(handle_info->super));
969
970 return handle_info;
971 }
972
find_handle_info(orte_sstore_base_handle_t handle)973 static orte_sstore_stage_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
974 {
975 orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;
976 opal_list_item_t* item = NULL;
977
978 for(item = opal_list_get_first(active_handles);
979 item != opal_list_get_end(active_handles);
980 item = opal_list_get_next(item) ) {
981 handle_info = (orte_sstore_stage_global_snapshot_info_t*)item;
982
983 if( handle_info->id == handle ) {
984 return handle_info;
985 }
986 }
987
988 return NULL;
989 }
990
sstore_stage_global_start_listener(void)991 static int sstore_stage_global_start_listener(void)
992 {
993 if( is_global_listener_active ) {
994 return ORTE_SUCCESS;
995 }
996
997 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
998 ORTE_RML_PERSISTENT, sstore_stage_global_recv, NULL);
999
1000 is_global_listener_active = true;
1001 return ORTE_SUCCESS;
1002 }
1003
sstore_stage_global_stop_listener(void)1004 static int sstore_stage_global_stop_listener(void)
1005 {
1006 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1007
1008 is_global_listener_active = false;
1009 return ORTE_SUCCESS;
1010 }
1011
/**
 * Persistent RML receive callback for ORTE_RML_TAG_SSTORE_INTERNAL.
 *
 * Each message starts with a command (ORTE_SSTORE_STAGE_CMD) followed by a
 * handle id (ORTE_SSTORE_HANDLE).
 *  - Messages from processes outside our job (e.g. application processes)
 *    go to the local-side command handler.
 *  - Otherwise PULL, PUSH and DONE go to the matching process_local_*()
 *    handler, and REMOVE is forwarded to the local-side handler.
 */
static void sstore_stage_global_recv(int status,
                                     orte_process_name_t* sender,
                                     opal_buffer_t* buffer,
                                     orte_rml_tag_t tag,
                                     void* cbdata)
{
    int ret;
    orte_sstore_stage_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_sstore_stage_global_snapshot_info_t *handle_info = NULL;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    /*
     * If this was an application process contacting us, then act like an orted
     * instead of an HNP
     */
    if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                   ORTE_PROC_MY_NAME,
                                                   sender)) {

        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): process_cmd(%s)",
                             ORTE_NAME_PRINT(sender)));

        orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
        return;
    }

    /*
     * Find the referenced handle
     */
    handle_info = find_handle_info(loc_id);

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(global): process_cmd(%s) - Command = %s",
                         ORTE_NAME_PRINT(sender),
                         (ORTE_SSTORE_STAGE_PULL   == command ? "Pull" :
                          (ORTE_SSTORE_STAGE_PUSH   == command ? "Push" :
                           (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
                            (ORTE_SSTORE_STAGE_DONE   == command ? "Done" : "Unknown")))) ));

    /*
     * process_local_pull() dereferences handle_info (PUSH/DONE presumably do
     * too), so an unknown handle id must be rejected here. REMOVE is forwarded
     * by id and does not need the handle.
     */
    if( NULL == handle_info && ORTE_SSTORE_STAGE_REMOVE != command ) {
        opal_output(0, "sstore:stage:(global): process_cmd(%s) - Unknown handle %d",
                    ORTE_NAME_PRINT(sender), (int)loc_id);
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto cleanup;
    }

    /*
     * Process the command
     */
    if( ORTE_SSTORE_STAGE_PULL == command ) {
        process_local_pull(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_STAGE_PUSH == command ) {
        process_local_push(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_STAGE_REMOVE == command ) {
        /* This is actually intended for the local coordinator */
        orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
    }
    else if( ORTE_SSTORE_STAGE_DONE == command ) {
        process_local_done(sender, buffer, handle_info);
    }
    else {
        opal_output(0, "sstore:stage:(global): process_cmd(%s) - Unknown command %d",
                    ORTE_NAME_PRINT(sender), (int)command);
    }

 cleanup:
    return;
}
1095
process_local_pull(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_global_snapshot_info_t * handle_info)1096 static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1097 {
1098 int ret, exit_status = ORTE_SUCCESS;
1099 opal_buffer_t *loc_buffer = NULL;
1100 orte_sstore_stage_cmd_flag_t command;
1101
1102 /*
1103 * Push back the requested information
1104 */
1105 loc_buffer = OBJ_NEW(opal_buffer_t);
1106
1107 command = ORTE_SSTORE_STAGE_PUSH;
1108 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1109 ORTE_ERROR_LOG(ret);
1110 exit_status = ret;
1111 goto cleanup;
1112 }
1113
1114 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1115 ORTE_ERROR_LOG(ret);
1116 exit_status = ret;
1117 goto cleanup;
1118 }
1119
1120 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
1121 ORTE_ERROR_LOG(ret);
1122 exit_status = ret;
1123 goto cleanup;
1124 }
1125
1126 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
1127 ORTE_ERROR_LOG(ret);
1128 exit_status = ret;
1129 goto cleanup;
1130 }
1131
1132 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_location_fmt), 1, OPAL_STRING))) {
1133 ORTE_ERROR_LOG(ret);
1134 exit_status = ret;
1135 goto cleanup;
1136 }
1137
1138 if( orte_sstore_stage_enabled_caching ) {
1139 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_local_cache_location_fmt), 1, OPAL_STRING))) {
1140 ORTE_ERROR_LOG(ret);
1141 exit_status = ret;
1142 goto cleanup;
1143 }
1144 }
1145
1146 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->migrating), 1, OPAL_BOOL))) {
1147 ORTE_ERROR_LOG(ret);
1148 exit_status = ret;
1149 goto cleanup;
1150 }
1151
1152 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
1153 orte_rml_send_callback, NULL))) {
1154 ORTE_ERROR_LOG(ret);
1155 exit_status = ret;
1156 goto cleanup;
1157 }
1158 /* loc_buffer should not be released here; the callback releases it */
1159 loc_buffer = NULL;
1160
1161 cleanup:
1162 if (NULL != loc_buffer) {
1163 OBJ_RELEASE(loc_buffer);
1164 loc_buffer = NULL;
1165 }
1166
1167 return exit_status;
1168 }
1169
process_local_push(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_global_snapshot_info_t * handle_info)1170 static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1171 {
1172 int ret, exit_status = ORTE_SUCCESS;
1173 orte_std_cntr_t count;
1174 size_t num_entries, i;
1175 orte_process_name_t name;
1176 bool ckpt_skipped = false;
1177 char * crs_comp = NULL;
1178 char * compress_comp = NULL;
1179 char * compress_postfix = NULL;
1180 char * proc_name = NULL;
1181 char * tmp_str = NULL;
1182 orte_filem_base_request_t *filem_request = NULL;
1183 orte_filem_base_process_set_t *p_set = NULL;
1184 orte_filem_base_file_set_t * f_set = NULL;
1185
1186 if( !orte_sstore_stage_skip_filem ) {
1187 filem_request = OBJ_NEW(orte_filem_base_request_t);
1188 /*
1189 * Define the process set:
1190 * Source (daemon) -> Sink (HNP)
1191 */
1192 p_set = OBJ_NEW(orte_filem_base_process_set_t);
1193 p_set->source.jobid = peer->jobid;
1194 p_set->source.vpid = peer->vpid;
1195 p_set->sink.jobid = ORTE_PROC_MY_NAME->jobid;
1196 p_set->sink.vpid = ORTE_PROC_MY_NAME->vpid;
1197 opal_list_append(&(filem_request->process_sets), &(p_set->super) );
1198 }
1199
1200 /*
1201 * Unpack the data
1202 */
1203 count = 1;
1204 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1205 ORTE_ERROR_LOG(ret);
1206 exit_status = ret;
1207 goto cleanup;
1208 }
1209
1210 for(i = 0; i < num_entries; ++i ) {
1211 count = 1;
1212 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
1213 ORTE_ERROR_LOG(ret);
1214 exit_status = ret;
1215 goto cleanup;
1216 }
1217
1218 count = 1;
1219 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
1220 ORTE_ERROR_LOG(ret);
1221 exit_status = ret;
1222 goto cleanup;
1223 }
1224
1225 if( !ckpt_skipped ) {
1226 count = 1;
1227 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
1228 ORTE_ERROR_LOG(ret);
1229 exit_status = ret;
1230 goto cleanup;
1231 }
1232
1233 if( orte_sstore_stage_enabled_compression ) {
1234 count = 1;
1235 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_comp, &count, OPAL_STRING))) {
1236 ORTE_ERROR_LOG(ret);
1237 exit_status = ret;
1238 goto cleanup;
1239 }
1240 if( NULL == handle_info->compress_comp ) {
1241 handle_info->compress_comp = strdup(compress_comp);
1242 }
1243
1244 count = 1;
1245 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &compress_postfix, &count, OPAL_STRING))) {
1246 ORTE_ERROR_LOG(ret);
1247 exit_status = ret;
1248 goto cleanup;
1249 }
1250 if( NULL == handle_info->compress_postfix ) {
1251 handle_info->compress_postfix = strdup(compress_postfix);
1252 }
1253 }
1254
1255 if( !orte_sstore_stage_skip_filem ) {
1256 /*
1257 * Append to the file set for movement
1258 */
1259 f_set = OBJ_NEW(orte_filem_base_file_set_t);
1260 if( orte_sstore_stage_enabled_compression ) {
1261 f_set->target_flag = ORTE_FILEM_TYPE_FILE;
1262 } else {
1263 f_set->target_flag = ORTE_FILEM_TYPE_DIR;
1264 }
1265
1266 if( orte_sstore_stage_enabled_compression ) {
1267 asprintf(&tmp_str,
1268 handle_info->app_global_location_fmt,
1269 name.vpid);
1270 asprintf(&(f_set->local_target), "%s%s",
1271 tmp_str,
1272 compress_postfix);
1273 } else {
1274 asprintf(&(f_set->local_target),
1275 handle_info->app_global_location_fmt,
1276 name.vpid);
1277 }
1278
1279 if( orte_sstore_stage_global_is_shared ) {
1280 f_set->local_hint = ORTE_FILEM_HINT_SHARED;
1281 }
1282
1283 if( orte_sstore_stage_enabled_compression ) {
1284 asprintf(&tmp_str,
1285 handle_info->app_local_location_fmt,
1286 name.vpid);
1287 asprintf(&(f_set->remote_target), "%s%s",
1288 tmp_str,
1289 compress_postfix);
1290 } else {
1291 asprintf(&(f_set->remote_target),
1292 handle_info->app_local_location_fmt,
1293 name.vpid);
1294 }
1295
1296 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
1297
1298 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1299 "sstore:stage:(global): push(): Pulling remote file <%s> to <%s>",
1300 f_set->remote_target,
1301 f_set->local_target));
1302 }
1303
1304 /*
1305 * Write this information to the global metadata
1306 */
1307 orte_util_convert_process_name_to_string(&proc_name, &name);
1308
1309 metadata_write_str(handle_info,
1310 SSTORE_METADATA_INTERNAL_PROCESS_STR,
1311 proc_name);
1312 metadata_write_str(handle_info,
1313 SSTORE_METADATA_LOCAL_CRS_COMP_STR,
1314 crs_comp);
1315 if( orte_sstore_stage_enabled_compression ) {
1316 metadata_write_str(handle_info,
1317 SSTORE_METADATA_LOCAL_COMPRESS_COMP_STR,
1318 compress_comp);
1319 metadata_write_str(handle_info,
1320 SSTORE_METADATA_LOCAL_COMPRESS_POSTFIX_STR,
1321 compress_postfix);
1322 }
1323 }
1324
1325 if( NULL != crs_comp ) {
1326 free(crs_comp);
1327 crs_comp = NULL;
1328 }
1329 if( NULL != compress_comp ) {
1330 free(compress_comp);
1331 compress_comp = NULL;
1332 }
1333 if( NULL != compress_postfix ) {
1334 free(compress_postfix);
1335 compress_postfix = NULL;
1336 }
1337 if( NULL != proc_name ) {
1338 free(proc_name);
1339 proc_name = NULL;
1340 }
1341 if( NULL != tmp_str ) {
1342 free(tmp_str);
1343 tmp_str = NULL;
1344 }
1345
1346 (handle_info->num_procs_synced)++;
1347 }
1348
1349 if( !orte_sstore_stage_skip_filem && 0 < opal_list_get_size(&(filem_request->file_sets)) ) {
1350 /*
1351 * Start to pull the files to global storage
1352 */
1353 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1354 "sstore:stage:(global): push(): Pulling remote files from %s (%3d of %3d done)",
1355 ORTE_NAME_PRINT(peer),
1356 handle_info->num_procs_synced,
1357 handle_info->num_procs_total));
1358 opal_list_append(handle_info->filem_requests, &(filem_request->super));
1359 if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
1360 ORTE_ERROR_LOG(ret);
1361 exit_status = ret;
1362 goto cleanup;
1363 }
1364 }
1365
1366 cleanup:
1367 if( NULL != crs_comp ) {
1368 free(crs_comp);
1369 crs_comp = NULL;
1370 }
1371 if( NULL != compress_comp ) {
1372 free(compress_comp);
1373 compress_comp = NULL;
1374 }
1375 if( NULL != compress_postfix ) {
1376 free(compress_postfix);
1377 compress_postfix = NULL;
1378 }
1379 if( NULL != proc_name ) {
1380 free(proc_name);
1381 proc_name = NULL;
1382 }
1383 if( NULL != tmp_str ) {
1384 free(tmp_str);
1385 tmp_str = NULL;
1386 }
1387
1388 return exit_status;
1389 }
1390
process_local_done(orte_process_name_t * peer,opal_buffer_t * buffer,orte_sstore_stage_global_snapshot_info_t * handle_info)1391 static int process_local_done(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_global_snapshot_info_t *handle_info)
1392 {
1393 int ret, exit_status = ORTE_SUCCESS;
1394 orte_std_cntr_t count;
1395 size_t num_entries;
1396
1397 /*
1398 * Unpack the data
1399 */
1400 count = 1;
1401 if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
1402 ORTE_ERROR_LOG(ret);
1403 exit_status = ret;
1404 goto cleanup;
1405 }
1406
1407 (handle_info->num_procs_done) += (int)num_entries;
1408
1409 SSTORE_STAGE_REPORT_PROGRESS(handle_info);
1410
1411 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1412 "sstore:stage:(global): done(): [Peer %s] Moved %d files (%3d of %3d reported as done)",
1413 ORTE_NAME_PRINT(peer),
1414 (int)num_entries,
1415 handle_info->num_procs_done,
1416 handle_info->num_procs_total));
1417
1418 cleanup:
1419 return exit_status;
1420 }
1421
init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t * handle_info)1422 static int init_global_snapshot_directory(orte_sstore_stage_global_snapshot_info_t *handle_info)
1423 {
1424 int ret, exit_status = ORTE_SUCCESS;
1425 char * dir_name = NULL;
1426 mode_t my_mode = S_IRWXU;
1427
1428 /*
1429 * Make the snapshot directory from the uniq_global_snapshot_name
1430 */
1431 asprintf(&dir_name, "%s/%s",
1432 handle_info->base_location,
1433 handle_info->local_location);
1434 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
1435 ORTE_ERROR_LOG(ret);
1436 exit_status = ret;
1437 goto cleanup;
1438 }
1439
1440 /*
1441 * Open up the metadata file
1442 */
1443 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1444 ORTE_ERROR_LOG(ret);
1445 exit_status = ret;
1446 goto cleanup;
1447 }
1448
1449 cleanup:
1450 if(NULL != dir_name) {
1451 free(dir_name);
1452 dir_name = NULL;
1453 }
1454
1455 return exit_status;
1456 }
1457
wait_all_filem(orte_sstore_stage_global_snapshot_info_t * handle_info)1458 static int wait_all_filem(orte_sstore_stage_global_snapshot_info_t *handle_info)
1459 {
1460 int ret, exit_status = ORTE_SUCCESS;
1461 opal_list_item_t* item = NULL;
1462
1463 if( orte_sstore_stage_skip_filem ) {
1464 return exit_status;
1465 }
1466
1467 /*
1468 * Wait for all the transfers to complete
1469 */
1470 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1471 "sstore:stage:(global): wait_all_filem(): Waiting on all outstanding FileM requests (%d)",
1472 (int)opal_list_get_size(handle_info->filem_requests) ));
1473
1474 if(ORTE_SUCCESS != (ret = orte_filem.wait_all(handle_info->filem_requests)) ) {
1475 ORTE_ERROR_LOG(ret);
1476 exit_status = ret;
1477 goto cleanup;
1478 }
1479
1480 /*
1481 * Remove the data on the remote side
1482 */
1483 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1484 "sstore:stage:(global): wait_all_filem(): Removing all local files"));
1485 if( ORTE_SUCCESS != (ret = xcast_remove_all(handle_info))) {
1486 ORTE_ERROR_LOG(ret);
1487 exit_status = ret;
1488 goto cleanup;
1489 }
1490
1491 /*
1492 * Touch all local checkpoints
1493 */
1494 sync_global_dir(handle_info);
1495
1496 /*
1497 * Wait for the removal to complete
1498 */
1499 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1500 "sstore:stage:(global): wait_all_filem(): Waiting for remove to finish..."));
1501 while(handle_info->num_procs_done < handle_info->num_procs_total) {
1502 opal_progress();
1503 }
1504
1505 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
1506 "sstore:stage:(global): wait_all_filem(): All files have been transfered"));
1507
1508 cleanup:
1509 while (NULL != (item = opal_list_remove_first(handle_info->filem_requests) ) ) {
1510 OBJ_RELEASE(item);
1511 }
1512 OBJ_DESTRUCT(handle_info->filem_requests);
1513
1514 return exit_status;
1515 }
1516
xcast_remove_all(orte_sstore_stage_global_snapshot_info_t * handle_info)1517 static int xcast_remove_all(orte_sstore_stage_global_snapshot_info_t *handle_info)
1518 {
1519 int ret, exit_status = ORTE_SUCCESS;
1520 opal_buffer_t *loc_buffer = NULL;
1521 orte_sstore_stage_cmd_flag_t command;
1522 orte_grpcomm_signature_t *sig;
1523
1524 handle_info->num_procs_done = 0;
1525
1526 loc_buffer = OBJ_NEW(opal_buffer_t);
1527
1528 command = ORTE_SSTORE_STAGE_REMOVE;
1529 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
1530 ORTE_ERROR_LOG(ret);
1531 exit_status = ret;
1532 goto cleanup;
1533 }
1534
1535 if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
1536 ORTE_ERROR_LOG(ret);
1537 exit_status = ret;
1538 goto cleanup;
1539 }
1540
1541 /* goes to all daemons */
1542 sig = OBJ_NEW(orte_grpcomm_signature_t);
1543 sig->signature = (orte_process_name_t*)malloc(sizeof(orte_process_name_t));
1544 sig->signature[0].jobid = ORTE_PROC_MY_NAME->jobid;
1545 sig->signature[0].vpid = ORTE_VPID_WILDCARD;
1546 if (ORTE_SUCCESS != (ret = orte_grpcomm.xcast(sig, ORTE_RML_TAG_SSTORE_INTERNAL, loc_buffer))) {
1547 ORTE_ERROR_LOG(ret);
1548 exit_status = ret;
1549 goto cleanup;
1550 }
1551
1552 /* loc_buffer should not be released here; the callback releases it */
1553 loc_buffer = NULL;
1554
1555 cleanup:
1556 if (NULL != loc_buffer) {
1557 OBJ_RELEASE(loc_buffer);
1558 loc_buffer = NULL;
1559 }
1560
1561 OBJ_RELEASE(sig);
1562
1563 return exit_status;
1564 }
1565
1566 /**************************
1567 * Metadata functions
1568 **************************/
metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info)1569 static int metadata_open(orte_sstore_stage_global_snapshot_info_t * handle_info)
1570 {
1571 /* If already open, then just return */
1572 if( NULL != handle_info->metadata ) {
1573 return ORTE_SUCCESS;
1574 }
1575
1576 if (NULL == (handle_info->metadata = fopen(handle_info->metadata_filename, "a")) ) {
1577 opal_output(orte_sstore_base_framework.framework_output,
1578 "sstore:stage:(global):init_dir() Unable to open the file (%s)\n",
1579 handle_info->metadata_filename);
1580 ORTE_ERROR_LOG(ORTE_ERROR);
1581 return ORTE_ERROR;
1582 }
1583
1584 return ORTE_SUCCESS;
1585 }
1586
metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info)1587 static int metadata_close(orte_sstore_stage_global_snapshot_info_t * handle_info)
1588 {
1589 /* If already closed, then just return */
1590 if( NULL == handle_info->metadata ) {
1591 return ORTE_SUCCESS;
1592 }
1593
1594 fclose(handle_info->metadata);
1595 handle_info->metadata = NULL;
1596
1597 return ORTE_SUCCESS;
1598 }
1599
metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info,char * key,int value)1600 static int metadata_write_int(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, int value)
1601 {
1602 int ret, exit_status = ORTE_SUCCESS;
1603
1604 /* Make sure the metadata file is open */
1605 if( NULL == handle_info->metadata ) {
1606 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1607 ORTE_ERROR_LOG(ret);
1608 exit_status = ret;
1609 goto cleanup;
1610 }
1611 }
1612
1613 fprintf(handle_info->metadata, "%s%d\n", key, value);
1614
1615 cleanup:
1616 return exit_status;
1617 }
1618
metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info,char * key,char * value)1619 static int metadata_write_str(orte_sstore_stage_global_snapshot_info_t * handle_info, char *key, char *value)
1620 {
1621 int ret, exit_status = ORTE_SUCCESS;
1622
1623 /* Make sure the metadata file is open */
1624 if( NULL == handle_info->metadata ) {
1625 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1626 ORTE_ERROR_LOG(ret);
1627 exit_status = ret;
1628 goto cleanup;
1629 }
1630 }
1631
1632 fprintf(handle_info->metadata, "%s%s\n", key, value);
1633
1634 cleanup:
1635 return exit_status;
1636 }
1637
metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info)1638 static int metadata_write_timestamp(orte_sstore_stage_global_snapshot_info_t * handle_info)
1639 {
1640 int ret, exit_status = ORTE_SUCCESS;
1641 time_t timestamp;
1642
1643 /* Make sure the metadata file is open */
1644 if( NULL == handle_info->metadata ) {
1645 if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
1646 ORTE_ERROR_LOG(ret);
1647 exit_status = ret;
1648 goto cleanup;
1649 }
1650 }
1651
1652 timestamp = time(NULL);
1653 fprintf(handle_info->metadata, "%s%s",
1654 SSTORE_METADATA_INTERNAL_TIME_STR,
1655 ctime(×tamp));
1656
1657 cleanup:
1658 return exit_status;
1659 }
1660
orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,orte_sstore_base_global_snapshot_info_t * global_snapshot)1661 static int orte_sstore_stage_extract_global_metadata(orte_sstore_stage_global_snapshot_info_t * handle_info,
1662 orte_sstore_base_global_snapshot_info_t *global_snapshot)
1663 {
1664 int exit_status = ORTE_SUCCESS;
1665 orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
1666 opal_list_item_t* item = NULL;
1667 int i = 0;
1668
1669 /*
1670 * Cleanup the structure a bit, so we can refresh it below
1671 */
1672 while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
1673 OBJ_RELEASE(item);
1674 }
1675
1676 if( NULL != global_snapshot->start_time ) {
1677 free( global_snapshot->start_time );
1678 global_snapshot->start_time = NULL;
1679 }
1680
1681 if( NULL != global_snapshot->end_time ) {
1682 free( global_snapshot->end_time );
1683 global_snapshot->end_time = NULL;
1684 }
1685
1686 /*
1687 * Create a structure for each application process
1688 */
1689 for(i = 0; i < handle_info->num_procs_total; ++i) {
1690 vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
1691 vpid_snapshot->ss_handle = handle_info->id;
1692
1693 vpid_snapshot->process_name.jobid = handle_info->jobid;
1694 vpid_snapshot->process_name.vpid = i;
1695
1696 /* JJH: Currently we do not have this information since we do not save
1697 * individual vpid info in the Global SStore. It is in the metadata
1698 * though.
1699 */
1700 vpid_snapshot->crs_comp = NULL;
1701 if( NULL != handle_info->compress_comp ) {
1702 vpid_snapshot->compress_comp = strdup(handle_info->compress_comp);
1703 } else {
1704 vpid_snapshot->compress_comp = NULL;
1705 }
1706 if( NULL != handle_info->compress_postfix ) {
1707 vpid_snapshot->compress_postfix = strdup(handle_info->compress_postfix);
1708 } else {
1709 vpid_snapshot->compress_postfix = NULL;
1710 }
1711 vpid_snapshot->start_time = NULL;
1712 vpid_snapshot->end_time = NULL;
1713
1714 opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
1715 }
1716
1717 return exit_status;
1718 }
1719
stage_snapshot_sort_compare_fn(opal_list_item_t ** a,opal_list_item_t ** b)1720 static int stage_snapshot_sort_compare_fn(opal_list_item_t **a,
1721 opal_list_item_t **b)
1722 {
1723 orte_sstore_base_local_snapshot_info_t *snap_a, *snap_b;
1724
1725 snap_a = (orte_sstore_base_local_snapshot_info_t*)(*a);
1726 snap_b = (orte_sstore_base_local_snapshot_info_t*)(*b);
1727
1728 if( snap_a->process_name.vpid > snap_b->process_name.vpid ) {
1729 return 1;
1730 }
1731 else if( snap_a->process_name.vpid == snap_b->process_name.vpid ) {
1732 return 0;
1733 }
1734 else {
1735 return -1;
1736 }
1737 }
1738
sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t * handle_info)1739 static void sstore_stage_report_progress(orte_sstore_stage_global_snapshot_info_t *handle_info)
1740 {
1741 double perc_done;
1742
1743 perc_done = (handle_info->num_procs_total - handle_info->num_procs_done);
1744 perc_done = perc_done / (1.0 * handle_info->num_procs_total);
1745 perc_done = (perc_done-1)*(-100.0);
1746
1747 if( perc_done >= (handle_info->last_progress_report + orte_sstore_stage_progress_meter ) ||
1748 handle_info->last_progress_report == 0.0 ) {
1749 handle_info->last_progress_report = perc_done;
1750 opal_output(0, "sstore:stage: progress: %10.2f %c Finished\n",
1751 perc_done, '%');
1752 }
1753
1754 return;
1755 }
1756