1 /*
2  * Copyright (c) 2004-2012 The Trustees of Indiana University.
3  *                         All rights reserved.
4  * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
5  *                         All rights reserved.
6  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
7  *                         University of Stuttgart.  All rights reserved.
8  * Copyright (c) 2004-2005 The Regents of the University of California.
9  *                         All rights reserved.
10  * $COPYRIGHT$
11  *
12  * Additional copyrights may follow
13  *
14  * $HEADER$
15  */
16 
17 #include "orte_config.h"
18 
19 #include <errno.h>
20 #include <sys/types.h>
21 #ifdef HAVE_UNISTD_H
22 #include <unistd.h>
23 #endif  /* HAVE_UNISTD_H */
24 #ifdef HAVE_FCNTL_H
25 #include <fcntl.h>
26 #endif  /* HAVE_FCNTL_H */
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif  /* HAVE_SYS_TYPES_H */
30 #ifdef HAVE_SYS_STAT_H
31 #include <sys/stat.h>  /* for mkfifo */
32 #endif  /* HAVE_SYS_STAT_H */
33 #include <signal.h>
34 #include <string.h>
35 
36 #include "orte/runtime/orte_cr.h"
37 #include "orte/runtime/orte_globals.h"
38 #include "orte/runtime/orte_wait.h"
39 #include "opal/runtime/opal_cr.h"
40 #include "opal/util/output.h"
41 #include "opal/mca/event/event.h"
42 #include "opal/util/opal_environ.h"
43 #include "orte/mca/mca.h"
44 #include "opal/mca/base/base.h"
45 #include "opal/mca/crs/crs.h"
46 #include "opal/mca/crs/base/base.h"
47 
48 #include "orte/util/name_fns.h"
49 #include "opal/mca/pmix/pmix.h"
50 #include "orte/mca/snapc/snapc.h"
51 #include "orte/mca/snapc/base/base.h"
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/grpcomm/grpcomm.h"
54 #include "orte/mca/rml/rml.h"
55 #include "orte/mca/rml/rml_types.h"
56 #include "orte/mca/routed/routed.h"
57 #include "orte/mca/routed/base/base.h"
58 
59 #include "snapc_full.h"
60 
61 /************************************
62  * Locally Global vars & functions :)
63  ************************************/
/* Handler installed for opal_cr_entry_point_signal in app_coord_init(). */
static void snapc_full_app_signal_handler (int signo);
/* INC notification callback registered through opal_cr_reg_notify_callback(). */
static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
/* Stages of the notification response, invoked from snapc_full_app_notify_response(). */
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp);
static int app_notify_resp_stage_2(int cr_state );
static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg);
/* (Re)builds app_comm_pipe_r / app_comm_pipe_w from the PID and current_unique_id. */
static int app_define_pipe_names(void);
/* Creates (if needed) and opens both named pipes. */
static int snapc_full_app_notify_reopen_files(void);
/* Handshake helpers used by stage 1 and stage 2 respectively. */
static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp);
static int snapc_full_app_ckpt_handshake_end(int cr_state);

static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t pid);
/* Sends ORTE_SNAPC_LOCAL_FINISH_CMD plus the C/R state to the local daemon. */
static int snapc_full_app_finished_msg(int cr_state);

/* Short-circuit path used when current_options->inc_prep_only is set. */
static int app_notify_resp_inc_prep_only(int cr_state);

/* Heap-allocated paths of the two named pipes; built by
 * app_define_pipe_names() and freed in app_coord_finalize(). */
static char *app_comm_pipe_r = NULL;
static char *app_comm_pipe_w = NULL;
/* Descriptors of the opened pipes; -1 while closed. */
static int   app_comm_pipe_r_fd = -1;
static int   app_comm_pipe_w_fd = -1;

/* Snapshot object allocated in app_notify_resp_stage_1() and handed to
 * opal_cr_inc_core_ckpt(). */
static opal_crs_base_snapshot_t *local_snapshot = NULL;

/* Set to true once the INC-prep-only reply has been written to the pipe. */
static bool app_notif_processed = false;

/* Migration flags: both are reset to false by app_notify_resp_stage_3().
 * NOTE(review): they are only read/reset in the code visible here;
 * presumably set during the handshake - confirm. */
static bool currently_migrating = false;
static bool currently_all_migrating = false;

/* Set by the signal handler when a request is accepted; cleared at the end
 * of snapc_full_app_notify_response(). */
static bool currently_checkpointing = false;
/* Suffix appended to the pipe names; incremented on every
 * app_define_pipe_names() call, reset to 0 on init and after a restart. */
static int  current_unique_id = 0;

/* Copy of the cr_state tracked inside snapc_full_app_notify_response(). */
static int current_cr_state = OPAL_CRS_NONE;

/* current_ss_handle is registered with SStore in stage 1; stage 2 saves it
 * into last_ss_handle and resets it to 0. */
static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
/* Checkpoint options; allocated on demand in snapc_full_app_notify_response()
 * and released when that function completes a request. */
static opal_crs_base_ckpt_options_t *current_options = NULL;
98 
99 /************************
100  * Function Definitions
101  ************************/
102 
app_coord_init()103 int app_coord_init()
104 {
105     int ret, exit_status  = ORTE_SUCCESS;
106     opal_cr_notify_callback_fn_t prev_notify_func;
107     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
108     orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_INIT;
109     opal_buffer_t *buffer = NULL;
110 
111     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
112                          "App) Initalized for Application %s\n",
113                          ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
114 
115     /*
116      * Register the INC notification callback
117      */
118     opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func);
119 
120     /*
121      * Set the pipe names
122      */
123     current_unique_id = 0;
124     app_define_pipe_names();
125 
126     /*
127      * Setup a signal handler to catch and start the proper thread
128      * to handle the checkpoint
129      */
130     if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
131         opal_output(mca_snapc_full_component.super.output_handle,
132                     "App) init: Error: Failed to register signal %d\n",
133                     opal_cr_entry_point_signal);
134         ORTE_ERROR_LOG(OPAL_ERROR);
135         exit_status = OPAL_ERROR;
136         goto cleanup;
137     }
138 
139     OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
140                          "app) Named Pipes (%s) (%s), Signal (%d)",
141                          app_comm_pipe_r, app_comm_pipe_w, opal_cr_entry_point_signal));
142 
143     /*
144      * All processes must sync here, so the Global coordinator can know that
145      * it is safe to checkpoint now.
146      * Rank 0: Sends confirmation message to the Global Coordinator
147      */
148     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
149         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
150                              "app) Startup Barrier..."));
151     }
152 
153     if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
154         ORTE_ERROR_LOG(ret);
155         exit_status = ret;
156         goto cleanup;
157     }
158 
159     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
160         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
161                              "app) Startup Barrier: Send INIT to HNP...!"));
162 
163         buffer = OBJ_NEW(opal_buffer_t);
164 
165         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
166             ORTE_ERROR_LOG(ret);
167             exit_status = ret;
168             OBJ_RELEASE(buffer);
169             return ORTE_ERROR;
170         }
171         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
172             ORTE_ERROR_LOG(ret);
173             exit_status = ret;
174             OBJ_RELEASE(buffer);
175             return ORTE_ERROR;
176         }
177 
178         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
179             ORTE_ERROR_LOG(ret);
180             exit_status = ret;
181             OBJ_RELEASE(buffer);
182             goto cleanup;
183         }
184 
185         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
186                                                            ORTE_RML_TAG_SNAPC_FULL,
187                                                            orte_rml_send_callback, 0))) {
188             ORTE_ERROR_LOG(ret);
189             exit_status = ret;
190             OBJ_RELEASE(buffer);
191             return ORTE_ERROR;
192         }
193     }
194 
195     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
196         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
197                              "app) Startup Barrier: Done!"));
198     }
199 
200  cleanup:
201     return exit_status;
202 }
203 
app_coord_finalize()204 int app_coord_finalize()
205 {
206     int ret, exit_status = ORTE_SUCCESS;
207     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
208     orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
209     opal_buffer_t *buffer = NULL;
210     orte_std_cntr_t count;
211     orte_rml_recv_cb_t *rb = NULL;
212 
213     /*
214      * All processes must sync here, so the Global coordinator can know that
215      * it is no longer safe to checkpoint.
216      * Rank 0: Sends confirmation message to the Global Coordinator
217      */
218     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
219         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
220                              "app) Shutdown Barrier..."));
221     }
222 
223     if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
224         ORTE_ERROR_LOG(ret);
225         exit_status = ret;
226         goto cleanup;
227     }
228 
229     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
230         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
231                              "app) Shutdown Barrier: Send FIN to HNP...!"));
232 
233         /* Tell HNP that we are finalizing */
234         buffer = OBJ_NEW(opal_buffer_t);
235 
236         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
237             ORTE_ERROR_LOG(ret);
238             exit_status = ret;
239             OBJ_RELEASE(buffer);
240             goto cleanup;
241         }
242         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
243             ORTE_ERROR_LOG(ret);
244             exit_status = ret;
245             OBJ_RELEASE(buffer);
246             goto cleanup;
247         }
248 
249         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
250             ORTE_ERROR_LOG(ret);
251             exit_status = ret;
252             OBJ_RELEASE(buffer);
253             goto cleanup;
254         }
255 
256         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
257                                                            ORTE_RML_TAG_SNAPC_FULL,
258                                                            orte_rml_send_callback, 0))) {
259             ORTE_ERROR_LOG(ret);
260             exit_status = ret;
261             OBJ_RELEASE(buffer);
262             goto cleanup;
263         }
264 
265         /* buffer should not be released here; the callback releases it */
266         buffer = NULL;
267 
268         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
269                              "app) Shutdown Barrier: Waiting on FIN_ACK...!"));
270 
271         /* Wait for HNP to tell us that it is ok to finish finalization.
272          * We could have been checkpointing just as we entered finalize, so we
273          * need to wait until the checkpoint is finished before finishing.
274          */
275         rb = OBJ_NEW(orte_rml_recv_cb_t);
276         rb->active = true;
277         orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
278         ORTE_WAIT_FOR_COMPLETION(rb->active);
279 
280         count = 1;
281         if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
282             ORTE_ERROR_LOG(ret);
283             exit_status = ret;
284             goto cleanup;
285         }
286 
287         count = 1;
288         if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
289             ORTE_ERROR_LOG(ret);
290             exit_status = ret;
291             goto cleanup;
292         }
293 
294         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
295                              "app) Shutdown Barrier: Waiting on barrier...!"));
296     }
297 
298     if( 0 == ORTE_PROC_MY_NAME->vpid ) {
299         OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
300                              "app) Shutdown Barrier, Done!"));
301     }
302 
303  cleanup:
304     /* cleanup */
305     if (NULL != buffer) {
306         OBJ_RELEASE(buffer);
307         buffer = NULL;
308     }
309     if (NULL != rb) {
310         OBJ_RELEASE(rb);
311         rb = NULL;
312     }
313 
314     /*
315      * Cleanup named pipes
316      */
317     if( NULL != app_comm_pipe_r) {
318         free(app_comm_pipe_r);
319         app_comm_pipe_r = NULL;
320     }
321 
322     if( NULL != app_comm_pipe_w) {
323         free(app_comm_pipe_w);
324         app_comm_pipe_w = NULL;
325     }
326 
327     return exit_status;
328 }
329 
330 /******************
331  * Local functions
332  ******************/
snapc_full_app_signal_handler(int signo)333 static void snapc_full_app_signal_handler (int signo)
334 {
335     if( opal_cr_entry_point_signal != signo ) {
336         OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
337                              "App) signal_handler: Received unknown signal %d",
338                              signo));
339         /* Not our signal */
340         return;
341     }
342     if( currently_checkpointing ) {
343         opal_output(0, "snapc:full:(app) Error: Received a signal to checkpoint, but Already checkpointing. Ignoring request!");
344     }
345     else {
346         currently_checkpointing = true;
347         /*
348          * Signal thread to start checkpoint handshake
349          */
350         opal_cr_checkpoint_request   = OPAL_CR_STATUS_REQUESTED;
351 
352         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
353                              "App) signal_handler: Receive Checkpoint Request."));
354     }
355 }
356 
357 /*
358  * Respond to an asynchronous checkpoint request
359  */
snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)360 int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
361 {
362     static int cr_state;
363     int app_pid;
364     int ret, exit_status = ORTE_SUCCESS;
365 
366     /*
367      * Clear the options set
368      */
369     if( NULL == current_options ) {
370         current_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
371     }
372 
373     if( opal_cr_currently_stalled ) {
374         goto STAGE_1;
375     }
376 
377     /* Default: use the fast way */
378     opal_cr_continue_like_restart = false;
379     orte_cr_flush_restart_files   = true;
380 
381     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
382                          "App) notify_response: Stage 1..."));
383     if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp) ) ) {
384         ORTE_ERROR_LOG(ret);
385         exit_status = ret;
386         goto ckpt_cleanup;
387     }
388 
389     cr_state = OPAL_CRS_RUNNING;
390     current_cr_state = cr_state;
391 
392 #if OPAL_ENABLE_CRDEBUG == 1
393     if( current_options->attach_debugger ) {
394         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
395                              "App) notify_response: C/R Debug: Wait for debugger..."));
396         MPIR_debug_with_checkpoint = true;
397     }
398     if( current_options->detach_debugger ) {
399         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
400                              "App) notify_response: C/R Debug: Do not wait for debugger..."));
401         MPIR_debug_with_checkpoint = false;
402     }
403 #endif
404 
405     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
406                          "App) notify_response: Start checkpoint..."));
407  STAGE_1:
408     opal_cr_currently_stalled = false;
409 
410     app_pid = getpid();
411     if( orte_snapc_full_skip_app ) {
412         OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
413                              "App) notify_response: Skipping App. (%d)\n",
414                              app_pid));
415         ret = ORTE_SUCCESS;
416         cr_state = OPAL_CRS_CONTINUE;
417     }
418     else {
419         /*
420          * INC: Prepare stack using the registered coordination routine
421          */
422         if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
423             if( OPAL_EXISTS == ret ) {
424                 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
425                                      "App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
426                                      app_pid));
427                 opal_cr_currently_stalled = true;
428                 return exit_status;
429             }
430             else {
431                 opal_output(mca_snapc_full_component.super.output_handle,
432                             "App) notify_response: Error: checkpoint notification failed. %d\n", ret);
433                 ORTE_ERROR_LOG(ret);
434                 exit_status = ret;
435                 goto ckpt_cleanup;
436             }
437         }
438 
439         /*
440          * If this is a quiesce_start operation then we can stop here after calling
441          * the INC prep. Need to keep the connection open for the quiesce_end()
442          * operation though.
443          */
444         if( current_options->inc_prep_only ) {
445             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
446                                  "App) notify_response: INC Prep Only..."));
447             return app_notify_resp_inc_prep_only(cr_state);
448         } else {
449             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
450                                  "App) notify_response: Normal operation..."));
451         }
452 
453         /*
454          * INC: Take the checkpoint
455          *
456          * If migrating, only checkpoint if you are the target process
457          * otherwise just continue.
458          */
459         if( currently_all_migrating ) {
460             opal_cr_continue_like_restart = true;
461             orte_cr_flush_restart_files   = false;
462         }
463         if( !currently_migrating && currently_all_migrating ) {
464             OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
465                                  "App) notify_response: Skipping App. (%d) - This process is not migrating \n",
466                                  app_pid));
467             ret = ORTE_SUCCESS;
468             cr_state = OPAL_CRS_CONTINUE;
469         }
470         else {
471             ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state);
472         }
473         current_cr_state = cr_state;
474 
475         /*
476          * Tell Local Coordinator that we are done with local checkpoint
477          * (only if not restarting, on restart we are not attached to the Local
478          *  Coordinator. )
479          */
480         if( OPAL_CRS_RESTART != cr_state ) {
481             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
482                                  "App) notify_response: Stage 2..."));
483             if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
484                 ORTE_ERROR_LOG(ret);
485                 exit_status = ret;
486                 goto ckpt_cleanup;
487             }
488         }
489 
490         /*
491          * INC: Recover stack using the registered coordination routine
492          */
493         if( !currently_all_migrating ) {
494             if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
495                 ORTE_ERROR_LOG(ret);
496                 exit_status = ret;
497                 goto ckpt_cleanup;
498             }
499         }
500         /*
501          * If this is a migrating target process, then do not recover the stack, but terminate.
502          * All non-migrating processes will wait in the recovery until the target processes are
503          * restarted on the target nodes.
504          */
505         else {
506             /*
507              * If we are one of the processes migrating, then terminate after checkpointing
508              */
509             if( currently_migrating ) {
510                 if( OPAL_CRS_RESTART != cr_state ) {
511                     current_options->term = true;
512                 }
513                 else {
514                     if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
515                         ORTE_ERROR_LOG(ret);
516                         exit_status = ret;
517                         goto ckpt_cleanup;
518                     }
519                 }
520             }
521             /*
522              * If we are not one of the processes migrating, then wait for release.
523              * Need to act like we are restarting during recovery, since the migrating processes
524              * will expect this logic.
525              */
526             else {
527                 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(OPAL_CRS_RESTART)) ) {
528                     ORTE_ERROR_LOG(ret);
529                     exit_status = ret;
530                     goto ckpt_cleanup;
531                 }
532             }
533         }
534     }
535 
536     /* Don't stall any longer */
537     opal_cr_stall_check = false;
538 
539     if(OPAL_CRS_RESTART == cr_state) {
540         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
541                              "App) notify_response: Restarting... (%s : %d)\n",
542                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
543 
544         current_options->term = false;
545         /* Do not respond to the non-existent command line tool */
546         goto ckpt_cleanup;
547     }
548     else if(cr_state == OPAL_CRS_CONTINUE) {
549         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
550                              "App) notify_response: Continuing...(%s : %d)\n",
551                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
552         ; /* Don't need to do anything here */
553     }
554     else if(cr_state == OPAL_CRS_TERM ) {
555         ; /* Don't need to do anything here */
556     }
557     else {
558         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
559                              "App) notify_response: Unknown cr_state(%d) [%d]",
560                              cr_state, app_pid));
561     }
562 
563  ckpt_cleanup:
564     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
565                          "App) notify_response: Stage 3..."));
566     if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
567         ORTE_ERROR_LOG(ret);
568         exit_status = ret;
569         goto ckpt_cleanup;
570     }
571 
572     if( current_options->term ) {
573         OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
574                              "App) notify_response: User has asked to terminate the application"));
575         /* Wait here for termination.
576          * If we call 'exit' then the job will fail in an ugly way, instead just
577          * wait for the Global coordinator to terminate us.
578          */
579         while(1) {
580             opal_progress();
581             sleep(1);
582         }
583     }
584 
585     if( NULL != current_options ) {
586         OBJ_RELEASE(current_options);
587         current_options = NULL;
588     }
589 
590     currently_checkpointing = false;
591 
592     return exit_status;
593 }
594 
/*
 * Stage 1 of the notification response.
 *
 * Resets the C/R timers, opens the named pipes, performs the initial
 * handshake, registers the current SStore handle and allocates the local
 * snapshot object, filling in its directory and metadata file name from
 * SStore attributes.
 *
 * Returns ORTE_SUCCESS, or the error code of the first step that failed.
 * The migration-status trace at the end is emitted on every path.
 */
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp)
{
    int rc = ORTE_SUCCESS;
    int ret;

    OPAL_CR_CLEAR_TIMERS();
    opal_cr_timing_my_rank = ORTE_PROC_MY_NAME->vpid;
    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY0);

    /* Step 1: open the communication channels (named pipes). */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Open Communication Channels."));
    ret = snapc_full_app_notify_reopen_files();
    if (ORTE_SUCCESS != ret) {
        ORTE_ERROR_LOG(ret);
        rc = ret;
        goto done;
    }

    /* Step 2: initial handshake. */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Initial Handshake."));
    ret = snapc_full_app_ckpt_handshake_start(resp);
    if( ORTE_SUCCESS != ret ) {
        ORTE_ERROR_LOG(ret);
        rc = ret;
        goto done;
    }

    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY1);

    /* Step 3: register the handle with SStore. */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Register with SStore..."));
    ret = orte_sstore.register_handle(current_ss_handle);
    if( OPAL_SUCCESS != ret ) {
        ORTE_ERROR_LOG(ret);
        rc = ret;
        goto done;
    }

    local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t);

    /* A migration is in progress but this process is not part of it:
     * mark the local checkpoint as skipped. */
    if( currently_all_migrating && !currently_migrating ) {
        orte_sstore.set_attr(current_ss_handle,
                             SSTORE_METADATA_LOCAL_SKIP_CKPT,
                             "1");
    }

    /* Fetch where the snapshot and its metadata live. */
    orte_sstore.get_attr(current_ss_handle,
                         SSTORE_METADATA_LOCAL_SNAP_LOC,
                         &(local_snapshot->snapshot_directory));
    orte_sstore.get_attr(current_ss_handle,
                         SSTORE_METADATA_LOCAL_SNAP_META,
                         &(local_snapshot->metadata_filename));

    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY2);

    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Start checkpoint... (%d)", (int)current_ss_handle));

 done:
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Are we migrating [%5s]. Am I migrating [%5s]",
                         (currently_all_migrating ? "True" : "False"),
                         (currently_migrating ? "True" : "False") ));

    return rc;
}
666 
app_notify_resp_inc_prep_only(int cr_state)667 static int app_notify_resp_inc_prep_only(int cr_state)
668 {
669     int ret, exit_status = ORTE_SUCCESS;
670 
671     /*
672      * Tell the local coordinator that we are done with the INC prep
673      */
674     if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
675         opal_output(mca_snapc_full_component.super.output_handle,
676                     "App) notify_response: Error: Unable to write cr_state to named pipe (%s).\n",
677                     app_comm_pipe_w);
678         ORTE_ERROR_LOG(ret);
679         exit_status = ret;
680         goto cleanup;
681     }
682 
683     app_notif_processed = true;
684 
685  cleanup:
686     return exit_status;
687 }
688 
app_notify_resp_stage_2(int cr_state)689 static int app_notify_resp_stage_2(int cr_state )
690 {
691     int ret;
692 
693     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY3);
694 
695     /*
696      * Sync SStore
697      * If we stopped the process, then we already did this
698      */
699     if( !(current_options->stop) ) {
700         if( currently_migrating || !currently_all_migrating ) {
701             orte_sstore.set_attr(current_ss_handle,
702                                  SSTORE_METADATA_LOCAL_CRS_COMP,
703                                  local_snapshot->component_name);
704         }
705 
706         orte_sstore.sync(current_ss_handle);
707     }
708     last_ss_handle = current_ss_handle;
709     current_ss_handle = 0;
710 
711     /*
712      * Final Handshake
713      */
714     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
715                          "App) notify_response: Waiting for final handshake."));
716     if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) {
717         ORTE_ERROR_LOG(ret);
718         return ret;
719     }
720 
721     OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
722                          "App) notify_response: Final Handshake complete."));
723 
724     return ORTE_SUCCESS;
725 }
726 
app_define_pipe_names(void)727 static int app_define_pipe_names(void)
728 {
729     if( NULL != app_comm_pipe_r ) {
730         free(app_comm_pipe_r);
731         app_comm_pipe_r = NULL;
732     }
733 
734     if( NULL != app_comm_pipe_w ) {
735         free(app_comm_pipe_w);
736         app_comm_pipe_w = NULL;
737     }
738 
739     asprintf(&app_comm_pipe_r, "%s/%s.%d_%d",
740              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
741              (int)getpid(), current_unique_id);
742     asprintf(&app_comm_pipe_w, "%s/%s.%d_%d",
743              opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
744              (int)getpid(), current_unique_id);
745 
746     ++current_unique_id;
747 
748     return ORTE_SUCCESS;
749 }
750 
app_notify_resp_stage_3(int cr_state,bool skip_fin_msg)751 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
752 {
753     /*
754      * Send a message to the local daemon letting it know that we are done
755      */
756     if( !skip_fin_msg ) {
757         snapc_full_app_finished_msg(cr_state);
758     }
759 
760     /*
761      * Close and cleanup pipes
762      */
763     if( 0 <= app_comm_pipe_r_fd ) {
764         close(app_comm_pipe_r_fd);
765         app_comm_pipe_r_fd = -1;
766     }
767     if( 0 <= app_comm_pipe_w_fd ) {
768         close(app_comm_pipe_w_fd);
769         app_comm_pipe_w_fd = -1;
770     }
771 
772     remove(app_comm_pipe_r);
773     remove(app_comm_pipe_w);
774 
775     app_comm_pipe_r_fd = -1;
776     app_comm_pipe_w_fd = -1;
777 
778     if( OPAL_CRS_RESTART == cr_state ) {
779         current_unique_id = 0;
780     }
781 
782     app_define_pipe_names();
783 
784     /* Prepare to wait for another checkpoint action */
785     opal_cr_checkpointing_state = OPAL_CR_STATUS_NONE;
786     opal_cr_currently_stalled   = false;
787 
788     currently_all_migrating = false;
789     currently_migrating     = false;
790 
791     OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY4);
792     if(OPAL_CRS_RESTART != cr_state) {
793         OPAL_CR_DISPLAY_ALL_TIMERS();
794     }
795 
796     return ORTE_SUCCESS;
797 }
798 
snapc_full_app_finished_msg(int cr_state)799 static int snapc_full_app_finished_msg(int cr_state) {
800     int ret, exit_status = ORTE_SUCCESS;
801     opal_buffer_t *buffer = NULL;
802     orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
803 
804     buffer = OBJ_NEW(opal_buffer_t);
805 
806     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
807         ORTE_ERROR_LOG(ret);
808         exit_status = ret;
809         goto cleanup;
810     }
811 
812     if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
813         ORTE_ERROR_LOG(ret);
814         exit_status = ret;
815         goto cleanup;
816     }
817 
818     if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
819                                                        ORTE_RML_TAG_SNAPC,
820                                                        orte_rml_send_callback, 0))) {
821         ORTE_ERROR_LOG(ret);
822         exit_status = ret;
823         goto cleanup;
824     }
825 
826     return ORTE_SUCCESS;
827  cleanup:
828     OBJ_RELEASE(buffer);
829 
830     return exit_status;
831 }
832 
snapc_full_app_notify_reopen_files(void)833 static int snapc_full_app_notify_reopen_files(void)
834 {
835     int ret = OPAL_ERR_NOT_IMPLEMENTED;
836 
837 #ifndef HAVE_MKFIFO
838     return ret;
839 #else
840     /*
841      * Open up the read pipe
842      */
843     if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) {
844         if(EEXIST == ret || -1 == ret ) {
845             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
846                                  "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
847                                  app_comm_pipe_r, ret));
848         }
849         else {
850             opal_output(mca_snapc_full_component.super.output_handle,
851                         "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
852                         app_comm_pipe_r, ret);
853             return ORTE_ERROR;
854         }
855     }
856 
857     app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR);
858     if(app_comm_pipe_r_fd < 0) {
859         opal_output(mca_snapc_full_component.super.output_handle,
860                     "App) init: Error: open failed to open the named pipe (%s). %d\n",
861                     app_comm_pipe_r, app_comm_pipe_r_fd);
862         return ORTE_ERROR;
863     }
864 
865     /*
866      * Open up the write pipe
867      */
868     if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) {
869         if(EEXIST == ret || -1 == ret ) {
870             OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
871                                  "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
872                                  app_comm_pipe_w, ret));
873         }
874         else {
875             opal_output(mca_snapc_full_component.super.output_handle,
876                         "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
877                         app_comm_pipe_w, ret);
878             return ORTE_ERROR;
879         }
880     }
881 
882     app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY);
883     if(app_comm_pipe_w_fd < 0) {
884         opal_output(mca_snapc_full_component.super.output_handle,
885                     "App) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
886                     app_comm_pipe_w, app_comm_pipe_w_fd);
887         return ORTE_ERROR;
888     }
889 
890     return ORTE_SUCCESS;
891 #endif  /* HAVE_MKFIFO */
892 }
893 
/*
 * Opening handshake of a checkpoint request, carried out over the named
 * pipes app_comm_pipe_r_fd (incoming) and app_comm_pipe_w_fd (outgoing).
 *
 * Sequence:
 *  1. Read the request options, one int each, in this order:
 *     all_migrating, migrating, term, stop, inc_prep_only,
 *     inc_recover_only and -- only when OPAL_ENABLE_CRDEBUG == 1 --
 *     attach_debugger and detach_debugger. They are stored in
 *     currently_all_migrating, currently_migrating and current_options.
 *  2. Read the sstore handle into current_ss_handle.
 *  3. Write 'resp' back as an int.
 *  4. If 'resp' is OPAL_CHECKPOINT_CMD_IN_PROGRESS, OPAL_CHECKPOINT_CMD_NULL
 *     or OPAL_CHECKPOINT_CMD_ERROR, stop here.
 *  5. Otherwise block reading the int sentinel that allows the checkpoint
 *     to start.
 *
 * Returns ORTE_SUCCESS when the exchange completed (including the case in
 * step 4), or ORTE_ERROR when any read/write on the pipes failed or was
 * short.
 */
static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp)
{
    int ret, exit_status = ORTE_SUCCESS;
    int tmp_resp, opt_rep;

    /*
     * Get the initial handshake command:
     * - Migrating option [all, me]
     * - Term argument
     * - Stop argument
     *
     * For every transfer below 'ret' is a byte count (or -1), not an ORTE
     * status, so failures are reported to the caller as ORTE_ERROR.
     */
    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the all_migrating option from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    currently_all_migrating = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the migrating option from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    currently_migrating = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'term' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->term = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'stop' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->stop = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'inc_prep_only' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->inc_prep_only = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'inc_recover_only' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->inc_recover_only = OPAL_INT_TO_BOOL(opt_rep);

#if OPAL_ENABLE_CRDEBUG == 1
    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'attach_debugger' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->attach_debugger = OPAL_INT_TO_BOOL(opt_rep);

    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the 'detach_debugger' from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }
    current_options->detach_debugger = OPAL_INT_TO_BOOL(opt_rep);
#endif

    /*
     * Get SStore Handle
     */
    if( sizeof(orte_sstore_base_handle_t) != (ret = read(app_comm_pipe_r_fd, &current_ss_handle, sizeof(orte_sstore_base_handle_t))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read the sstore handle from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) %s Received Options... Responding with %d\n",
                         ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)resp));

    /*
     * Write back the response to the request (message printed below)
     */
    tmp_resp = (int)resp;
    if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
                    tmp_resp, app_comm_pipe_w, ret, __LINE__);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

    /*
     * NOTE(review): in the three branches below 'ret' still holds the byte
     * count of the successful write above, so ORTE_ERROR_LOG(ret) logs a
     * value that is not an ORTE status, and the function still returns
     * ORTE_SUCCESS. This is kept as-is because callers may rely on the
     * return value; confirm against the caller before changing it.
     */

    /*
     * Respond that the checkpoint is currently in progress
     */
    if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
        OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                             "App) notify_response: Checkpoint in progress, cannot start (%d)",
                             getpid()));
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }
    /*
     * Respond that the application is unable to be checkpointed
     */
    else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
        OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                             "App) notify_response: Non-checkpointable application, cannot start (%d)",
                             getpid()));
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }
    /*
     * Respond that some error has occurred such that the application is
     * not able to be checkpointed
     */
    else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
        OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                             "App) notify_response: Error generated, cannot start (%d)",
                             getpid()));
        ORTE_ERROR_LOG(ret);
        goto cleanup;
    }

    /*
     * Respond signaling that we wish to respond to this request
     */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Starting checkpoint request (%d)",
                         getpid()));

    /*
     * Get the sentinel value indicating that we can start now
     * JJH: Check for an error here indicating that even though this process is
     *      OK to checkpoint others might not be in which case we should cleanup
     *      properly.
     */
    if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
        opal_output(mca_snapc_full_component.super.output_handle,
                    "App) notify_response: Error: Unable to read from named pipe (%s). %d\n",
                    app_comm_pipe_r, ret);
        exit_status = ORTE_ERROR;
        ORTE_ERROR_LOG(exit_status);
        goto cleanup;
    }

 cleanup:
    return exit_status;
}
1062 
snapc_full_app_ckpt_handshake_end(int cr_state)1063 static int snapc_full_app_ckpt_handshake_end(int cr_state)
1064 {
1065     int ret, exit_status = ORTE_SUCCESS;
1066     int last_cmd = 0;
1067     int err;
1068 
1069     /*
1070      * Return the final checkpoint state to the local coordinator
1071      */
1072     if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
1073         err = errno;
1074         opal_output(mca_snapc_full_component.super.output_handle,
1075                     "App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d/%d/%s\n",
1076                     app_comm_pipe_w, ret, err, strerror(err));
1077         ORTE_ERROR_LOG(ret);
1078         exit_status = ret;
1079         goto cleanup;
1080     }
1081 
1082     if( currently_all_migrating && currently_migrating ) {
1083         app_notify_resp_stage_3(cr_state, true);
1084         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1085                              "App) handshake_end: Waiting for termination (%d)",
1086                              getpid()));
1087         /* Wait here for termination, do not terminate ourselves.
1088          * JJH: We cannot terminate ourselves without killing the job...
1089          */
1090         while(1) {
1091             opal_progress();
1092             sleep(1);
1093         }
1094     }
1095 
1096     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1097                          "App) handshake_end: Waiting for release (%d)",
1098                          getpid()));
1099 
1100     /*
1101      * Wait for the local coordinator to release us
1102      */
1103     if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
1104         opal_output(mca_snapc_full_component.super.output_handle,
1105                     "App) notify_response: Error: Unable to read the 'last_cmd' from named pipe (%s). %d\n",
1106                     app_comm_pipe_r, ret);
1107         ORTE_ERROR_LOG(ret);
1108         exit_status = ret;
1109         goto cleanup;
1110     }
1111 
1112     OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1113                          "App) handshake_end: Released... (%d)",
1114                          getpid()));
1115 
1116  cleanup:
1117     return exit_status;
1118 }
1119 
app_coord_ft_event(int state)1120 int app_coord_ft_event(int state) {
1121     int ret, exit_status = ORTE_SUCCESS;
1122 
1123     OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1124                          "App) In ft_event(%d)", state));
1125 
1126     /******** Checkpoint Prep ********/
1127     if(OPAL_CRS_CHECKPOINT == state) {
1128         /*
1129          * Record the job session directory
1130          * This way we will recreate it on restart so that any components that
1131          * have old references to it (like btl/sm) can reference their files
1132          * (to close the fd's to them) on restart. We will remove it before we
1133          * create the new session directory.
1134          */
1135         orte_sstore.set_attr(orte_sstore_handle_current,
1136                              SSTORE_METADATA_LOCAL_MKDIR,
1137                              orte_process_info.job_session_dir);
1138 
1139         /*
1140          * If stopping then sync early
1141          */
1142         if( current_options->stop ) {
1143             orte_sstore.set_attr(current_ss_handle,
1144                                  SSTORE_METADATA_LOCAL_CRS_COMP,
1145                                  opal_crs_base_selected_component.base_version.mca_component_name);
1146 
1147             orte_sstore.sync(current_ss_handle);
1148         }
1149     }
1150     /******** Continue Recovery ********/
1151     else if (OPAL_CRS_CONTINUE == state ) {
1152 #if OPAL_ENABLE_CRDEBUG == 1
1153         /*
1154          * Send PID to HNP/daemon if debugging as an indicator that we have
1155          * finished the checkpoint operation.
1156          */
1157         if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1158             ORTE_ERROR_LOG(ret);
1159             exit_status = ret;
1160             goto cleanup;
1161         }
1162 #endif
1163         ; /* Nothing */
1164     }
1165     /******** Restart Pre-Recovery ********/
1166     else if (OPAL_CRS_RESTART_PRE == state ) {
1167         ; /* Nothing */
1168     }
1169     /******** Restart Recovery ********/
1170     else if (OPAL_CRS_RESTART == state ) {
1171         OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1172                              "App) Initalized for Application %s (Restart) (%5d)\n",
1173                              ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), getpid()));
1174 
1175         /*
1176          * Send new PID to HNP/daemon
1177          * The checkpointer could have used a proxy program to boot us
1178          * so the pid that the orted got from fork() may not be the
1179          * PID of this application.
1180          * - Note: BLCR does this because it tries to preseve the PID
1181          *         of the program across checkpointes
1182          */
1183         if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1184             ORTE_ERROR_LOG(ret);
1185             exit_status = ret;
1186             goto cleanup;
1187         }
1188 
1189         /*
1190          * JJH: Optionally the non-migrating processes can wait here in stage_2
1191          * JJH: This will delay the initial checkpoint, but potentially speed up
1192          * JJH: restart.
1193          */
1194     }
1195     /******** Termination ********/
1196     else if (OPAL_CRS_TERM == state ) {
1197         ; /* Nothing */
1198     }
1199     /******** Error State ********/
1200     else {
1201         ; /* Nothing */
1202     }
1203 
1204  cleanup:
1205     return exit_status;
1206 }
1207 
/*
 * Send a process-info update for 'proc' to the local daemon.
 *
 * The message carries, in order: ORTE_SNAPC_LOCAL_UPDATE_CMD, the process
 * name, proc_pid and -- only when OPAL_ENABLE_CRDEBUG == 1 -- the
 * MPIR_debug_with_checkpoint flag. It is posted with a non-blocking send
 * to ORTE_PROC_MY_DAEMON on ORTE_RML_TAG_SNAPC.
 *
 * Returns ORTE_SUCCESS once the send has been posted, otherwise the status
 * of the pack/send call that failed (after logging it and releasing the
 * buffer).
 */
static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
{
    orte_snapc_cmd_flag_t update_cmd = ORTE_SNAPC_LOCAL_UPDATE_CMD;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);
    int rc;

    rc = opal_dss.pack(msg, &update_cmd, 1, ORTE_SNAPC_CMD);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto release_msg;
    }

    /* JJH CLEANUP: Do we really need this, it is equal to sender */
    rc = opal_dss.pack(msg, &proc, 1, ORTE_NAME);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto release_msg;
    }

    rc = opal_dss.pack(msg, &proc_pid, 1, OPAL_PID);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto release_msg;
    }

#if OPAL_ENABLE_CRDEBUG == 1
    rc = opal_dss.pack(msg, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto release_msg;
    }
#endif

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, msg,
                                 ORTE_RML_TAG_SNAPC,
                                 orte_rml_send_callback, 0);
    if (ORTE_SUCCESS != rc) {
        ORTE_ERROR_LOG(rc);
        goto release_msg;
    }

    /* The buffer is deliberately not released on this path; presumably
     * orte_rml_send_callback takes care of it once the send completes. */
    return ORTE_SUCCESS;

 release_msg:
    OBJ_RELEASE(msg);

    return rc;
}
1257 
app_coord_request_op(orte_snapc_base_request_op_t * datum)1258 int app_coord_request_op(orte_snapc_base_request_op_t *datum)
1259 {
1260     int ret, exit_status = ORTE_SUCCESS;
1261     orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1262     opal_buffer_t *buffer = NULL;
1263     orte_std_cntr_t count;
1264     orte_rml_recv_cb_t *rb = NULL;
1265     int op_event, op_state;
1266     char *seq_str = NULL, *tmp_str = NULL;
1267     int cr_state = OPAL_CRS_CONTINUE;
1268     int app_pid, i;
1269 
1270     /*
1271      * Quiesce_end recovers the library before talking to the Global coord.
1272      */
1273     if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1274         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1275                              "App) Quiesce_end: Recovering the stack..."));
1276 
1277         /*
1278          * INC: Recover the stack
1279          */
1280         if( NULL == local_snapshot->component_name ) {
1281             local_snapshot->component_name = strdup("");
1282         }
1283         if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
1284             exit_status = ret;
1285             ORTE_ERROR_LOG(ret);
1286             goto cleanup;
1287         }
1288 
1289         if(OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state) ) ) {
1290             exit_status = ret;
1291             ORTE_ERROR_LOG(ret);
1292             goto cleanup;
1293         }
1294 
1295         if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
1296             exit_status = ret;
1297             ORTE_ERROR_LOG(ret);
1298             goto cleanup;
1299         }
1300 
1301         currently_checkpointing = false;
1302         app_notif_processed = false;
1303 
1304         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1305                              "App) Quiesce_end: Recovered."));
1306     }
1307     else if( ORTE_SNAPC_OP_QUIESCE_CHECKPOINT == datum->event) {
1308         app_pid = getpid();
1309         cr_state = OPAL_CRS_RUNNING;
1310         if( OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state)) ) {
1311             ORTE_ERROR_LOG(ret);
1312             exit_status = ret;
1313         }
1314 
1315         if( OPAL_CRS_RESTART != cr_state ) {
1316             orte_sstore.sync(current_ss_handle);
1317         }
1318 
1319         orte_sstore.get_attr(current_ss_handle,
1320                              SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1321                              &seq_str);
1322         if( NULL != seq_str ) {
1323             datum->seq_num = atoi(seq_str);
1324         } else {
1325             datum->seq_num = -1;
1326         }
1327 
1328         orte_sstore.get_attr(current_ss_handle,
1329                              SSTORE_METADATA_GLOBAL_SNAP_REF,
1330                              &(datum->global_handle));
1331         if( NULL == datum->global_handle ) {
1332             datum->global_handle = strdup("Unknown");
1333         }
1334 
1335         return exit_status;
1336     }
1337 
1338     /*
1339      * Leader: Send the info to the head node
1340      */
1341     if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1342         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1343                              "App) Request_op: Sending request (%3d)...",
1344                              datum->event));
1345         /*
1346          * Send request to HNP
1347          */
1348         buffer = OBJ_NEW(opal_buffer_t);
1349 
1350         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1351             ORTE_ERROR_LOG(ret);
1352             exit_status = ret;
1353             goto cleanup;
1354         }
1355         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
1356             ORTE_ERROR_LOG(ret);
1357             exit_status = ret;
1358             goto cleanup;
1359         }
1360 
1361         if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
1362             ORTE_ERROR_LOG(ret);
1363             exit_status = ret;
1364             goto cleanup;
1365         }
1366 
1367         if( ORTE_SNAPC_OP_RESTART == datum->event) {
1368             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
1369                 ORTE_ERROR_LOG(ret);
1370                 exit_status = ret;
1371                 goto cleanup;
1372             }
1373             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
1374                 ORTE_ERROR_LOG(ret);
1375                 exit_status = ret;
1376                 goto cleanup;
1377             }
1378         }
1379         else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1380             /*
1381              * Check information
1382              *  Rank  | Hostname  | cr_off_node  | Meaning
1383              * -------+-----------+--------------+---------
1384              *  self  | home/same | false        | Do not move this process
1385              *        |           | true         | ERROR
1386              *        | NULL      | false        | Move wherever
1387              *        |           | true         | Move off of this node
1388              *        | other     | false/true   | Move to the 'other' node
1389              * -------+-----------+--------------+---------
1390              *  peer  | home/same | false        | Move 'peer' to me
1391              *        |           | true         | ERROR
1392              *        | NULL      | false        | Move wherever (Default: Move 'peer' to me)
1393              *        |           | true         | Move with peer to some other node
1394              *        | other     | false/true   | Move with peer to 'other' node
1395              * -------+-----------+--------------+---------
1396              * If 'rank' is set to a peer other than self, and the peer sets
1397              * conflicting 'hostname' or 'cr_off_node' preferences, then that
1398              * is an error. In which case the migration should fail.
1399              */
1400             currently_all_migrating = true;
1401 
1402             /*
1403              * Send information
1404              */
1405             if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
1406                 ORTE_ERROR_LOG(ret);
1407                 exit_status = ret;
1408                 goto cleanup;
1409             }
1410 
1411             for( i = 0; i < datum->mig_num; ++i ) {
1412                 OPAL_OUTPUT_VERBOSE((30, mca_snapc_full_component.super.output_handle,
1413                                      "App) Migration %3d/%3d: Sending Rank %3d - Requested <%s> (%3d) %c\n",
1414                                      datum->mig_num, i,
1415                                      (datum->mig_vpids)[i],
1416                                      (datum->mig_host_pref)[i],
1417                                      (datum->mig_vpid_pref)[i],
1418                                      (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1419                                      ));
1420 
1421                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
1422                     ORTE_ERROR_LOG(ret);
1423                     exit_status = ret;
1424                     goto cleanup;
1425                 }
1426                 tmp_str = strdup((datum->mig_host_pref)[i]);
1427                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
1428                     ORTE_ERROR_LOG(ret);
1429                     exit_status = ret;
1430                     goto cleanup;
1431                 }
1432                 if( NULL != tmp_str ) {
1433                     free(tmp_str);
1434                     tmp_str = NULL;
1435                 }
1436 
1437                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
1438                     ORTE_ERROR_LOG(ret);
1439                     exit_status = ret;
1440                     goto cleanup;
1441                 }
1442                 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
1443                     ORTE_ERROR_LOG(ret);
1444                     exit_status = ret;
1445                     goto cleanup;
1446                 }
1447             }
1448         }
1449 
1450         if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
1451                                                            orte_rml_send_callback, 0))) {
1452             ORTE_ERROR_LOG(ret);
1453             exit_status = ret;
1454             goto cleanup;
1455         }
1456         /* buffer should not be released here; the callback releases it */
1457         buffer = NULL;
1458     }
1459 
1460     /*
1461      * Wait for the response
1462      */
1463     if( ORTE_SNAPC_OP_CHECKPOINT == datum->event) {
1464         if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1465             /*
1466              * Wait for local completion (need to check to see if we are restarting)
1467              */
1468             while(OPAL_CRS_CONTINUE != current_cr_state &&
1469                   OPAL_CRS_RESTART  != current_cr_state &&
1470                   OPAL_CRS_ERROR    != current_cr_state ) {
1471                 opal_progress();
1472                 OPAL_CR_TEST_CHECKPOINT_READY();
1473             }
1474 
1475             /* Do not wait for a response if we are restarting (it will never arrive) */
1476             if( OPAL_CRS_RESTART == current_cr_state ) {
1477                 orte_sstore.get_attr(current_ss_handle,
1478                                      SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1479                                      &seq_str);
1480                 if( NULL != seq_str ) {
1481                     datum->seq_num = atoi(seq_str);
1482                 } else {
1483                     datum->seq_num = -1;
1484                 }
1485 
1486                 orte_sstore.get_attr(current_ss_handle,
1487                                      SSTORE_METADATA_GLOBAL_SNAP_REF,
1488                                      &(datum->global_handle));
1489                 if( NULL == datum->global_handle ) {
1490                     datum->global_handle = strdup("Unknown");
1491                 }
1492 
1493                 current_cr_state = OPAL_CRS_NONE;
1494 
1495                 exit_status = ORTE_SUCCESS;
1496                 goto cleanup;
1497             }
1498 
1499             /*
1500              * Wait for a response regarding completion
1501              */
1502             rb = OBJ_NEW(orte_rml_recv_cb_t);
1503             rb->active = true;
1504             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1505             ORTE_WAIT_FOR_COMPLETION(rb->active);
1506 
1507             count = 1;
1508             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1509                 ORTE_ERROR_LOG(ret);
1510                 exit_status = ret;
1511                 goto cleanup;
1512             }
1513 
1514             count = 1;
1515             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1516                 ORTE_ERROR_LOG(ret);
1517                 exit_status = ret;
1518                 goto cleanup;
1519             }
1520 
1521             count = 1;
1522             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1523                 ORTE_ERROR_LOG(ret);
1524                 exit_status = ret;
1525                 goto cleanup;
1526             }
1527 
1528             orte_sstore.get_attr(last_ss_handle,
1529                                  SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1530                                  &seq_str);
1531             datum->seq_num = atoi(seq_str);
1532 
1533             orte_sstore.get_attr(last_ss_handle,
1534                                  SSTORE_METADATA_GLOBAL_SNAP_REF,
1535                                  &(datum->global_handle));
1536         }
1537     }
1538     /*
1539      * Restart will terminate this process, so just wait...
1540      */
1541     else if( ORTE_SNAPC_OP_RESTART == datum->event) {
1542         while( 1 ) {
1543             opal_progress();
1544             OPAL_CR_TEST_CHECKPOINT_READY();
1545             sleep(1);
1546         }
1547     }
1548     /*
1549      * Leader waits for response
1550      */
1551     else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1552         if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1553             while( currently_all_migrating ) {
1554                 opal_progress();
1555                 OPAL_CR_TEST_CHECKPOINT_READY();
1556                 sleep(1);
1557             }
1558 
1559             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1560                                  "App) Request_op: Leader waiting for Migrate release (%3d)...",
1561                                  datum->event));
1562 
1563 
1564             /*
1565              * Wait for a response regarding completion
1566              */
1567             rb = OBJ_NEW(orte_rml_recv_cb_t);
1568             rb->active = true;
1569             orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1570             ORTE_WAIT_FOR_COMPLETION(rb->active);
1571 
1572             count = 1;
1573             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1574                 ORTE_ERROR_LOG(ret);
1575                 exit_status = ret;
1576                 goto cleanup;
1577             }
1578 
1579             count = 1;
1580             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1581                 ORTE_ERROR_LOG(ret);
1582                 exit_status = ret;
1583                 goto cleanup;
1584             }
1585 
1586             count = 1;
1587             if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1588                 ORTE_ERROR_LOG(ret);
1589                 exit_status = ret;
1590                 goto cleanup;
1591             }
1592 
1593             OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1594                                  "App) Request_op: Leader continuing from Migration (%3d)...",
1595                                  datum->event));
1596         }
1597     }
1598     /*
1599      * Everyone waits here for completion of Quiesce start
1600      */
1601     else if( ORTE_SNAPC_OP_QUIESCE_START == datum->event) {
1602         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1603                              "App) Quiesce_start: Waiting for release..."));
1604 
1605         while( !app_notif_processed ) {
1606             opal_progress();
1607             OPAL_CR_TEST_CHECKPOINT_READY();
1608         }
1609 
1610         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1611                              "App) Quiesce_start: Released"));
1612     }
1613     /*
1614      * No waiting for Quiesce end (barrier occurs in protocol)
1615      */
1616     else if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1617         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1618                              "App) Quiesce_end: Waiting for release..."));
1619 
1620         OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1621                              "App) Quiesce_end: Released"));
1622     }
1623 
1624 
1625  cleanup:
1626     if (NULL != buffer) {
1627         OBJ_RELEASE(buffer);
1628         buffer = NULL;
1629     }
1630     if (NULL != rb) {
1631         OBJ_RELEASE(rb);
1632         rb = NULL;
1633     }
1634 
1635     if( NULL != seq_str ) {
1636         free(seq_str);
1637         seq_str = NULL;
1638     }
1639 
1640     if( NULL != tmp_str ) {
1641         free(tmp_str);
1642         tmp_str = NULL;
1643     }
1644 
1645     return exit_status;
1646 }
1647