1 /*
2 * Copyright (c) 2011 The Trustees of Indiana University.
3 * All rights reserved.
4 * Copyright (c) 2004-2011 The University of Tennessee and The University
5 * of Tennessee Research Foundation. All rights
6 * reserved.
7 * $COPYRIGHT$
8 *
9 * Additional copyrights may follow
10 *
11 * $HEADER$
12 */
13
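/*
 * Global module of the 'central' sstore component: manages global snapshot
 * handles and their metadata files.
 */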
17
18 #include "orte_config.h"
19
20 #include <string.h>
21 #include <stdlib.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <sys/wait.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif /* HAVE_UNISTD_H */
28
29 #include "orte/mca/mca.h"
30 #include "opal/mca/base/base.h"
31
32 #include "opal/mca/event/event.h"
33
34 #include "orte/constants.h"
35 #include "orte/util/show_help.h"
36 #include "opal/util/argv.h"
37 #include "opal/util/output.h"
38 #include "opal/util/opal_environ.h"
39 #include "opal/util/basename.h"
40 #include "opal/util/os_dirpath.h"
41
42 #include "opal/threads/mutex.h"
43 #include "opal/threads/condition.h"
44
45 #include "orte/util/name_fns.h"
46 #include "orte/util/proc_info.h"
47 #include "orte/runtime/orte_globals.h"
48 #include "orte/runtime/orte_wait.h"
49 #include "orte/mca/errmgr/errmgr.h"
50 #include "orte/mca/ess/ess.h"
51 #include "orte/mca/rml/rml.h"
52 #include "orte/mca/rml/rml_types.h"
53 #include "orte/mca/snapc/snapc.h"
54 #include "orte/mca/snapc/base/base.h"
55
56 #include "orte/mca/sstore/sstore.h"
57 #include "orte/mca/sstore/base/base.h"
58
59 #include "sstore_central.h"
60
/* Kind of operation a snapshot handle was created for */
enum {
    SSTORE_HANDLE_TYPE_NONE    = 0,
    SSTORE_HANDLE_TYPE_CKPT    = 1,
    SSTORE_HANDLE_TYPE_RESTART = 2
};

/* State of a global snapshot handle (see handle_info->state) */
enum {
    SSTORE_GLOBAL_NONE    = 0,
    SSTORE_GLOBAL_ERROR   = 1,
    SSTORE_GLOBAL_INIT    = 2,
    SSTORE_GLOBAL_REG     = 3,
    SSTORE_GLOBAL_SYNCING = 4,
    SSTORE_GLOBAL_SYNCED  = 5
};
71
72 /**********
73 * Object Stuff
74 **********/
/*
 * Bookkeeping for one global snapshot handle.
 * Instances are created by create_new_handle_info() and kept on the
 * file-local 'active_handles' list.
 */
struct orte_sstore_central_global_snapshot_info_t {
    /** List super object.  Must stay the first member: list items are cast
     *  directly to this type when the handle list is searched. */
    opal_list_item_t super;

    /** Handle id (assigned from next_handle_id by the constructor) */
    orte_sstore_base_handle_t id;

    /** Job ID */
    orte_jobid_t jobid;

    /** State (one of the SSTORE_GLOBAL_* values) */
    int state;

    /** Handle type (one of the SSTORE_HANDLE_TYPE_* values) */
    int handle_type;

    /** Sequence Number (-1 while unset) */
    int seq_num;

    /** Reference Name of the global snapshot */
    char * ref_name;

    /** Local Location (Relative Path to base_location): "<ref_name>/<seq_num>" */
    char * local_location;

    /** Application location format:
     *  "<base_location>/<local_location>/<local snapshot format>" */
    char * app_location_fmt;

    /** Base location (private copy of orte_sstore_base_global_snapshot_dir) */
    char * base_location;

    /** Metadata File Name:
     *  "<base_location>/<ref_name>/<global metadata filename>" */
    char *metadata_filename;

    /** Metadata stdio stream (NULL while the file is closed) */
    FILE *metadata;

    /** Num procs in job */
    int num_procs_total;

    /** Num procs synced (incremented once per process entry pushed to us) */
    int num_procs_synced;

    /** Is this checkpoint representing a migration? */
    bool migrating;
};
typedef struct orte_sstore_central_global_snapshot_info_t orte_sstore_central_global_snapshot_info_t;
ORTE_DECLSPEC OBJ_CLASS_DECLARATION(orte_sstore_central_global_snapshot_info_t);

/* Object constructor/destructor, registered with OBJ_CLASS_INSTANCE below */
void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info);
void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info);

/* Derives from opal_list_item_t so handles can be stored on active_handles */
OBJ_CLASS_INSTANCE(orte_sstore_central_global_snapshot_info_t,
                   opal_list_item_t,
                   orte_sstore_central_global_snapshot_info_construct,
                   orte_sstore_central_global_snapshot_info_destruct);
131
132
133 /**********
134 * Local Function and Variable Declarations
135 **********/
/* True while the persistent RML receive on ORTE_RML_TAG_SSTORE_INTERNAL is posted */
static bool is_global_listener_active = false;

/* Post / cancel the persistent receive for sstore-internal messages */
static int sstore_central_global_start_listener(void);
static int sstore_central_global_stop_listener(void);

/* RML callback for ORTE_RML_TAG_SSTORE_INTERNAL messages */
static void sstore_central_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata);

/* Handlers for the PULL and PUSH commands received by the callback above */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info);

/* Handle creation and lookup on the active_handles list */
static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid);
static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle);
static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq);

/* Global metadata file helpers */
static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info);
static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info);
static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, int value);
static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char * key, char *value);
static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info);

static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info);
static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
                                            opal_list_item_t **b);
static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
                                                       orte_sstore_base_global_snapshot_info_t *global_snapshot);

/* Next id handed out by the handle constructor (ids start at 1) */
static int next_handle_id = 1;

/* All handles created by create_new_handle_info(); allocated by module_init() */
static opal_list_t *active_handles = NULL;
166
167 /**********
168 * Object stuff
169 **********/
/*
 * Object constructor: give the handle the next sequential id and put every
 * other field into its "unset" state.  base_location starts out as a
 * private copy of orte_sstore_base_global_snapshot_dir.
 */
void orte_sstore_central_global_snapshot_info_construct(orte_sstore_central_global_snapshot_info_t *info)
{
    /* Ids are handed out in increasing order */
    info->id = next_handle_id++;

    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;
    info->seq_num     = -1;

    info->base_location    = strdup(orte_sstore_base_global_snapshot_dir);
    info->ref_name         = NULL;
    info->local_location   = NULL;
    info->app_location_fmt = NULL;

    info->metadata_filename = NULL;
    info->metadata          = NULL;

    info->num_procs_total  = 0;
    info->num_procs_synced = 0;

    info->migrating = false;
}
197
/*
 * Object destructor: release every string the handle owns, close the
 * metadata stream if it is still open, and reset all fields.
 */
void orte_sstore_central_global_snapshot_info_destruct( orte_sstore_central_global_snapshot_info_t *info)
{
    /* Scalar bookkeeping */
    info->id          = 0;
    info->seq_num     = -1;
    info->jobid       = ORTE_JOBID_INVALID;
    info->state       = SSTORE_GLOBAL_NONE;
    info->handle_type = SSTORE_HANDLE_TYPE_NONE;

    /* Owned strings; free(NULL) is a harmless no-op */
    free(info->ref_name);
    info->ref_name = NULL;

    free(info->local_location);
    info->local_location = NULL;

    free(info->app_location_fmt);
    info->app_location_fmt = NULL;

    free(info->base_location);
    info->base_location = NULL;

    free(info->metadata_filename);
    info->metadata_filename = NULL;

    /* Unlike free(), fclose() must not be given NULL */
    if( NULL != info->metadata ) {
        fclose(info->metadata);
        info->metadata = NULL;
    }

    info->num_procs_total  = 0;
    info->num_procs_synced = 0;
    info->migrating        = false;
}
244
/******************
 * Module API functions
 ******************/
orte_sstore_central_global_module_init(void)248 int orte_sstore_central_global_module_init(void)
249 {
250 int ret, exit_status = ORTE_SUCCESS;
251
252 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
253 "sstore:central:(global): init()"));
254
255 if( NULL == active_handles ) {
256 active_handles = OBJ_NEW(opal_list_t);
257 }
258
259 /*
260 * Setup a listener for the HNP/Apps
261 */
262 if( ORTE_SUCCESS != (ret = sstore_central_global_start_listener()) ) {
263 ORTE_ERROR_LOG(ret);
264 exit_status = ret;
265 goto cleanup;
266 }
267
268 exit_status = orte_sstore_central_local_module_init();
269
270 cleanup:
271 return exit_status;
272 }
273
orte_sstore_central_global_module_finalize(void)274 int orte_sstore_central_global_module_finalize(void)
275 {
276 int ret, exit_status = ORTE_SUCCESS;
277
278 OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
279 "sstore:central:(global): finalize()"));
280
281 exit_status = orte_sstore_central_local_module_finalize();
282
283 if( NULL != active_handles ) {
284 OBJ_RELEASE(active_handles);
285 }
286
287 /*
288 * Shutdown the listener for the HNP/Apps
289 */
290 if( ORTE_SUCCESS != (ret = sstore_central_global_stop_listener()) ) {
291 ORTE_ERROR_LOG(ret);
292 exit_status = ret;
293 goto cleanup;
294 }
295
296 cleanup:
297 return exit_status;
298 }
299
/*
 * Create a new checkpoint handle for sequence 'seq' of job 'jobid', create
 * its global snapshot directory, and open its metadata file.
 *
 * handle - output; set to the new handle id on success
 *
 * Returns ORTE_SUCCESS, or an error if the handle could not be created or
 * the directory/metadata file could not be set up.
 */
int orte_sstore_central_global_request_checkpoint_handle(orte_sstore_base_handle_t *handle, int seq, orte_jobid_t jobid)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): request_checkpoint_handle()"));

    /*
     * Construct a handle
     * - Associate all of the necessary information
     * - Creation can fail (e.g., allocation failure), so check it
     */
    handle_info = create_new_handle_info(seq, SSTORE_HANDLE_TYPE_CKPT, jobid);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Create the global checkpoint directory
     */
    if( ORTE_SUCCESS != (ret = init_global_snapshot_directory(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Return the handle
     */
    *handle = handle_info->id;

 cleanup:
    return exit_status;
}
331
/*
 * Look up an existing handle by reference name and sequence number.
 * 'basedir' and 'snapshot' are not used here.
 *
 * handle - output; set to the matching handle id on success
 *
 * Returns ORTE_SUCCESS, or ORTE_ERROR if no active handle matches.
 */
int orte_sstore_central_global_request_restart_handle(orte_sstore_base_handle_t *handle, char *basedir,
                                                       char *ref, int seq,
                                                       orte_sstore_base_global_snapshot_info_t *snapshot)
{
    orte_sstore_central_global_snapshot_info_t *match;

    match = find_handle_info_from_ref(ref, seq);
    if( NULL != match ) {
        *handle = match->id;
        return ORTE_SUCCESS;
    }

    ORTE_ERROR_LOG(ORTE_ERROR);
    return ORTE_ERROR;
}
352
/*
 * Describe a global snapshot in 'snapshot'.
 *
 * handle   - handle to describe, or NULL to use the last stable handle
 * snapshot - output: ss_handle, seq_num, reference, basedir,
 *            metadata_filename and the (sorted) local snapshot list
 *
 * The current checkpoint is described from the in-memory handle data;
 * any other handle is described by reading its metadata file.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown, or the
 * metadata extraction error.
 */
int orte_sstore_central_global_request_global_snapshot_data(orte_sstore_base_handle_t *handle,
                                                             orte_sstore_base_global_snapshot_info_t *snapshot)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): request_global_snapshot_data()"));

    /*
     * Lookup the handle (if NULL, use last stable)
     */
    if( NULL != handle ) {
        handle_info = find_handle_info(*handle);
        snapshot->ss_handle = *handle;
    } else {
        handle_info = find_handle_info(orte_sstore_handle_last_stable);
        snapshot->ss_handle = orte_sstore_handle_last_stable;
    }

    /* An unknown handle must not be dereferenced below */
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Construct the snapshot from local data, and metadata file
     */
    snapshot->seq_num = handle_info->seq_num;
    snapshot->reference = strdup(handle_info->ref_name);
    snapshot->basedir = strdup(handle_info->base_location);
    snapshot->metadata_filename = strdup(handle_info->metadata_filename);

    /* If this is the current checkpoint, pull data from local cache */
    if( orte_sstore_handle_current == snapshot->ss_handle ) {
        if( ORTE_SUCCESS != (ret = orte_sstore_central_extract_global_metadata(handle_info, snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }
    /* Otherwise, pull from metadata */
    else {
        if( ORTE_SUCCESS != (ret = orte_sstore_base_extract_global_metadata(snapshot)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    opal_list_sort(&snapshot->local_snapshots, central_snapshot_sort_compare_fn);

 cleanup:
    return exit_status;
}
403
/*
 * Register a handle. The first call for a handle marks it
 * SSTORE_GLOBAL_REG, makes it the current handle and writes the sequence
 * number, local snapshot format and a timestamp to its metadata. A repeated
 * call is forwarded to the local module.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown, or a metadata
 * write error.
 */
int orte_sstore_central_global_register(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): register(%d) - Global", handle));

    /*
     * Lookup the handle; an unknown handle must not be dereferenced
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_REG != handle_info->state ) {
        handle_info->state = SSTORE_GLOBAL_REG;
    } else {
        return orte_sstore_central_local_register(handle);
    }

    orte_sstore_handle_current = handle;

    /*
     * Associate the metadata (migrations use a separate sequence key)
     */
    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_GLOBAL_SNAP_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                  SSTORE_METADATA_LOCAL_SNAP_REF_FMT_STR,
                                                  orte_sstore_base_local_snapshot_fmt)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
462
/*
 * Return a newly allocated string for attribute 'key' of 'handle' in
 * *value (caller frees).
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown,
 * ORTE_ERR_NOT_SUPPORTED for an unhandled key, or ORTE_ERR_OUT_OF_RESOURCE
 * if the string could not be allocated (in which case *value is NULL).
 */
int orte_sstore_central_global_get_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char **value)
{
    int exit_status = ORTE_SUCCESS;
    int rc = 0;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): get_attr()"));

    /*
     * Lookup the handle; an unknown handle must not be dereferenced
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        return exit_status;
    }

    /*
     * Access metadata
     */
    if( SSTORE_METADATA_GLOBAL_SNAP_REF == key ) {
        *value = strdup(handle_info->ref_name);
    }
    else if( SSTORE_METADATA_GLOBAL_SNAP_SEQ == key ) {
        rc = asprintf(value, "%d", handle_info->seq_num);
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_FMT == key ) {
        *value = strdup(orte_sstore_base_local_snapshot_fmt);
    }
    /* 'central' does not cache, so these are the same */
    else if( SSTORE_METADATA_LOCAL_SNAP_LOC == key ) {
        rc = asprintf(value, "%s/%s/%d",
                      handle_info->base_location,
                      handle_info->ref_name,
                      handle_info->seq_num);
    }
    else if( SSTORE_METADATA_LOCAL_SNAP_REF_LOC_FMT == key ) {
        rc = asprintf(value, "%s/%s/%d/%s",
                      handle_info->base_location,
                      handle_info->ref_name,
                      handle_info->seq_num,
                      orte_sstore_base_local_snapshot_fmt);
    }
    else {
        exit_status = ORTE_ERR_NOT_SUPPORTED;
    }

    /* Report allocation failures rather than returning an unusable pointer */
    if( ORTE_SUCCESS == exit_status ) {
        if( rc < 0 ) {
            /* The output pointer is undefined after a failed asprintf() */
            *value = NULL;
        }
        if( NULL == *value ) {
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            ORTE_ERROR_LOG(exit_status);
        }
    }

    return exit_status;
}
508
/*
 * Set attribute 'key' of 'handle'. SSTORE_METADATA_GLOBAL_MIGRATING marks
 * the handle as a migration; any other key is written as a "<key><value>"
 * line to the handle's metadata file.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown or the key has
 * no string form, or the metadata write error.
 */
int orte_sstore_central_global_set_attr(orte_sstore_base_handle_t handle, orte_sstore_base_key_t key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
    char *key_str = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): set_attr()"));

    /*
     * Lookup the handle; an unknown handle must not be dereferenced
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /*
     * Process key (Access metadata)
     */
    if( key == SSTORE_METADATA_GLOBAL_MIGRATING ) {
        handle_info->migrating = true;
    }
    else {
        orte_sstore_base_convert_key_to_string(key, &key_str);
        if( NULL == key_str ) {
            ORTE_ERROR_LOG(ORTE_ERROR);
            exit_status = ORTE_ERROR;
            goto cleanup;
        }

        if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info, key_str, value))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:
    if( NULL != key_str ) {
        free(key_str);
        key_str = NULL;
    }

    return exit_status;
}
552
/*
 * Synchronize a handle. On the first call the handle is marked
 * SSTORE_GLOBAL_SYNCING; if a local coordinator is configured, the call is
 * forwarded to the local module. Otherwise this blocks (driving
 * opal_progress()) until every process of the job has pushed its data. It
 * then writes the final timestamp and "done" sequence entry and closes the
 * metadata file. A non-migration handle becomes the last stable checkpoint.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown, or a metadata
 * error.
 */
int orte_sstore_central_global_sync(orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): sync()"));

    /*
     * Lookup the handle; an unknown handle must not be dereferenced
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( SSTORE_GLOBAL_SYNCING != handle_info->state ) {
        handle_info->state = SSTORE_GLOBAL_SYNCING;
        if( ORTE_SNAPC_LOCAL_COORD_TYPE == (orte_snapc_coord_type & ORTE_SNAPC_LOCAL_COORD_TYPE) ) {
            return orte_sstore_central_local_sync(handle);
        }
    }

    /*
     * Synchronize all of the files: PUSH messages (handled in the receive
     * callback) advance num_procs_synced while we progress
     */
    while(handle_info->num_procs_synced < handle_info->num_procs_total) {
        opal_progress();
    }

    /*
     * Finalize and close the metadata
     */
    if( ORTE_SUCCESS != (ret = metadata_write_timestamp(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if( handle_info->migrating ) {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_MIG_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    } else {
        if( ORTE_SUCCESS != (ret = metadata_write_int(handle_info,
                                                      SSTORE_METADATA_INTERNAL_DONE_SEQ_STR,
                                                      handle_info->seq_num)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    if( ORTE_SUCCESS != (ret = metadata_close(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /* JJH: We should lock this var! */
    if( !handle_info->migrating ) {
        orte_sstore_base_is_checkpoint_available = true;
        orte_sstore_handle_last_stable = orte_sstore_handle_current;
    }

 cleanup:
    return exit_status;
}
621
/*
 * Remove a snapshot handle.  Currently a no-op that always succeeds; the
 * handle is not looked up.
 */
int orte_sstore_central_global_remove(orte_sstore_base_handle_t handle)
{
    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): remove()"));

    /* Intentionally unused until removal is implemented */
    (void)handle;

    return ORTE_SUCCESS;
}
633
/*
 * Pack 'handle' into 'buffer': the handle id, then its sequence number,
 * reference name and application location format.
 *
 * Returns ORTE_SUCCESS, ORTE_ERROR if the handle is unknown (nothing is
 * packed), or the packing error.
 */
int orte_sstore_central_global_pack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t handle)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): pack()"));

    /*
     * Lookup the handle; an unknown handle must not be dereferenced
     */
    handle_info = find_handle_info(handle);
    if( NULL == handle_info ) {
        ret = ORTE_ERROR;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Pack the handle ID
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &handle, 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): pack(%d, %d, %s)",
                         handle_info->id,
                         handle_info->seq_num,
                         handle_info->ref_name));

    /*
     * Pack any metadata
     */
    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->seq_num), 1, OPAL_INT )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->ref_name), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING )) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
686
/*
 * Unpack a handle sent by 'peer'.  Only peers from our own job are
 * handled; for them the work is deferred to the local (orted) version so
 * that applications get updated too.  Peers from other jobs are ignored.
 *
 * Returns ORTE_SUCCESS or the local unpack error.
 */
int orte_sstore_central_global_unpack(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_base_handle_t *handle)
{
    int rc;

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): unpack()"));

    if( OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                    ORTE_PROC_MY_NAME,
                                                    peer) ) {
        return ORTE_SUCCESS;
    }

    rc = orte_sstore_central_local_unpack(peer, buffer, handle);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
    }

    return rc;
}
713
714 /**************************
715 * Local functions
716 **************************/
create_new_handle_info(int seq,int type,orte_jobid_t jobid)717 static orte_sstore_central_global_snapshot_info_t *create_new_handle_info(int seq, int type, orte_jobid_t jobid)
718 {
719 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
720 orte_job_t *jdata = NULL;
721
722 handle_info = OBJ_NEW(orte_sstore_central_global_snapshot_info_t);
723
724 handle_info->jobid = jobid;
725
726 handle_info->state = SSTORE_GLOBAL_INIT;
727
728 handle_info->handle_type = type;
729
730 handle_info->seq_num = seq;
731
732 orte_sstore_base_get_global_snapshot_ref(&(handle_info->ref_name), getpid());
733
734 asprintf(&(handle_info->local_location), "%s/%d",
735 handle_info->ref_name, handle_info->seq_num);
736
737 asprintf(&(handle_info->app_location_fmt), "%s/%s/%s",
738 handle_info->base_location,
739 handle_info->local_location,
740 orte_sstore_base_local_snapshot_fmt);
741
742 asprintf(&(handle_info->metadata_filename), "%s/%s/%s",
743 handle_info->base_location,
744 handle_info->ref_name,
745 orte_sstore_base_global_metadata_filename);
746
747 jdata = orte_get_job_data_object(handle_info->jobid);
748 handle_info->num_procs_total = (int)jdata->num_procs;
749
750 opal_list_append(active_handles, &(handle_info->super));
751
752 return handle_info;
753 }
754
find_handle_info(orte_sstore_base_handle_t handle)755 static orte_sstore_central_global_snapshot_info_t *find_handle_info(orte_sstore_base_handle_t handle)
756 {
757 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
758 opal_list_item_t* item = NULL;
759
760 for(item = opal_list_get_first(active_handles);
761 item != opal_list_get_end(active_handles);
762 item = opal_list_get_next(item) ) {
763 handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
764
765 if( handle_info->id == handle ) {
766 return handle_info;
767 }
768 }
769
770 return NULL;
771 }
772
find_handle_info_from_ref(char * ref,int seq)773 static orte_sstore_central_global_snapshot_info_t *find_handle_info_from_ref(char *ref, int seq)
774 {
775 orte_sstore_central_global_snapshot_info_t *handle_info = NULL;
776 opal_list_item_t* item = NULL;
777
778 for(item = opal_list_get_first(active_handles);
779 item != opal_list_get_end(active_handles);
780 item = opal_list_get_next(item) ) {
781 handle_info = (orte_sstore_central_global_snapshot_info_t*)item;
782
783 if( handle_info->seq_num == seq ) {
784 if( NULL != ref &&
785 strncmp(handle_info->ref_name, ref, strlen(ref)) ) {
786 return handle_info;
787 } else {
788 return handle_info;
789 }
790 }
791 }
792
793 return NULL;
794 }
795
sstore_central_global_start_listener(void)796 static int sstore_central_global_start_listener(void)
797 {
798 if( is_global_listener_active ) {
799 return ORTE_SUCCESS;
800 }
801
802 orte_rml.recv_buffer_nb(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL,
803 ORTE_RML_PERSISTENT, sstore_central_global_recv, NULL);
804
805 is_global_listener_active = true;
806 return ORTE_SUCCESS;
807 }
808
sstore_central_global_stop_listener(void)809 static int sstore_central_global_stop_listener(void)
810 {
811 orte_rml.recv_cancel(ORTE_NAME_WILDCARD, ORTE_RML_TAG_SSTORE_INTERNAL);
812
813 is_global_listener_active = false;
814 return ORTE_SUCCESS;
815 }
816
/*
 * RML callback for ORTE_RML_TAG_SSTORE_INTERNAL.  Messages from other jobs
 * (application processes) are handed to the local (orted) receive handler.
 * Messages from our own job carry a command and a handle id, and are
 * dispatched to the PULL or PUSH handler.
 *
 * Messages naming an unknown handle, or carrying an unknown command, are
 * logged and dropped instead of being processed.
 */
static void sstore_central_global_recv(int status,
                                       orte_process_name_t* sender,
                                       opal_buffer_t* buffer,
                                       orte_rml_tag_t tag,
                                       void* cbdata)
{
    int ret;
    orte_sstore_central_cmd_flag_t command;
    orte_std_cntr_t count;
    orte_sstore_base_handle_t loc_id;
    orte_sstore_central_global_snapshot_info_t *handle_info = NULL;

    if( ORTE_RML_TAG_SSTORE_INTERNAL != tag ) {
        return;
    }

    /*
     * If this was an application process contacting us, then act like an orted
     * instead of an HNP
     */
    if(OPAL_EQUAL != orte_util_compare_name_fields(ORTE_NS_CMP_JOBID,
                                                   ORTE_PROC_MY_NAME,
                                                   sender)) {
        orte_sstore_central_local_recv(status, sender, buffer, tag, cbdata);
        return;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_sstore_central_component.super.output_handle,
                         "sstore:central:(global): process_cmd(%s)",
                         ORTE_NAME_PRINT(sender)));

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &command, &count, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &loc_id, &count, ORTE_SSTORE_HANDLE )) ) {
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    /*
     * Find the referenced handle.  Both handlers dereference it, so an
     * unknown id must stop here.
     */
    if(NULL == (handle_info = find_handle_info(loc_id)) ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(global): process_cmd(%s) Unknown handle %d",
                    ORTE_NAME_PRINT(sender), (int)loc_id);
        ORTE_ERROR_LOG(ORTE_ERR_NOT_FOUND);
        goto cleanup;
    }

    /*
     * Process the command (the handlers log their own errors)
     */
    if( ORTE_SSTORE_CENTRAL_PULL == command ) {
        process_local_pull(sender, buffer, handle_info);
    }
    else if( ORTE_SSTORE_CENTRAL_PUSH == command ) {
        process_local_push(sender, buffer, handle_info);
    }
    else {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
    }

 cleanup:
    return;
}
881
/*
 * Answer a PULL request from 'peer' with a PUSH message carrying the
 * handle id, sequence number, reference name and application location
 * format of 'handle_info'.  'buffer' is not read.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for a NULL handle, or the
 * packing/send error.
 */
static int process_local_pull(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    opal_buffer_t *loc_buffer = NULL;
    orte_sstore_central_cmd_flag_t command;

    /* Every field packed below comes from the handle */
    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * Push back the requested information
     */
    loc_buffer = OBJ_NEW(opal_buffer_t);

    command = ORTE_SSTORE_CENTRAL_PUSH;
    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &command, 1, ORTE_SSTORE_CENTRAL_CMD))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->id), 1, ORTE_SSTORE_HANDLE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->seq_num), 1, OPAL_INT))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->ref_name), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = opal_dss.pack(loc_buffer, &(handle_info->app_location_fmt), 1, OPAL_STRING))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(peer, loc_buffer, ORTE_RML_TAG_SSTORE_INTERNAL,
                                                       orte_rml_send_callback, NULL))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }
    /* loc_buffer should not be released here; the callback releases it */
    loc_buffer = NULL;

 cleanup:
    if (NULL != loc_buffer) {
        OBJ_RELEASE(loc_buffer);
        loc_buffer = NULL;
    }

    return exit_status;
}
941
/*
 * Handle a PUSH message: for each process entry in 'buffer', record its
 * name and CRS component in the global metadata (unless its checkpoint was
 * skipped), and count it as synced.
 *
 * Remaining buffer layout: a size_t entry count, then per entry an
 * ORTE_NAME, a bool "checkpoint skipped" flag and, if not skipped, the
 * CRS component string.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_BAD_PARAM for a NULL handle, or an error
 * code if unpacking or recording an entry failed.
 */
static int process_local_push(orte_process_name_t* peer, opal_buffer_t* buffer, orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    orte_std_cntr_t count;
    size_t num_entries, i;
    orte_process_name_t name;
    bool ckpt_skipped = false;
    char * crs_comp = NULL;
    char * proc_name = NULL;

    if( NULL == handle_info ) {
        ORTE_ERROR_LOG(ORTE_ERR_BAD_PARAM);
        return ORTE_ERR_BAD_PARAM;
    }

    /*
     * Unpack the data
     */
    count = 1;
    if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &num_entries, &count, OPAL_SIZE))) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    for(i = 0; i < num_entries; ++i ) {
        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &name, &count, ORTE_NAME))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        count = 1;
        if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &ckpt_skipped, &count, OPAL_BOOL))) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }

        if( !ckpt_skipped ) {
            count = 1;
            if (ORTE_SUCCESS != (ret = opal_dss.unpack(buffer, &crs_comp, &count, OPAL_STRING))) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
                goto cleanup;
            }

            /*
             * Write this information to the global metadata.  Failures are
             * reported but do not abort the loop (see the counter below).
             */
            if( ORTE_SUCCESS != (ret = orte_util_convert_process_name_to_string(&proc_name, &name)) ) {
                ORTE_ERROR_LOG(ret);
                exit_status = ret;
            } else {
                if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                              SSTORE_METADATA_INTERNAL_PROCESS_STR,
                                                              proc_name)) ) {
                    ORTE_ERROR_LOG(ret);
                    exit_status = ret;
                }
                if( ORTE_SUCCESS != (ret = metadata_write_str(handle_info,
                                                              SSTORE_METADATA_LOCAL_CRS_COMP_STR,
                                                              crs_comp)) ) {
                    ORTE_ERROR_LOG(ret);
                    exit_status = ret;
                }
            }
        }

        free(crs_comp);
        crs_comp = NULL;
        free(proc_name);
        proc_name = NULL;

        /* Count the process even if recording it failed: sync() waits
         * until num_procs_synced reaches num_procs_total. */
        (handle_info->num_procs_synced)++;
    }

 cleanup:
    free(crs_comp);
    crs_comp = NULL;
    free(proc_name);
    proc_name = NULL;

    return exit_status;
}
1022
/*
 * Create "<base_location>/<local_location>" (mode S_IRWXU, parents
 * included) and open the handle's metadata file.
 *
 * Returns ORTE_SUCCESS, ORTE_ERR_OUT_OF_RESOURCE if the path cannot be
 * built, or the directory/metadata error.
 */
static int init_global_snapshot_directory(orte_sstore_central_global_snapshot_info_t *handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    char * dir_name = NULL;
    mode_t my_mode = S_IRWXU;

    /*
     * Make the snapshot directory from the uniq_global_snapshot_name
     */
    if( 0 > asprintf(&dir_name, "%s/%s",
                     handle_info->base_location,
                     handle_info->local_location) ) {
        /* The output pointer is undefined after a failed asprintf() */
        dir_name = NULL;
        ret = ORTE_ERR_OUT_OF_RESOURCE;
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    if(OPAL_SUCCESS != (ret = opal_os_dirpath_create(dir_name, my_mode)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Open up the metadata file
     */
    if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
        ORTE_ERROR_LOG(ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    free(dir_name);
    dir_name = NULL;

    return exit_status;
}
1058
1059 /**************************
1060 * Metadata functions
1061 **************************/
/*
 * Open the handle's metadata file in append mode, unless it is already
 * open.  Returns ORTE_SUCCESS, or ORTE_ERROR if fopen() fails (in which
 * case handle_info->metadata stays NULL).
 */
static int metadata_open(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    FILE *stream;

    if( NULL != handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    /* Append: entries already in the file are kept */
    stream = fopen(handle_info->metadata_filename, "a");
    handle_info->metadata = stream;

    if( NULL == stream ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(global):init_dir() Unable to open the file (%s)\n",
                    handle_info->metadata_filename);
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
1079
/*
 * Close the snapshot's metadata stream if it is open.
 *
 * handle_info->metadata is always reset to NULL, because the stream may
 * no longer be used after fclose() even when fclose() fails.
 *
 * @return ORTE_SUCCESS if the stream was already closed or closed cleanly,
 *         ORTE_ERROR if fclose() reported a failure. A failure means
 *         buffered metadata may not have been written.
 */
static int metadata_close(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    int rc;

    /* If already closed, then just return */
    if( NULL == handle_info->metadata ) {
        return ORTE_SUCCESS;
    }

    rc = fclose(handle_info->metadata);
    handle_info->metadata = NULL;

    if( 0 != rc ) {
        opal_output(orte_sstore_base_framework.framework_output,
                    "sstore:central:(global):metadata_close() Unable to close the metadata file\n");
        ORTE_ERROR_LOG(ORTE_ERROR);
        return ORTE_ERROR;
    }

    return ORTE_SUCCESS;
}
1092
/*
 * Append one "<key><value>\n" line with an integer value to the
 * snapshot's metadata file, opening the file first if needed.
 *
 * @param key    line prefix written verbatim (no separator is added).
 * @param value  integer written in decimal.
 * @return ORTE_SUCCESS, the error from metadata_open(), or ORTE_ERROR if
 *         the write fails.
 */
static int metadata_write_int(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, int value)
{
    int ret, exit_status = ORTE_SUCCESS;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* A negative return from fprintf means the metadata line was not written */
    if( 0 > fprintf(handle_info->metadata, "%s%d\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1111
/*
 * Append one "<key><value>\n" line with a string value to the snapshot's
 * metadata file, opening the file first if needed.
 *
 * @param key    line prefix written verbatim (no separator is added).
 * @param value  string written after the key. It must not be NULL:
 *               passing NULL to "%s" is undefined behavior, so a NULL
 *               value is rejected and nothing is written.
 * @return ORTE_SUCCESS, the error from metadata_open(), or ORTE_ERROR if
 *         value is NULL or the write fails.
 */
static int metadata_write_str(orte_sstore_central_global_snapshot_info_t * handle_info, char *key, char *value)
{
    int ret, exit_status = ORTE_SUCCESS;

    if( NULL == value ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    /* A negative return from fprintf means the metadata line was not written */
    if( 0 > fprintf(handle_info->metadata, "%s%s\n", key, value) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1130
/*
 * Append the current wall-clock time to the snapshot's metadata file as
 * SSTORE_METADATA_INTERNAL_TIME_STR followed by ctime() output.
 *
 * ctime() output already ends in '\n', so no newline is added here.
 *
 * @return ORTE_SUCCESS, the error from metadata_open(), or ORTE_ERROR if
 *         the time cannot be formatted or the write fails.
 */
static int metadata_write_timestamp(orte_sstore_central_global_snapshot_info_t * handle_info)
{
    int ret, exit_status = ORTE_SUCCESS;
    time_t timestamp;
    char *time_str = NULL;

    /* Make sure the metadata file is open */
    if( NULL == handle_info->metadata ) {
        if( ORTE_SUCCESS != (ret = metadata_open(handle_info)) ) {
            ORTE_ERROR_LOG(ret);
            exit_status = ret;
            goto cleanup;
        }
    }

    timestamp = time(NULL);

    /* ctime() returns NULL if the time cannot be represented; never pass
     * that to "%s" */
    if( NULL == (time_str = ctime(&timestamp)) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

    if( 0 > fprintf(handle_info->metadata, "%s%s",
                    SSTORE_METADATA_INTERNAL_TIME_STR,
                    time_str) ) {
        ORTE_ERROR_LOG(ORTE_ERROR);
        exit_status = ORTE_ERROR;
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1153
/*
 * Rebuild the per-process entries of a global snapshot descriptor from
 * the in-memory handle.
 *
 * Despite the name, no metadata file is read here. The function:
 *   - releases every existing entry on global_snapshot->local_snapshots;
 *   - frees and clears global_snapshot->start_time and end_time;
 *   - appends one local snapshot entry for each vpid in
 *     [0, handle_info->num_procs_total), tagged with handle_info->id and
 *     handle_info->jobid, with crs_comp left NULL.
 *
 * @param handle_info      source of id, jobid and num_procs_total.
 * @param global_snapshot  descriptor to refresh (modified in place).
 * @return ORTE_SUCCESS, or ORTE_ERR_OUT_OF_RESOURCE if an entry could not
 *         be allocated. Entries appended before the failure remain on the
 *         list.
 */
static int orte_sstore_central_extract_global_metadata(orte_sstore_central_global_snapshot_info_t * handle_info,
                                                       orte_sstore_base_global_snapshot_info_t *global_snapshot)
{
    int exit_status = ORTE_SUCCESS;
    orte_sstore_base_local_snapshot_info_t *vpid_snapshot = NULL;
    opal_list_item_t* item = NULL;
    int i = 0;

    /*
     * Cleanup the structure a bit, so we can refresh it below
     */
    while (NULL != (item = opal_list_remove_first(&global_snapshot->local_snapshots))) {
        OBJ_RELEASE(item);
    }

    if( NULL != global_snapshot->start_time ) {
        free( global_snapshot->start_time );
        global_snapshot->start_time = NULL;
    }

    if( NULL != global_snapshot->end_time ) {
        free( global_snapshot->end_time );
        global_snapshot->end_time = NULL;
    }

    /*
     * Create a structure for each application process.
     * (The global start/end times were already cleared above, so they are
     * not touched again inside the loop.)
     */
    for(i = 0; i < handle_info->num_procs_total; ++i) {
        vpid_snapshot = OBJ_NEW(orte_sstore_base_local_snapshot_info_t);
        if( NULL == vpid_snapshot ) {
            ORTE_ERROR_LOG(ORTE_ERR_OUT_OF_RESOURCE);
            exit_status = ORTE_ERR_OUT_OF_RESOURCE;
            break;
        }

        vpid_snapshot->ss_handle = handle_info->id;

        vpid_snapshot->process_name.jobid = handle_info->jobid;
        vpid_snapshot->process_name.vpid = i;

        vpid_snapshot->crs_comp = NULL;

        opal_list_append(&global_snapshot->local_snapshots, &(vpid_snapshot->super));
    }

    return exit_status;
}
1198
/*
 * Three-way comparator over local snapshot list items, ordering them by
 * ascending process vpid.
 *
 * Presumably passed to opal_list_sort(); confirm at the call site.
 *
 * @param a, b  pointers to list items that are
 *              orte_sstore_base_local_snapshot_info_t objects.
 * @return -1 if a's vpid is smaller, 1 if it is larger, 0 if equal.
 */
static int central_snapshot_sort_compare_fn(opal_list_item_t **a,
                                            opal_list_item_t **b)
{
    orte_sstore_base_local_snapshot_info_t *lhs = (orte_sstore_base_local_snapshot_info_t*)(*a);
    orte_sstore_base_local_snapshot_info_t *rhs = (orte_sstore_base_local_snapshot_info_t*)(*b);

    if( lhs->process_name.vpid < rhs->process_name.vpid ) {
        return -1;
    }
    if( lhs->process_name.vpid > rhs->process_name.vpid ) {
        return 1;
    }
    return 0;
}
1217