1 /*
2 * Copyright (c) 2010 The Trustees of Indiana University.
3 * All rights reserved.
4 * Copyright (c) 2004-2011 The University of Tennessee and The University
5 * of Tennessee Research Foundation. All rights
6 * reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
14 /*
15 *
16 */
17
18 #include "orte_config.h"
19
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif /* HAVE_UNISTD_H */
28
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31
32 #include "opal/mca/event/event.h"
33
34 #include "orte/constants.h"
35 #include "orte/util/show_help.h"
36 #include "opal/util/argv.h"
37 #include "opal/util/output.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41
42 #include "opal/mca/compress/compress.h"
43 #include "opal/mca/compress/base/base.h"
44
45 #include "opal/threads/mutex.h"
46 #include "opal/threads/condition.h"
47
48 #include "orte/util/name_fns.h"
49 #include "orte/util/proc_info.h"
50 #include "orte/runtime/orte_globals.h"
51 #include "orte/runtime/orte_wait.h"
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/rml/rml.h"
54 #include "orte/mca/rml/rml_types.h"
55 #include "orte/mca/odls/odls_types.h"
56 #include "orte/mca/filem/filem.h"
57 #include "orte/mca/filem/base/base.h"
58
59 #include "orte/mca/sstore/sstore.h"
60 #include "orte/mca/sstore/base/base.h"
61
62 #include "sstore_stage.h"
63
64 /**********
65 * Object stuff
66 **********/
/*
 * Values stored in orte_sstore_stage_local_snapshot_info_t.status.
 */
/* Assigned by the snapshot info constructor and destructor. */
#define SSTORE_LOCAL_NONE 0
/* Error state: register() stops waiting when it sees this value, and
 * finalize() does not wait on handles in this state. */
#define SSTORE_LOCAL_ERROR 1
/* Assigned by create_new_handle_info() once the handle has been set up. */
#define SSTORE_LOCAL_INIT 2
/* register() spins on opal_progress() until the handle reaches this state
 * (or SSTORE_LOCAL_ERROR). */
#define SSTORE_LOCAL_READY 3
/* Assigned by sync() after the handle information has been pushed. */
#define SSTORE_LOCAL_SYNCED 4
/* finalize() does not wait on handles in this state. */
#define SSTORE_LOCAL_DONE 5
73
/**
 * Bookkeeping kept by the local coordinator for one snapshot handle.
 * Instances are created by create_new_handle_info() and stored on the
 * file-level 'active_handles' list.
 */
struct orte_sstore_stage_local_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Handle ID; find_handle_info() matches on this field */
    orte_sstore_base_handle_t id;

    /** Status (one of the SSTORE_LOCAL_* values) */
    int status;

    /** Sequence Number (-1 until assigned) */
    int seq_num;

    /** Global Reference Name (heap string owned by this object, or NULL) */
    char * global_ref_name;

    /** Local Location Format String (heap string owned by this object, or NULL) */
    char * location_fmt;

    /** Local Cache Location Format String (heap string owned by this object, or NULL) */
    char * cache_location_fmt;

    /* Application info handles: a list of
     * orte_sstore_stage_local_app_snapshot_info_t entries, allocated by the
     * constructor and filled by append_new_app_handle_info() */
    opal_list_t *app_info_handle;

    /** Compress Component used (heap string owned by this object, or NULL) */
    char * compress_comp;

    /** Compress Component postfix (heap string owned by this object, or NULL) */
    char * compress_postfix;

    /** Is this checkpoint representing a migration? */
    bool migrating;
};
typedef struct orte_sstore_stage_local_snapshot_info_t orte_sstore_stage_local_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_snapshot_info_t);

/* Constructor / destructor hooked into the OPAL class system below */
void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info);
void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_stage_local_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_local_snapshot_info_construct,
                   orte_sstore_stage_local_snapshot_info_destruct);
118
/**
 * Per-process entry stored on a snapshot handle's 'app_info_handle' list.
 * create_new_handle_info() appends one entry for each non-NULL item in
 * orte_local_children.
 */
struct orte_sstore_stage_local_app_snapshot_info_t {
    /** List super object */
    opal_list_item_t super;

    /** Process Name associated with this entry */
    orte_process_name_t name;

    /** Local Location (Absolute Path); freed by the destructor */
    char * local_location;

    /** Compressed Local Location (Absolute Path); freed by the destructor */
    char * compressed_local_location;

    /** Local Cache Location (Absolute Path); freed by the destructor.
     *  fetch_app_deps() probes this path with access() before reusing it. */
    char * local_cache_location;

    /** Metadata File Name (Absolute Path); freed by the destructor */
    char * metadata_filename;

    /** CRS Component used; freed by the destructor */
    char * crs_comp;

    /** If this app. skipped the checkpoint - usually for non-migrating procs */
    bool ckpt_skipped;

    /** Compression PID to wait on (0 when unset) */
    pid_t compress_pid;
};
typedef struct orte_sstore_stage_local_app_snapshot_info_t orte_sstore_stage_local_app_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_stage_local_app_snapshot_info_t);

/* Constructor / destructor hooked into the OPAL class system below */
void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info);
void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info);

OBJ_CLASS_INSTANCE(orte_sstore_stage_local_app_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_stage_local_app_snapshot_info_construct,
                   orte_sstore_stage_local_app_snapshot_info_destruct);
157
158
159
160 /**********
161 * Local Function and Variable Declarations
162 **********/
/* True once sstore_stage_local_start_listener() has posted the receive;
 * cleared again by sstore_stage_local_stop_listener(). */
static bool is_global_listener_active = false;
static int sstore_stage_local_start_listener(void);
static int sstore_stage_local_stop_listener(void);
/* RML callback registered by the start_listener routine for
 * ORTE_RML_TAG_SSTORE_INTERNAL messages. */
static void sstore_stage_local_recv(int status,
                                    orte_process_name_t* sender,
                                    opal_buffer_t* buffer,
                                    orte_rml_tag_t tag,
                                    void* cbdata);

/* Per-command message handlers. process_global_push() is additionally
 * called from orte_sstore_stage_local_unpack() to consume piggybacked
 * metadata. */
static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);
static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info);

/* Creation and lookup of entries on 'active_handles'. The find_* routines
 * return NULL when nothing matches. */
static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq);

/* Creation and lookup of per-process entries on a handle's app_info_handle list. */
static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                      orte_process_name_t *name);
static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                                                         orte_process_name_t *name);

/* pull_handle_info() is called by register(); push_handle_info() by sync(). */
static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info );

/* Called by sync() before the handle information is pushed. */
static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info);

/* Compression helpers; sync() calls wait_all_compressed() only when
 * orte_sstore_stage_enabled_compression is set. */
static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
                             orte_sstore_stage_local_app_snapshot_info_t *app_info);
static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata);
static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info);

/* Used by fetch_app_deps() when no cached copy can be used; reports the
 * resulting location and whether the transfer was skipped through its
 * first two (output) arguments. */
static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
                                                 char *global_loc, char *ref, char *postfix, int seq);

/* Create / remove the directory named by sstore_stage_local_basedir
 * (called from module init / finalize). */
static int sstore_stage_create_local_dir(void);
static int sstore_stage_destroy_local_dir(void);

/* Cache management; create/destroy are only called when
 * orte_sstore_stage_enabled_caching is set. */
static int sstore_stage_create_cache(void);
static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info);
static int sstore_stage_destroy_cache(void);

/* List of orte_sstore_stage_local_snapshot_info_t; allocated in module init
 * or lazily by create_new_handle_info(), released in module finalize. */
static opal_list_t *active_handles = NULL;
/* "<snapshot dir>/<local dir name>/<stage dir name>", built in module init
 * and freed in module finalize. */
static char * sstore_stage_local_basedir = NULL;

/* "<snapshot dir>/<local dir name>/<cache dir name>", built in module init
 * only when caching is enabled. */
static char * sstore_stage_cache_basedir = NULL;

/* NOTE(review): presumably the current and previous cache directories;
 * their use is not shown alongside these declarations -- confirm. */
static char * sstore_stage_cache_current_dir = NULL;
static char * sstore_stage_cache_last_dir = NULL;

/* FileM requests that orte_sstore_stage_local_wait_all_deps() waits on and
 * then drains. */
static opal_list_t * preload_filem_requests = NULL;
216
217 /**********
218 * Object stuff
219 **********/
/*
 * OPAL class constructor for a snapshot handle entry.
 *
 * Puts every field into its "unset" state and allocates the (empty) list
 * that will hold the per-process entries.
 */
void orte_sstore_stage_local_snapshot_info_construct(orte_sstore_stage_local_snapshot_info_t *info)
{
    /* Scalar bookkeeping */
    info->id        = 0;
    info->status    = SSTORE_LOCAL_NONE;
    info->seq_num   = -1;
    info->migrating = false;

    /* No strings are owned yet */
    info->global_ref_name    = NULL;
    info->location_fmt       = NULL;
    info->cache_location_fmt = NULL;
    info->compress_comp      = NULL;
    info->compress_postfix   = NULL;

    /* Container for the orte_sstore_stage_local_app_snapshot_info_t entries */
    info->app_info_handle = OBJ_NEW(opal_list_t);
}
242
/*
 * OPAL class destructor for a snapshot handle entry.
 *
 * Frees every string owned by the object, releases the per-process entries
 * together with the list that holds them, and resets the scalar fields to
 * the values the constructor assigns.
 */
void orte_sstore_stage_local_snapshot_info_destruct( orte_sstore_stage_local_snapshot_info_t *info)
{
    opal_list_item_t *item = NULL;

    info->id = 0;
    info->status = SSTORE_LOCAL_NONE;
    info->seq_num = -1;

    /* free(NULL) is a no-op, so no guards are needed */
    free(info->global_ref_name);
    info->global_ref_name = NULL;

    free(info->location_fmt);
    info->location_fmt = NULL;

    free(info->cache_location_fmt);
    info->cache_location_fmt = NULL;

    if( NULL != info->app_info_handle ) {
        /*
         * Release the entries added by append_new_app_handle_info() before
         * dropping the list itself; releasing only the list object would
         * leave them allocated. Same drain idiom as wait_all_deps().
         */
        while( NULL != (item = opal_list_remove_first(info->app_info_handle)) ) {
            OBJ_RELEASE(item);
        }
        OBJ_RELEASE(info->app_info_handle);
        info->app_info_handle = NULL;
    }

    free(info->compress_comp);
    info->compress_comp = NULL;

    free(info->compress_postfix);
    info->compress_postfix = NULL;

    info->migrating = false;
}
283
/*
 * OPAL class constructor for a per-process snapshot entry: invalid process
 * name, no paths, nothing skipped, and no compression child recorded.
 */
void orte_sstore_stage_local_app_snapshot_info_construct(orte_sstore_stage_local_app_snapshot_info_t *info)
{
    /* Identity */
    info->name.jobid = ORTE_JOBID_INVALID;
    info->name.vpid  = ORTE_VPID_INVALID;

    /* Owned strings */
    info->local_location            = NULL;
    info->compressed_local_location = NULL;
    info->local_cache_location      = NULL;
    info->metadata_filename         = NULL;
    info->crs_comp                  = NULL;

    /* Flags / child bookkeeping */
    info->ckpt_skipped = false;
    info->compress_pid = 0;
}
297
/*
 * OPAL class destructor for a per-process snapshot entry: frees every owned
 * string and restores the values the constructor assigns.
 */
void orte_sstore_stage_local_app_snapshot_info_destruct( orte_sstore_stage_local_app_snapshot_info_t *info)
{
    info->name.jobid = ORTE_JOBID_INVALID;
    info->name.vpid  = ORTE_VPID_INVALID;

    /* free() accepts NULL, so each string is released unconditionally */
    free(info->local_location);
    info->local_location = NULL;

    free(info->compressed_local_location);
    info->compressed_local_location = NULL;

    free(info->local_cache_location);
    info->local_cache_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    free(info->crs_comp);
    info->crs_comp = NULL;

    info->ckpt_skipped = false;
    info->compress_pid = 0;
}
332
333 /******************
334 * Local functions
335 ******************/
orte_sstore_stage_local_module_init(void)336 int orte_sstore_stage_local_module_init(void)
337 {
338 int ret, exit_status = ORTE_SUCCESS;
339
340 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
341 "sstore:stage:(local): init()"));
342
343 if( NULL == active_handles ) {
344 active_handles = OBJ_NEW(opal_list_t);
345 }
346
347 if( NULL == preload_filem_requests ) {
348 preload_filem_requests = OBJ_NEW(opal_list_t);
349 }
350
351 /*
352 * Create the local storage directory
353 */
354 asprintf(&sstore_stage_local_basedir, "%s/%s/%s",
355 orte_sstore_stage_local_snapshot_dir,
356 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
357 ORTE_SSTORE_LOCAL_SNAPSHOT_STAGE_DIR_NAME);
358 if( ORTE_SUCCESS != (ret = sstore_stage_create_local_dir()) ) {
359 ORTE_ERROR_LOG(ret);
360 exit_status = ret;
361 goto cleanup;
362 }
363
364 /*
365 * Create the local cache
366 */
367 if( orte_sstore_stage_enabled_caching ) {
368 asprintf(&sstore_stage_cache_basedir, "%s/%s/%s",
369 orte_sstore_stage_local_snapshot_dir,
370 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
371 ORTE_SSTORE_LOCAL_SNAPSHOT_CACHE_DIR_NAME);
372
373 if( ORTE_SUCCESS != (ret = sstore_stage_create_cache()) ) {
374 ORTE_ERROR_LOG(ret);
375 exit_status = ret;
376 goto cleanup;
377 }
378 }
379
380 /*
381 * Setup a listener for the HNP/Apps
382 * We could be the HNP, in which case the listener is already registered.
383 */
384 if( !ORTE_PROC_IS_HNP ) {
385 if( ORTE_SUCCESS != (ret = sstore_stage_local_start_listener()) ) {
386 ORTE_ERROR_LOG(ret);
387 exit_status = ret;
388 goto cleanup;
389 }
390 }
391
392 cleanup:
393 return exit_status;
394 }
395
orte_sstore_stage_local_module_finalize(void)396 int orte_sstore_stage_local_module_finalize(void)
397 {
398 int ret, exit_status = ORTE_SUCCESS;
399 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
400 opal_list_item_t* item = NULL;
401 bool done = false;
402 int cur_time = 0, max_time = 120;
403
404 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
405 "sstore:stage:(local): finalize()"));
406
407 /*
408 * Wait for all active transfers to finish
409 */
410 if( !ORTE_PROC_IS_HNP ) {
411 done = false;
412 while( 0 < opal_list_get_size(active_handles) && !done ) {
413 done = true;
414 for(item = opal_list_get_first(active_handles);
415 item != opal_list_get_end(active_handles);
416 item = opal_list_get_next(item) ) {
417 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
418 if( SSTORE_LOCAL_DONE != handle_info->status &&
419 SSTORE_LOCAL_NONE != handle_info->status &&
420 SSTORE_LOCAL_ERROR != handle_info->status ) {
421 done = false;
422 break;
423 }
424 }
425 if( done ) {
426 break;
427 }
428 else {
429 if( cur_time != 0 && cur_time % 30 == 0 ) {
430 opal_output(0, "---> Waiting for fin(): %3d / %3d\n",
431 cur_time, max_time);
432 }
433
434 opal_progress();
435 if( cur_time >= max_time ) {
436 break;
437 } else {
438 sleep(1);
439 }
440 cur_time++;
441 }
442 }
443 }
444
445 if( NULL != active_handles ) {
446 OBJ_RELEASE(active_handles);
447 }
448
449 if( NULL != preload_filem_requests ) {
450 OBJ_RELEASE(preload_filem_requests);
451 }
452
453 /*
454 * Shutdown the listener for the HNP/Apps
455 * We could be the HNP, in which case the listener is already deregistered.
456 */
457 if( !ORTE_PROC_IS_HNP ) {
458 if( ORTE_SUCCESS != (ret = sstore_stage_local_stop_listener()) ) {
459 ORTE_ERROR_LOG(ret);
460 exit_status = ret;
461 goto cleanup;
462 }
463 }
464
465 /*
466 * Destroy the local cache
467 */
468 if( orte_sstore_stage_enabled_caching ) {
469 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_cache()) ) {
470 ORTE_ERROR_LOG(ret);
471 exit_status = ret;
472 goto cleanup;
473 }
474 }
475
476 /*
477 * Destroy the local storage directory
478 */
479 if( ORTE_SUCCESS != (ret = sstore_stage_destroy_local_dir()) ) {
480 ORTE_ERROR_LOG(ret);
481 exit_status = ret;
482 goto cleanup;
483 }
484
485 cleanup:
486 if( orte_sstore_stage_enabled_caching ) {
487 if( NULL != sstore_stage_cache_basedir ) {
488 free(sstore_stage_cache_basedir);
489 sstore_stage_cache_basedir = NULL;
490 }
491 }
492
493 if( NULL != sstore_stage_local_basedir ) {
494 free(sstore_stage_local_basedir);
495 sstore_stage_local_basedir = NULL;
496 }
497
498 return exit_status;
499 }
500
/*
 * Not supported by the local coordinator: emits a notice and returns
 * ORTE_ERR_NOT_IMPLEMENTED. None of the arguments are examined.
 */
int orte_sstore_stage_local_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    opal_output(0, "sstore:stage:(local): request_checkpoint_handle() Not implemented!");

    return rc;
}
506
/*
 * Register a snapshot handle with the local coordinator.
 *
 * Looks the handle up (creating it when unknown), issues a pull request
 * for its information, and then drives opal_progress() until the handle
 * becomes READY or enters the ERROR state.
 *
 * Returns ORTE_SUCCESS when the handle is READY, the error from
 * pull_handle_info() when the request could not be issued, or ORTE_ERROR
 * when the handle ended up in the ERROR state (previously this case was
 * reported as success).
 */
int orte_sstore_stage_local_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): register()"));

    /*
     * Create a handle
     */
    if( NULL == (handle_info = find_handle_info(handle)) ) {
        handle_info = create_new_handle_info(handle);
    }

    /*
     * Get basic information from Global SStore
     */
    if( ORTE_SUCCESS != (ret = pull_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait here until the pull request has been satisfied
     */
    while(SSTORE_LOCAL_READY != handle_info->status &&
          SSTORE_LOCAL_ERROR != handle_info->status ) {
        opal_progress();
    }

    /*
     * The loop also ends on failure; do not report that as success
     */
    if( SSTORE_LOCAL_ERROR == handle_info->status ) {
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
    }

 cleanup:
    return exit_status;
}
542
/*
 * Not supported by the local coordinator: emits a notice and returns
 * ORTE_ERR_NOT_IMPLEMENTED. '*value' is left untouched.
 */
int orte_sstore_stage_local_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    opal_output(0, "sstore:stage:(local): get_attr() Not implemented!");

    return rc;
}
548
/*
 * Not supported by the local coordinator: emits a notice and returns
 * ORTE_ERR_NOT_IMPLEMENTED. None of the arguments are examined.
 */
int orte_sstore_stage_local_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    opal_output(0, "sstore:stage:(local): set_attr() Not implemented!");

    return rc;
}
554
/*
 * Synchronize a snapshot handle with the global coordinator.
 *
 * Waits for all applications to update their metadata, waits for
 * compression to finish when compression is enabled, pushes the handle
 * information, and marks the handle SYNCED.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_NOT_FOUND when 'handle' is not a known
 * active handle, or the error reported by the failing step.
 */
int orte_sstore_stage_local_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): sync()"));

    /*
     * Lookup the handle.
     * find_handle_info() returns NULL for an unknown handle; everything
     * below dereferences the entry, so bail out early in that case.
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        exit_status = ORTE_ERR_NOT_FOUND;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    /*
     * Wait for all of the applications to update their metadata
     */
    if( ORTE_SUCCESS != (ret = wait_all_apps_updated(handle_info))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Wait for compression to finish
     */
    if( orte_sstore_stage_enabled_compression ) {
        if( ORTE_SUCCESS != (ret = wait_all_compressed(handle_info))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /*
     * Push information to the Global coordinator
     */
    if( ORTE_SUCCESS != (ret = push_handle_info(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    handle_info->status = SSTORE_LOCAL_SYNCED;

 cleanup:
    return exit_status;
}
602
/*
 * Not supported by the local coordinator: emits a notice and returns
 * ORTE_ERR_NOT_IMPLEMENTED. 'handle' is not examined.
 */
int orte_sstore_stage_local_remove(orte_sstore_base_handle_t handle)
{
    const int rc = ORTE_ERR_NOT_IMPLEMENTED;

    opal_output(0, "sstore:stage:(local): remove() Not implemented!");

    return rc;
}
608
/*
 * Pack a snapshot handle into 'buffer'.
 *
 * Only the handle ID is written; no additional metadata is attached and
 * 'peer' is not consulted. Returns ORTE_SUCCESS or the error from
 * opal_dss.pack().
 */
int orte_sstore_stage_local_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): pack()"));

    /* The handle ID is the only payload */
    rc = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}
637
/*
 * Unpack a snapshot handle from 'buffer'.
 *
 * Reads the handle ID into '*handle', makes sure a matching entry exists
 * on the active list (creating one if needed), and hands the rest of the
 * buffer to process_global_push() to consume the piggybacked metadata.
 *
 * Returns ORTE_SUCCESS or the error from the failing step.
 */
int orte_sstore_stage_local_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    int rc;
    orte_std_cntr_t n_items = 1;
    orte_sstore_stage_local_snapshot_info_t *entry = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): unpack()"));

    /* Step 1: the handle ID */
    rc = opal_dss.unpack(buffer, handle, &n_items, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    /* Step 2: find or create the local bookkeeping entry */
    entry = find_handle_info(*handle);
    if( NULL == entry ) {
        entry = create_new_handle_info(*handle);
    }

    /* Step 3: the metadata piggybacked on this message */
    rc = process_global_push(peer, buffer, entry);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return rc;
    }

    return ORTE_SUCCESS;
}
676
/*
 * Make the snapshot files needed by an app context available locally and
 * append the matching restart options to app->argv.
 *
 * The ORTE_APP_SSTORE_LOAD attribute is a ':'-separated string of either
 *     loc:global_ref:ref:seq                        (not compressed)
 *     loc:global_ref:ref:compress:postfix:seq       (compressed)
 *
 * If the app context is not used on this node, or carries no such
 * attribute, nothing is done. Otherwise:
 *  - when caching is enabled, the matching handle is known, it is not a
 *    migration checkpoint and a cached copy exists on disk, "-c <cache>"
 *    is appended to app->argv;
 *  - in every other case the files are preloaded and "-l <location>" is
 *    appended, followed by "-d <compress>" / "-p <postfix>" when the
 *    transfer was not skipped and those values are non-empty.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM when the attribute does not
 * have one of the two layouts above, or the error from the preload step.
 */
int orte_sstore_stage_local_fetch_app_deps(orte_app_context_t *app)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    char **sstore_args = NULL;
    char * req_snap_loc = NULL;
    char * req_snap_global_ref = NULL;
    char * req_snap_ref = NULL;
    char * req_snap_postfix = NULL;
    char * local_location = NULL;
    char * req_snap_compress = NULL;
    int req_snap_seq = 0;
    int num_args = 0;
    int i;
    orte_proc_t *child = NULL;
    int loc_argc = 0;
    bool skip_xfer = false;
    char *sload = NULL;

    orte_get_attribute(&app->attributes, ORTE_APP_SSTORE_LOAD, (void **)&sload, OPAL_STRING);

    if(!ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) || NULL == sload) {
        /* sload may be NULL here; never hand NULL to a %s conversion */
        OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): Not for this daemon (%s, %d, %s)",
                             app->idx, (ORTE_FLAG_TEST(app, ORTE_APP_FLAG_USED_ON_NODE) ? "T" : "F"),
                             (int)app->num_procs, (NULL == sload ? "(null)" : sload)));
        /* Nothing to do */
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): %s",
                         app->idx, sload));

    /*
     * Extract the 'ref:seq' parameter.
     * Valid layouts have exactly 4 fields (not compressed) or at least 6
     * (compressed); anything else would index past the parsed array.
     */
    sstore_args = opal_argv_split(sload, ':');
    num_args = opal_argv_count(sstore_args);
    if( num_args < 4 || 5 == num_args ) {
        exit_status = ORTE_ERR_BAD_PARAM;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    req_snap_loc        = strdup(sstore_args[0]);
    req_snap_global_ref = strdup(sstore_args[1]);
    req_snap_ref        = strdup(sstore_args[2]);
    if( NULL == sstore_args[4] ) { /* Not compressed */
        req_snap_seq = atoi( sstore_args[3]);
    } else {
        req_snap_compress = strdup(sstore_args[3]);
        req_snap_postfix  = strdup(sstore_args[4]);
        req_snap_seq = atoi( sstore_args[5]);
    }

    handle_info = find_handle_info_ref(req_snap_global_ref, req_snap_seq);
    if( NULL == handle_info ) {
        /* No checkpoints known, just preload the checkpoint */
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): fetch_app_deps(%3d): No known checkpoint [%s, %d]",
                             app->idx,
                             req_snap_ref,
                             req_snap_seq));
        goto filem_preload;
    }

    /*
     * If caching enabled, then look to see if we have this snapshot cached
     * Do not cache if migrating, since checkpoints taken while migrating are
     * not guaranteed to be globally taken.
     */
    if( orte_sstore_stage_enabled_caching && !handle_info->migrating ) {
        /*
         * Find the first local child that belongs to this app context
         */
        for (i=0; i < orte_local_children->size; i++) {
            if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
                continue;
            }

            if( app->idx == child->app_idx ) {
                /*
                 * Find the app snapshot ref
                 */
                app_info = find_app_handle_info(handle_info, &child->name);
                break;
            }
        }

        if( NULL == app_info ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No processes known for this app context",
                                 app->idx));
            goto filem_preload;
        }

        /*
         * Do we have a cached version of this file?
         */
        if( NULL != app_info->local_cache_location &&
            0 == (ret = access(app_info->local_cache_location, F_OK)) ) {
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): Using local cache. (%s)",
                                 app->idx,
                                 app_info->local_cache_location));

            opal_argv_append(&loc_argc, &(app->argv), "-c");
            opal_argv_append(&loc_argc, &(app->argv), app_info->local_cache_location);
            goto cleanup;
        } else {
            /* The cache location may be NULL on this path */
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): fetch_app_deps(%3d): No cache available for %s. (%s)",
                                 app->idx,
                                 ORTE_NAME_PRINT(&app_info->name),
                                 (NULL == app_info->local_cache_location ?
                                  "(null)" : app_info->local_cache_location)));
        }
    }

 filem_preload:
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetch files from Central storage",
                         app->idx));

    /*
     * If we got here, then there is no cached directory, so just preload the
     * files, update the argument set, and carry on.
     */
    if( ORTE_SUCCESS != (ret = orte_sstore_stage_local_preload_files(&local_location,
                                                                     &skip_xfer,
                                                                     req_snap_loc,
                                                                     req_snap_ref,
                                                                     req_snap_postfix,
                                                                     req_snap_seq)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    opal_argv_append(&loc_argc, &(app->argv), "-l");
    opal_argv_append(&loc_argc, &(app->argv), local_location);

    /*
     * Decompress files:
     * opal-restart will do this for us on launch
     */
    if( !skip_xfer ) {
        if( NULL != req_snap_compress && 0 < strlen(req_snap_compress) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-d");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_compress);
        }
        if( NULL != req_snap_postfix && 0 < strlen(req_snap_postfix) ) {
            opal_argv_append(&loc_argc, &(app->argv), "-p");
            opal_argv_append(&loc_argc, &(app->argv), req_snap_postfix);
        }
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): fetch_app_deps(%3d): Fetching to (%s)",
                         app->idx,
                         local_location));

 cleanup:
    /*
     * NOTE(review): 'local_location' is presumably allocated by
     * orte_sstore_stage_local_preload_files() and would then need to be
     * freed here as well -- confirm ownership before adding the free().
     */

    /* orte_get_attribute() returns a copy for OPAL_STRING values */
    if( NULL != sload ) {
        free(sload);
        sload = NULL;
    }

    if( NULL != req_snap_compress ) {
        free(req_snap_compress);
        req_snap_compress = NULL;
    }

    if( NULL != sstore_args ) {
        opal_argv_free(sstore_args);
        sstore_args = NULL;
    }

    if( NULL != req_snap_ref ) {
        free(req_snap_ref);
        req_snap_ref = NULL;
    }

    if( NULL != req_snap_postfix ) {
        free(req_snap_postfix);
        req_snap_postfix = NULL;
    }

    if( NULL != req_snap_loc ) {
        free(req_snap_loc);
        req_snap_loc = NULL;
    }

    if( NULL != req_snap_global_ref ) {
        free(req_snap_global_ref);
        req_snap_global_ref = NULL;
    }

    return exit_status;
}
882
orte_sstore_stage_local_wait_all_deps(void)883 int orte_sstore_stage_local_wait_all_deps(void)
884 {
885 int ret, exit_status = ORTE_SUCCESS;
886 opal_list_item_t* item = NULL;
887
888 /* Nothing being preloaded, so just move on */
889 if( 0 >= opal_list_get_size(preload_filem_requests) ) {
890 return ORTE_SUCCESS;
891 }
892
893 /*
894 * Wait for all files to move
895 */
896 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
897 "sstore:stage:(local): wait_all_deps(): Waiting on %d requests",
898 (int)opal_list_get_size(preload_filem_requests)));
899
900 if(ORTE_SUCCESS != (ret = orte_filem.wait_all(preload_filem_requests)) ) {
901 ORTE_ERROR_LOG(ret);
902 exit_status = ret;
903 goto cleanup;
904 }
905
906 /*
907 * Cache the restart files locally, so we can restart faster next time
908 * JJH: We already check the restart directory for a local copy before
909 * starting the transfer. So this feels unnecessary since the
910 * restart directory is always used as a cache, whether or not
911 * caching is enabled. The extra copy to the cache directory
912 * does not buy us anything.
913 */
914
915 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
916 "sstore:stage:(local): wait_all_deps(): Finished waiting on %d requests!",
917 (int)opal_list_get_size(preload_filem_requests)));
918
919 cleanup:
920 while (NULL != (item = opal_list_remove_first(preload_filem_requests) ) ) {
921 OBJ_RELEASE(item);
922 }
923
924 return exit_status;
925 }
926
927 /**************************
928 * Local functions
929 **************************/
create_new_handle_info(orte_sstore_base_handle_t handle)930 static orte_sstore_stage_local_snapshot_info_t *create_new_handle_info(orte_sstore_base_handle_t handle)
931 {
932 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
933 int i;
934 orte_proc_t *child = NULL;
935
936 if( NULL == active_handles ) {
937 active_handles = OBJ_NEW(opal_list_t);
938 }
939
940 handle_info = OBJ_NEW(orte_sstore_stage_local_snapshot_info_t);
941
942 handle_info->id = handle;
943
944 opal_list_append(active_handles, &(handle_info->super));
945
946 /*
947 * Create a sub structure for each child
948 */
949 for (i=0; i < orte_local_children->size; i++) {
950 if (NULL == (child = (orte_proc_t*)opal_pointer_array_get_item(orte_local_children, i))) {
951 continue;
952 }
953 append_new_app_handle_info(handle_info, &child->name);
954 }
955
956 handle_info->status = SSTORE_LOCAL_INIT;
957
958 return handle_info;
959 }
960
find_handle_info(orte_sstore_base_handle_t handle)961 static orte_sstore_stage_local_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
962 {
963 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
964 opal_list_item_t* item = NULL;
965
966 if( NULL == active_handles ) {
967 return NULL;
968 }
969
970 for(item = opal_list_get_first(active_handles);
971 item != opal_list_get_end(active_handles);
972 item = opal_list_get_next(item) ) {
973 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
974
975 if( handle_info->id == handle ) {
976 return handle_info;
977 }
978 }
979
980 return NULL;
981 }
982
find_handle_info_ref(char * ref,int seq)983 static orte_sstore_stage_local_snapshot_info_t *find_handle_info_ref(char * ref, int seq)
984 {
985 orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;
986 opal_list_item_t* item = NULL;
987
988 if( NULL == active_handles ) {
989 return NULL;
990 }
991
992 for(item = opal_list_get_first(active_handles);
993 item != opal_list_get_end(active_handles);
994 item = opal_list_get_next(item) ) {
995 handle_info = (orte_sstore_stage_local_snapshot_info_t*)item;
996
997 if( 0 == strncmp(handle_info->global_ref_name, ref, strlen(ref)) &&
998 handle_info->seq_num == seq ) {
999 return handle_info;
1000 }
1001 }
1002
1003 return NULL;
1004 }
1005
/*
 * Create a per-process entry for 'name' (jobid and vpid are copied) and
 * append it to the handle's app_info_handle list.
 *
 * Always returns ORTE_SUCCESS.
 */
static int append_new_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
                                      orte_process_name_t *name)
{
    orte_sstore_stage_local_app_snapshot_info_t *entry =
        OBJ_NEW(orte_sstore_stage_local_app_snapshot_info_t);

    /* Record which process this entry describes */
    entry->name.jobid = name->jobid;
    entry->name.vpid  = name->vpid;

    opal_list_append(handle_info->app_info_handle, &(entry->super));

    return ORTE_SUCCESS;
}
1020
find_app_handle_info(orte_sstore_stage_local_snapshot_info_t * handle_info,orte_process_name_t * name)1021 static orte_sstore_stage_local_app_snapshot_info_t *find_app_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info,
1022 orte_process_name_t *name)
1023 {
1024 orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
1025 opal_list_item_t* item = NULL;
1026 orte_ns_cmp_bitmask_t mask;
1027
1028 for(item = opal_list_get_first(handle_info->app_info_handle);
1029 item != opal_list_get_end(handle_info->app_info_handle);
1030 item = opal_list_get_next(item) ) {
1031 app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;
1032
1033 mask = ORTE_NS_CMP_ALL;
1034
1035 if (OPAL_EQUAL == orte_util_compare_name_fields(mask, &app_info->name, name)) {
1036 return app_info;
1037 }
1038 }
1039
1040 return NULL;
1041 }
1042
sstore_stage_local_start_listener(void)1043 static int sstore_stage_local_start_listener(void)
1044 {
1045 if( is_global_listener_active ) {
1046 return ORTE_SUCCESS;
1047 }
1048
1049 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
1050 ORTE_RML_PERSISTENT, sstore_stage_local_recv, NULL);
1051
1052 is_global_listener_active = true;
1053 return ORTE_SUCCESS;
1054 }
1055
sstore_stage_local_stop_listener(void)1056 static int sstore_stage_local_stop_listener(void)
1057 {
1058 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
1059 is_global_listener_active = false;
1060 return ORTE_SUCCESS;
1061 }
1062
/*
 * RML receive callback for ORTE_RML_TAG_SSTORE_INTERNAL messages.
 *
 * Unpacks the command flag and the snapshot handle id from 'buffer', then
 * forwards them (with the rest of the buffer) to
 * orte_sstore_stage_local_process_cmd_action(). Messages carrying any
 * other tag are ignored. An unpack failure is logged and the message is
 * dropped. 'status' and 'cbdata' are not used.
 */
static void sstore_stage_local_recv(int status,
                                    orte_process_name_t* sender,
                                    opal_buffer_t* buffer,
                                    orte_rml_tag_t tag,
                                    void* cbdata)
{
    int rc;
    orte_sstore_stage_cmd_flag_t command;
    orte_std_cntr_t n_vals;
    orte_sstore_base_handle_t loc_id;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    /* First field: the command */
    n_vals = 1;
    rc = opal_dss.unpack(buffer, &command, &n_vals, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    /* Second field: the handle the command refers to */
    n_vals = 1;
    rc = opal_dss.unpack(buffer, &loc_id, &n_vals, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        return;
    }

    orte_sstore_stage_local_process_cmd_action(sender, command, loc_id, buffer);
}
1099
/*
 * Dispatch one sstore stage command.
 *
 * The handle 'loc_id' is looked up and created if it is not yet known.
 * PULL and PUSH are routed to the "global" handler when the sender
 * compares equal to ORTE_PROC_MY_HNP, and to the "app" handler otherwise.
 * REMOVE is always processed with ORTE_PROC_MY_HNP as the peer. Any other
 * command is ignored.
 *
 * The handlers' return values are not propagated; this function always
 * returns ORTE_SUCCESS.
 */
int orte_sstore_stage_local_process_cmd_action(orte_process_name_t *sender,
                                               orte_sstore_stage_cmd_flag_t command,
                                               orte_sstore_base_handle_t loc_id,
                                               opal_buffer_t* buffer)
{
    orte_sstore_stage_local_snapshot_info_t *handle_info = NULL;

    /* Find the referenced handle, creating it on first use */
    handle_info = find_handle_info(loc_id);
    if( NULL == handle_info ) {
        handle_info = create_new_handle_info(loc_id);
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): process_cmd(%s) - Command = %s",
                         ORTE_NAME_PRINT(sender),
                         (ORTE_SSTORE_STAGE_PULL == command ? "Pull" :
                          (ORTE_SSTORE_STAGE_PUSH == command ? "Push" :
                           (ORTE_SSTORE_STAGE_REMOVE == command ? "Remove" :
                            (ORTE_SSTORE_STAGE_DONE == command ? "Done" : "Unknown")))) ));

    if( ORTE_SSTORE_STAGE_PULL == command ) {
        if( OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender) ) {
            process_global_pull(sender, buffer, handle_info);
        } else {
            process_app_pull(sender, buffer, handle_info);
        }
        return ORTE_SUCCESS;
    }

    if( ORTE_SSTORE_STAGE_PUSH == command ) {
        if( OPAL_EQUAL == orte_util_compare_name_fields(ORTE_NS_CMP_ALL, ORTE_PROC_MY_HNP, sender) ) {
            process_global_push(sender, buffer, handle_info);
        } else {
            process_app_push(sender, buffer, handle_info);
        }
        return ORTE_SUCCESS;
    }

    if( ORTE_SSTORE_STAGE_REMOVE == command ) {
        /* The xcast from the root makes the 'sender' equal to this process,
         * so the HNP name is used as the peer instead of 'sender'. */
        process_global_remove(ORTE_PROC_MY_HNP, buffer, handle_info);
    }

    return ORTE_SUCCESS;
}
1147
/*
 * Placeholder for a PULL request coming from the HNP.
 *
 * None of the arguments are used: the function reports that it is not
 * implemented and returns ORTE_ERR_NOT_IMPLEMENTED.
 */
static int process_global_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    /* JJH should be as simple as calling push_handle_info() */
    opal_output(0,
                "sstore:stage:(local): process_global_pull() Not implemented!");

    return ORTE_ERR_NOT_IMPLEMENTED;
}
1154
/*
 * Handle a PUSH from the HNP that carries the snapshot description.
 *
 * Unpacks, in order: seq_num, global_ref_name, location_fmt,
 * cache_location_fmt (only when caching is enabled) and the migrating
 * flag. It then rebuilds local_location, local_cache_location (caching
 * only) and metadata_filename for every application record on the handle.
 *
 * String fields left over from an earlier push are released before they
 * are replaced, so a repeated push does not leak them.
 *
 * On return handle_info->status is SSTORE_LOCAL_READY on success and
 * SSTORE_LOCAL_ERROR otherwise. Returns ORTE_SUCCESS or the error code of
 * the step that failed. 'peer' is not used.
 */
static int process_global_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->seq_num), &count, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* The unpack stores a new pointer into the field; drop the old value */
    if( NULL != handle_info->global_ref_name ) {
        free(handle_info->global_ref_name);
        handle_info->global_ref_name = NULL;
    }
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->global_ref_name), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( NULL != handle_info->location_fmt ) {
        free(handle_info->location_fmt);
        handle_info->location_fmt = NULL;
    }
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->location_fmt), &count, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( orte_sstore_stage_enabled_caching ) {
        if( NULL != handle_info->cache_location_fmt ) {
            free(handle_info->cache_location_fmt);
            handle_info->cache_location_fmt = NULL;
        }
        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->cache_location_fmt), &count, OPAL_STRING))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(handle_info->migrating), &count, OPAL_BOOL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * For each process we are working with
     */
    for(item  = opal_list_get_first(handle_info->app_info_handle);
        item != opal_list_get_end(handle_info->app_info_handle);
        item  = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        if( NULL != app_info->local_location ) {
            free(app_info->local_location);
            app_info->local_location = NULL;
        }
        /* On failure asprintf() leaves the pointer undefined: reset it */
        if( 0 > asprintf(&(app_info->local_location), handle_info->location_fmt, app_info->name.vpid) ) {
            app_info->local_location = NULL;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }

        if( orte_sstore_stage_enabled_caching ) {
            if( NULL != app_info->local_cache_location ) {
                free(app_info->local_cache_location);
                app_info->local_cache_location = NULL;
            }
            if( 0 > asprintf(&(app_info->local_cache_location), handle_info->cache_location_fmt, app_info->name.vpid) ) {
                app_info->local_cache_location = NULL;
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }
        }

        if( NULL != app_info->metadata_filename ) {
            free(app_info->metadata_filename);
            app_info->metadata_filename = NULL;
        }
        if( 0 > asprintf(&(app_info->metadata_filename), "%s/%s",
                         app_info->local_location,
                         orte_sstore_base_local_metadata_filename) ) {
            app_info->metadata_filename = NULL;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
    }

 cleanup:
    if( ORTE_SUCCESS == exit_status ) {
        handle_info->status = SSTORE_LOCAL_READY;
    } else {
        handle_info->status = SSTORE_LOCAL_ERROR;
    }

    return exit_status;
}
1239
/*
 * Handle a REMOVE command for a snapshot handle.
 *
 * When caching is disabled, or the handle was produced while migrating,
 * each application's local snapshot (and its compressed copy, if
 * compression is enabled and one exists) is deleted with "rm -rf".
 * Otherwise the snapshot is handed to sstore_stage_update_cache().
 *
 * Afterwards a DONE message carrying the handle id and the number of
 * application records is sent to 'peer', and the handle status becomes
 * SSTORE_LOCAL_DONE.
 *
 * Removal is best effort: a command that cannot be built is logged and
 * skipped so that the DONE message is still sent. Each command string is
 * released right after it has been run.
 *
 * Returns ORTE_SUCCESS or the error of the failing cache/pack/send step.
 * 'buffer' is not used.
 */
static int process_global_remove(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    size_t list_size;
    char * cmd = NULL;

    /*
     * If not caching, then just remove the local copy
     * Or if migrating, since we do not cache checkpoints generated while
     * migrating.
     */
    if( !orte_sstore_stage_enabled_caching || handle_info->migrating ) {
        for(item  = opal_list_get_first(handle_info->app_info_handle);
            item != opal_list_get_end(handle_info->app_info_handle);
            item  = opal_list_get_next(item) ) {
            app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

            /* Nothing to remove if the location was never filled in */
            if( NULL != app_info->local_location ) {
                if( 0 > asprintf(&cmd, "rm -rf %s", app_info->local_location) ) {
                    cmd = NULL;
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    continue;
                }
                OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                     "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                     cmd));
                system(cmd);
                free(cmd);
                cmd = NULL;
            }

            if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
                if( 0 > asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location) ) {
                    cmd = NULL;
                    ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                    continue;
                }
                OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                     "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                     cmd));
                system(cmd);
                free(cmd);
                cmd = NULL;
            }
        }
    }
    else {
        /*
         * Update the local cache
         */
        if( ORTE_SUCCESS != (ret = sstore_stage_update_cache(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_DONE;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    list_size = opal_list_get_size(handle_info->app_info_handle);
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &list_size, 1, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): remove(): Sent done for %d files to %s",
                         (int)list_size,
                         ORTE_NAME_PRINT(peer)));

    handle_info->status = SSTORE_LOCAL_DONE;
    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if( NULL != cmd ) {
        free(cmd);
        cmd = NULL;
    }

    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
1341
/*
 * Answer a PULL request from an application process.
 *
 * Sends 'peer' a PUSH message containing, in order: the handle id,
 * seq_num, global_ref_name, and the peer's own local_location and
 * metadata_filename.
 *
 * Returns ORTE_ERR_NOT_FOUND when the handle has no record for 'peer',
 * the error of the failing pack/send step, or ORTE_SUCCESS.
 * 'buffer' is not used.
 */
static int process_app_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_stage_cmd_flag_t command;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;

    /*
     * Find this app's data
     */
    app_info = find_app_handle_info(handle_info, peer);
    if( NULL == app_info ) {
        /* The lookup returns NULL for an unknown peer; there is nothing
         * to send back in that case. */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Push back the requested information
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_STAGE_PUSH;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_STAGE_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->global_ref_name), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->local_location), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(app_info->metadata_filename), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
1414
/*
 * Handle a PUSH from an application process.
 *
 * Unpacks the peer's ckpt_skipped flag and, when the checkpoint was not
 * skipped, its crs_comp string into the peer's record on the handle.
 * A crs_comp value from an earlier push is released before it is replaced.
 *
 * Returns ORTE_ERR_NOT_FOUND when the handle has no record for 'peer',
 * the error of the failing unpack, or ORTE_SUCCESS.
 */
static int process_app_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;

    /*
     * Find this app's data
     */
    app_info = find_app_handle_info(handle_info, peer);
    if( NULL == app_info ) {
        /* Unknown peer: there is no record to unpack into */
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        exit_status = ORTE_ERR_NOT_FOUND;
        goto cleanup;
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->ckpt_skipped), &count, OPAL_BOOL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( !app_info->ckpt_skipped ) {
        /* The unpack stores a new pointer; drop any previous value */
        if( NULL != app_info->crs_comp ) {
            free(app_info->crs_comp);
            app_info->crs_comp = NULL;
        }

        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &(app_info->crs_comp), &count, OPAL_STRING))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* crs_comp may be NULL here (skipped checkpoint); never pass NULL to %s */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): app_push(%s, skip=%s, %s)",
                         ORTE_NAME_PRINT(&(app_info->name)),
                         (app_info->ckpt_skipped ? "T" : "F"),
                         (NULL != app_info->crs_comp ? app_info->crs_comp : "(null)")));

    /* Compression started on sync() */

 cleanup:
    return exit_status;
}
1456
/*
 * Spin on opal_progress() until every application record on the handle
 * has either a crs_comp string or its ckpt_skipped flag set.
 *
 * There is no timeout. Always returns ORTE_SUCCESS.
 */
static int wait_all_apps_updated(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t *item = NULL;
    bool pending;

    for(;;) {
        /* Look for the first record that has not reported yet */
        pending = false;
        item = opal_list_get_first(handle_info->app_info_handle);
        while( item != opal_list_get_end(handle_info->app_info_handle) ) {
            app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

            if( !app_info->ckpt_skipped && NULL == app_info->crs_comp ) {
                pending = true;
                break;
            }

            item = opal_list_get_next(item);
        }

        if( !pending ) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Waiting for appliccation %s",
                             ORTE_NAME_PRINT(&(app_info->name)) ));
        opal_progress();
    }

    return ORTE_SUCCESS;
}
1486
/*
 * Start non-blocking compression of one application's local snapshot.
 *
 * Does nothing (ORTE_SUCCESS) when compression is disabled. Otherwise
 * opal_compress.compress_nb() is started on app_info->local_location, the
 * compression component name and postfix are recorded on the handle the
 * first time through (while compress_comp is still NULL), and
 * sstore_stage_local_compress_waitpid_cb is registered with orte_wait_cb()
 * for the compression pid.
 *
 * Returns the compress_nb() error, ORTE_ERROR when no positive pid was
 * reported, or ORTE_SUCCESS.
 */
static int start_compression(orte_sstore_stage_local_snapshot_info_t *handle_info,
                             orte_sstore_stage_local_app_snapshot_info_t *app_info)
{
    int rc;
    int exit_status = ORTE_SUCCESS;
    char *postfix = NULL;
    orte_proc_t *waiter;

    /* Sanity Check: nothing has been allocated yet, so return directly */
    if( !orte_sstore_stage_enabled_compression ) {
        return ORTE_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): start_compression() Starting compression for process %s of (%s)",
                         ORTE_NAME_PRINT(&(app_info->name)),
                         app_info->local_location ));

    /*
     * Start compression (nonblocking)
     */
    rc = opal_compress.compress_nb(app_info->local_location,
                                   &(app_info->compressed_local_location),
                                   &(postfix),
                                   &(app_info->compress_pid));
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    if( app_info->compress_pid <= 0 ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /* Component name and postfix are recorded once per handle */
    if( NULL == handle_info->compress_comp ) {
        handle_info->compress_comp    = strdup(opal_compress_base_selected_component.base_version.mca_component_name);
        handle_info->compress_postfix = strdup(postfix);
    }

    /*
     * Setup a callback for when it is finished
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): start_compression() Waiting for compression (%d) for process %s",
                         app_info->compress_pid,
                         ORTE_NAME_PRINT(&(app_info->name)) ));

    waiter = OBJ_NEW(orte_proc_t);
    waiter->pid = app_info->compress_pid;
    /* be sure to mark it as alive so we don't instantly fire */
    ORTE_FLAG_SET(waiter, ORTE_PROC_FLAG_ALIVE);

    orte_wait_cb(waiter, sstore_stage_local_compress_waitpid_cb, app_info);

 cleanup:
    if( NULL != postfix ) {
        free(postfix);
        postfix = NULL;
    }

    return exit_status;
}
1549
/*
 * Wait callback registered by start_compression().
 *
 * 'cbdata' is the application record whose compression was being waited
 * on: its compress_pid is reset to 0 and the orte_proc_t object passed as
 * 'proc' is released.
 */
static void sstore_stage_local_compress_waitpid_cb(orte_proc_t *proc, void* cbdata)
{
    orte_sstore_stage_local_app_snapshot_info_t *app_info =
        (orte_sstore_stage_local_app_snapshot_info_t*)cbdata;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): waitpid(%6d) Compression finished for Process %s",
                         (int)proc->pid,
                         ORTE_NAME_PRINT(&(app_info->name)) ));

    /* A pid of 0 is what wait_all_compressed() treats as "finished" */
    app_info->compress_pid = 0;

    OBJ_RELEASE(proc);
}
1564
/*
 * Compress every application snapshot on the handle and wait for all of
 * the compressions to finish.
 *
 * Returns ORTE_SUCCESS immediately when compression is disabled. When
 * orte_sstore_stage_compress_delay is positive, that many seconds are
 * first spent polling opal_progress() in 1000 microsecond steps. Then
 * start_compression() is called for each record, and opal_progress() is
 * spun until no record has a positive compress_pid.
 *
 * Returns the first start_compression() error (without waiting for the
 * compressions already started), or ORTE_SUCCESS.
 */
static int wait_all_compressed(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int rc;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t *item = NULL;
    bool still_running;
    int poll_usec = 1000;
    int tick = 0, total_ticks;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_compression ) {
        return ORTE_SUCCESS;
    }

    /*
     * Optional delay before any compression is started
     */
    if( orte_sstore_stage_compress_delay > 0 ) {
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Delaying %d second before starting compression...",
                             orte_sstore_stage_compress_delay));

        total_ticks = orte_sstore_stage_compress_delay * (1000000/poll_usec);
        for( tick = 0; tick < total_ticks; ++tick ) {
            opal_progress();
            usleep(poll_usec);
        }
    }

    /*
     * Start all compression
     */
    item = opal_list_get_first(handle_info->app_info_handle);
    while( item != opal_list_get_end(handle_info->app_info_handle) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        rc = start_compression(handle_info, app_info);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            return rc;
        }

        item = opal_list_get_next(item);
    }

    /*
     * Wait for compression to finish
     */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): Waiting for compression to finish..."));
    for(;;) {
        /* Find the first record whose compression pid is still set */
        still_running = false;
        item = opal_list_get_first(handle_info->app_info_handle);
        while( item != opal_list_get_end(handle_info->app_info_handle) ) {
            app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

            if( 0 < app_info->compress_pid ) {
                still_running = true;
                break;
            }

            item = opal_list_get_next(item);
        }

        if( !still_running ) {
            break;
        }

        OPAL_OUTPUT_VERBOSE((30, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): Waiting for compression to finish for appliccation %s",
                             ORTE_NAME_PRINT(&(app_info->name)) ));
        opal_progress();
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): Compression finished!"));

    return ORTE_SUCCESS;
}
1636
/*
 * Request the description of a snapshot handle if it is not known yet.
 *
 * When seq_num is non-negative and both global_ref_name and location_fmt
 * are set, the handle is marked SSTORE_LOCAL_READY and nothing is sent.
 * Otherwise a PULL message with the handle id is sent to ORTE_PROC_MY_HNP
 * on ORTE_RML_TAG_SSTORE_INTERNAL.
 *
 * Returns ORTE_SUCCESS or the error of the failing pack/send step.
 */
static int pull_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
{
    int rc;
    int exit_status = ORTE_SUCCESS;
    opal_buffer_t *request = NULL;
    orte_sstore_stage_cmd_flag_t command = ORTE_SSTORE_STAGE_PULL;

    /*
     * Check to see if this is necessary
     * (Did we get all of the info from the handle unpack?)
     */
    if( handle_info->seq_num >= 0 &&
        NULL != handle_info->global_ref_name &&
        NULL != handle_info->location_fmt ) {
        handle_info->status = SSTORE_LOCAL_READY;
        return ORTE_SUCCESS;
    }

    /*
     * Ask for the info that we need
     */
    request = OBJ_NEW(opal_buffer_t);

    rc = opal_dss.pack(request, &command, 1, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = opal_dss.pack(request, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, request,
                                 ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /* The buffer should not be released here; the callback releases it */
    request = NULL;

 cleanup:
    if( NULL != request ) {
        OBJ_RELEASE(request);
        request = NULL;
    }

    return exit_status;
}
1691
/*
 * Send the local state of a snapshot handle to ORTE_PROC_MY_HNP.
 *
 * Message layout: PUSH command, handle id, number of application records,
 * then per record: process name, ckpt_skipped flag and, only when the
 * checkpoint was not skipped, crs_comp followed (when compression is
 * enabled) by the handle's compress_comp and compress_postfix.
 *
 * Returns ORTE_SUCCESS or the error of the failing pack/send step.
 */
static int push_handle_info(orte_sstore_stage_local_snapshot_info_t *handle_info )
{
    int rc;
    int exit_status = ORTE_SUCCESS;
    opal_buffer_t *msg = NULL;
    orte_sstore_stage_cmd_flag_t command = ORTE_SSTORE_STAGE_PUSH;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t *item = NULL;
    size_t num_apps;

    msg = OBJ_NEW(opal_buffer_t);

    /* Header: command, handle, record count */
    rc = opal_dss.pack(msg, &command, 1, ORTE_SSTORE_STAGE_CMD);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    rc = opal_dss.pack(msg, &(handle_info->id), 1, ORTE_SSTORE_HANDLE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    num_apps = opal_list_get_size(handle_info->app_info_handle);
    rc = opal_dss.pack(msg, &num_apps, 1, OPAL_SIZE);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /*
     * For each process we are working with
     */
    item = opal_list_get_first(handle_info->app_info_handle);
    for( ; item != opal_list_get_end(handle_info->app_info_handle);
           item = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        rc = opal_dss.pack(msg, &(app_info->name), 1, ORTE_NAME);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        rc = opal_dss.pack(msg, &(app_info->ckpt_skipped), 1, OPAL_BOOL);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        /* A skipped checkpoint carries no further fields */
        if( app_info->ckpt_skipped ) {
            continue;
        }

        rc = opal_dss.pack(msg, &(app_info->crs_comp), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        /* Compression details are only sent when compression is enabled */
        if( !orte_sstore_stage_enabled_compression ) {
            continue;
        }

        rc = opal_dss.pack(msg, &(handle_info->compress_comp), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }

        rc = opal_dss.pack(msg, &(handle_info->compress_postfix), 1, OPAL_STRING);
        if( ORTE_SUCCESS != rc ) {
            ORTE_ERROR_LOG(rc);
            exit_status = rc;
            goto cleanup;
        }
    }

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, msg, ORTE_RML_TAG_SSTORE_INTERNAL,
                                 orte_rml_send_callback, NULL);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        exit_status = rc;
        goto cleanup;
    }

    /* The buffer should not be released here; the callback releases it */
    msg = NULL;

 cleanup:
    if( NULL != msg ) {
        OBJ_RELEASE(msg);
        msg = NULL;
    }

    return exit_status;
}
1783
sstore_stage_create_local_dir(void)1784 static int sstore_stage_create_local_dir(void)
1785 {
1786 int ret, exit_status = ORTE_SUCCESS;
1787 mode_t my_mode = S_IRWXU;
1788
1789 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_local_basedir, my_mode)) ) {
1790 ORTE_ERROR_LOG(ret);
1791 exit_status = ret;
1792 goto cleanup;
1793 }
1794
1795 cleanup:
1796 return exit_status;
1797 }
1798
sstore_stage_destroy_local_dir(void)1799 static int sstore_stage_destroy_local_dir(void)
1800 {
1801 int ret, exit_status = ORTE_SUCCESS;
1802 char * basedir_root = NULL;
1803
1804 asprintf(&basedir_root, "%s/%s",
1805 orte_sstore_stage_local_snapshot_dir,
1806 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME);
1807
1808 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(basedir_root, true, NULL)) ) {
1809 ORTE_ERROR_LOG(ret);
1810 exit_status = ret;
1811 goto cleanup;
1812 }
1813
1814 cleanup:
1815 if( NULL != basedir_root ) {
1816 free(basedir_root);
1817 basedir_root = NULL;
1818 }
1819
1820 return exit_status;
1821 }
1822
sstore_stage_create_cache(void)1823 static int sstore_stage_create_cache(void)
1824 {
1825 int ret, exit_status = ORTE_SUCCESS;
1826 mode_t my_mode = S_IRWXU;
1827
1828 /* Sanity check */
1829 if( !orte_sstore_stage_enabled_caching ) {
1830 goto cleanup;
1831 }
1832
1833 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(sstore_stage_cache_basedir, my_mode)) ) {
1834 ORTE_ERROR_LOG(ret);
1835 exit_status = ret;
1836 goto cleanup;
1837 }
1838
1839 cleanup:
1840 return exit_status;
1841 }
1842
sstore_stage_destroy_cache(void)1843 static int sstore_stage_destroy_cache(void)
1844 {
1845 int ret, exit_status = ORTE_SUCCESS;
1846
1847 /* Sanity check */
1848 if( !orte_sstore_stage_enabled_caching ) {
1849 goto cleanup;
1850 }
1851
1852 if(OPAL_SUCCESS != (ret = opal_os_dirpath_destroy(sstore_stage_cache_basedir, true, NULL)) ) {
1853 ORTE_ERROR_LOG(ret);
1854 exit_status = ret;
1855 goto cleanup;
1856 }
1857
1858 cleanup:
1859 return exit_status;
1860 }
1861
/*
 * Move the snapshot described by handle_info into the local cache and
 * rotate the cache pointers.
 *
 * Does nothing when caching is disabled, the handle was produced while
 * migrating, or the handle has no application records. Otherwise:
 *   - the directory containing the first record's local_cache_location is
 *     created,
 *   - every record's local_location is moved there with "mv", and its
 *     compressed copy (compression enabled and present) is removed,
 *   - the directory named by sstore_stage_cache_last_dir is removed,
 *   - 'last' takes over the old 'current' directory and 'current' becomes
 *     the directory just populated.
 *
 * Every command string is released after it has run, and the directory
 * name returned by opal_dirname() is either handed over to
 * sstore_stage_cache_current_dir or freed on the way out.
 *
 * Returns ORTE_SUCCESS or the (logged) error of the failing step.
 */
static int sstore_stage_update_cache(orte_sstore_stage_local_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char *cmd = NULL;
    mode_t my_mode = S_IRWXU;
    char *cache_dirname = NULL;
    orte_sstore_stage_local_app_snapshot_info_t *app_info = NULL;
    opal_list_item_t* item = NULL;

    /* Sanity Check */
    if( !orte_sstore_stage_enabled_caching || handle_info->migrating) {
        goto cleanup;
    }

    if( 0 == opal_list_get_size(handle_info->app_info_handle) ) {
        /* No processes on this node, skip */
        goto cleanup;
    }

    app_info = (orte_sstore_stage_local_app_snapshot_info_t*)opal_list_get_first(handle_info->app_info_handle);
    if( NULL == app_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Create the base cache directory
     */
    cache_dirname = opal_dirname(app_info->local_cache_location);
    if( NULL == cache_dirname ) {
        ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
        exit_status = ORTE_ERR_OUT_OF_RESOURCE;
        goto cleanup;
    }
    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(cache_dirname, my_mode)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * For each process, move the current checkpoint to the cache directory
     * Cached snapshots are always stored uncompressed.
     */
    for(item  = opal_list_get_first(handle_info->app_info_handle);
        item != opal_list_get_end(handle_info->app_info_handle);
        item  = opal_list_get_next(item) ) {
        app_info = (orte_sstore_stage_local_app_snapshot_info_t*)item;

        /* On failure asprintf() leaves the pointer undefined: reset it */
        if( 0 > asprintf(&cmd, "mv %s %s", app_info->local_location, cache_dirname) ) {
            cmd = NULL;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Caching snapshot for process %s [%s]",
                             ORTE_NAME_PRINT(&app_info->name),
                             cmd));
        system(cmd);
        free(cmd);
        cmd = NULL;

        /* (JJH) Remove the cached files */
        if( orte_sstore_stage_enabled_compression && NULL != app_info->compressed_local_location) {
            if( 0 > asprintf(&cmd, "rm -rf %s", app_info->compressed_local_location) ) {
                cmd = NULL;
                ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
                exit_status = ORTE_ERR_OUT_OF_RESOURCE;
                goto cleanup;
            }
            OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                                 "sstore:stage:(local): update_cache(): Removing with command (%s)",
                                 cmd));
            system(cmd);
            free(cmd);
            cmd = NULL;
        }
    }

    /*
     * Remove the previous cached checkpoint
     */
    if( NULL != sstore_stage_cache_last_dir ) {
        if( 0 > asprintf(&cmd, "rm -rf %s", sstore_stage_cache_last_dir) ) {
            cmd = NULL;
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            goto cleanup;
        }
        OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                             "sstore:stage:(local): update_cache(): Removing old cache dir command (%s)",
                             sstore_stage_cache_last_dir));
        system(cmd);
        free(cmd);
        cmd = NULL;

        free(sstore_stage_cache_last_dir);
        sstore_stage_cache_last_dir = NULL;
    }

    /*
     * Rotate the cache pointers: 'last' takes over the string owned by
     * 'current' (NULL if there was none), and 'current' takes over the
     * directory name computed above.
     */
    sstore_stage_cache_last_dir    = sstore_stage_cache_current_dir;
    sstore_stage_cache_current_dir = cache_dirname;
    cache_dirname = NULL;

    /* 'last' is NULL after the first update; never pass NULL to %s */
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
                         "sstore:stage:(local): update_cache(): Cache Pointers cur(%s), last(%s)",
                         sstore_stage_cache_current_dir,
                         (NULL != sstore_stage_cache_last_dir ? sstore_stage_cache_last_dir : "(null)")));

 cleanup:
    if( NULL != cmd ) {
        free(cmd);
        cmd = NULL;
    }

    /* Only still set when ownership was not handed to the cache pointer */
    if( NULL != cache_dirname ) {
        free(cache_dirname);
        cache_dirname = NULL;
    }

    return exit_status;
}
1973
orte_sstore_stage_local_preload_files(char ** local_location,bool * skip_xfer,char * global_loc,char * ref,char * postfix,int seq)1974 static int orte_sstore_stage_local_preload_files(char **local_location, bool *skip_xfer,
1975 char *global_loc, char *ref, char *postfix, int seq)
1976 {
1977 int ret, exit_status = ORTE_SUCCESS;
1978 mode_t my_mode = S_IRWXU;
1979 orte_filem_base_request_t *filem_request;
1980 orte_filem_base_process_set_t *p_set = NULL;
1981 orte_filem_base_file_set_t * f_set = NULL;
1982 char * full_local_location = NULL;
1983
1984 *skip_xfer = false;
1985
1986 if( NULL != *local_location) {
1987 free(*local_location);
1988 *local_location = NULL;
1989 }
1990
1991 /*
1992 * If the global directory is shared, then just reference directly
1993 *
1994 * Skip this optimization if compressing. Since decompressing on the
1995 * central storage would typically require a transfer to the local
1996 * disk to decompress, then transfer back. Eliminating all benefits
1997 * of the optimization.
1998 */
1999 /* (JJH) If we are going to use the preloaded restart files for subsequent
2000 * restarts then we actually always want to preload the files. This
2001 * way if we need to restart from the same checkpoint again, then
2002 * we can from the local restart cache.
2003 */
2004 #if 0
2005 if( orte_sstore_stage_global_is_shared &&
2006 (NULL == postfix || 0 >= strlen(postfix) ) ) {
2007 *local_location = strdup(global_loc);
2008 *skip_xfer = true;
2009 goto cleanup;
2010 }
2011 #endif
2012
2013 asprintf(local_location, "%s/%s/%s/%d",
2014 orte_sstore_stage_local_snapshot_dir,
2015 ORTE_SSTORE_LOCAL_SNAPSHOT_DIR_NAME,
2016 ORTE_SSTORE_LOCAL_SNAPSHOT_RESTART_DIR_NAME,
2017 seq);
2018 asprintf(&full_local_location, "%s/%s",
2019 *local_location,
2020 ref);
2021
2022 /*
2023 * If the snapshot already exists locally, just reuse it instead of
2024 * transfering it again.
2025 */
2026 if( 0 == (ret = access(full_local_location, F_OK)) ) {
2027 OPAL_OUTPUT_VERBOSE((10, mca_sstore_stage_component.super.output_handle,
2028 "sstore:stage:(local): preload_files() Local snapshot already exists, reuse it (%s)",
2029 full_local_location));
2030 *skip_xfer = true;
2031 goto cleanup;
2032 }
2033
2034 /*
2035 * Create the local restart directory
2036 */
2037 if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(*local_location, my_mode)) ) {
2038 ORTE_ERROR_LOG(ret);
2039 exit_status = ret;
2040 goto cleanup;
2041 }
2042
2043 /*
2044 * FileM request to move the checkpoint to the local directory
2045 */
2046 filem_request = OBJ_NEW(orte_filem_base_request_t);
2047
2048 /* Define the process set */
2049 p_set = OBJ_NEW(orte_filem_base_process_set_t);
2050 if( ORTE_PROC_IS_HNP ) {
2051 /* if I am the HNP, then use me as the source */
2052 p_set->source.jobid = ORTE_PROC_MY_NAME->jobid;
2053 p_set->source.vpid = ORTE_PROC_MY_NAME->vpid;
2054 }
2055 else {
2056 /* otherwise, set the HNP as the source */
2057 p_set->source.jobid = ORTE_PROC_MY_HNP->jobid;
2058 p_set->source.vpid = ORTE_PROC_MY_HNP->vpid;
2059 }
2060 p_set->sink.jobid = ORTE_PROC_MY_NAME->jobid;
2061 p_set->sink.vpid = ORTE_PROC_MY_NAME->vpid;
2062 opal_list_append(&(filem_request->process_sets), &(p_set->super) );
2063
2064 /* Define the file set */
2065 f_set = OBJ_NEW(orte_filem_base_file_set_t);
2066
2067 f_set->local_target = strdup(*local_location);
2068 if( NULL != postfix && 0 < strlen(postfix) ) {
2069 asprintf(&(f_set->remote_target), "%s/%s%s",
2070 global_loc,
2071 ref,
2072 postfix);
2073 } else {
2074 asprintf(&(f_set->remote_target), "%s/%s",
2075 global_loc,
2076 ref);
2077 }
2078 if( NULL != postfix && 0 < strlen(postfix) ) {
2079 f_set->target_flag = ORTE_FILEM_TYPE_FILE;
2080 } else {
2081 f_set->target_flag = ORTE_FILEM_TYPE_DIR;
2082 }
2083
2084 opal_list_append(&(filem_request->file_sets), &(f_set->super) );
2085
2086 /* Start getting the files */
2087 opal_list_append(preload_filem_requests, &(filem_request->super));
2088 if(ORTE_SUCCESS != (ret = orte_filem.get_nb(filem_request)) ) {
2089 ORTE_ERROR_LOG(ret);
2090 exit_status = ret;
2091 goto cleanup;
2092 }
2093
2094 cleanup:
2095 if( NULL != full_local_location ) {
2096 free(full_local_location);
2097 full_local_location = NULL;
2098 }
2099
2100 return exit_status;
2101 }
2102