1 /*
2 * Copyright (c) 2004-2012 The Trustees of Indiana University.
3 * All rights reserved.
4 * Copyright (c) 2004-2005 The Trustees of the University of Tennessee.
5 * All rights reserved.
6 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
7 * University of Stuttgart. All rights reserved.
8 * Copyright (c) 2004-2005 The Regents of the University of California.
9 * All rights reserved.
10 * $COPYRIGHT$
11 *
12 * Additional copyrights may follow
13 *
14 * $HEADER$
15 */
16
17 #include "orte_config.h"
18
19 #include <errno.h>
20 #include <sys/types.h>
21 #ifdef HAVE_UNISTD_H
22 #include <unistd.h>
23 #endif /* HAVE_UNISTD_H */
24 #ifdef HAVE_FCNTL_H
25 #include <fcntl.h>
26 #endif /* HAVE_FCNTL_H */
27 #ifdef HAVE_SYS_TYPES_H
28 #include <sys/types.h>
29 #endif /* HAVE_SYS_TYPES_H */
30 #ifdef HAVE_SYS_STAT_H
31 #include <sys/stat.h> /* for mkfifo */
32 #endif /* HAVE_SYS_STAT_H */
33 #include <signal.h>
34 #include <string.h>
35
36 #include "orte/runtime/orte_cr.h"
37 #include "orte/runtime/orte_globals.h"
38 #include "orte/runtime/orte_wait.h"
39 #include "opal/runtime/opal_cr.h"
40 #include "opal/util/output.h"
41 #include "opal/mca/event/event.h"
42 #include "opal/util/opal_environ.h"
43 #include "orte/mca/mca.h"
44 #include "opal/mca/base/base.h"
45 #include "opal/mca/crs/crs.h"
46 #include "opal/mca/crs/base/base.h"
47
48 #include "orte/util/name_fns.h"
49 #include "opal/mca/pmix/pmix.h"
50 #include "orte/mca/snapc/snapc.h"
51 #include "orte/mca/snapc/base/base.h"
52 #include "orte/mca/errmgr/errmgr.h"
53 #include "orte/mca/grpcomm/grpcomm.h"
54 #include "orte/mca/rml/rml.h"
55 #include "orte/mca/rml/rml_types.h"
56 #include "orte/mca/routed/routed.h"
57 #include "orte/mca/routed/base/base.h"
58
59 #include "snapc_full.h"
60
61 /************************************
62 * Locally Global vars & functions :)
63 ************************************/
64 static void snapc_full_app_signal_handler (int signo);
65 static int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp);
66 static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp);
67 static int app_notify_resp_stage_2(int cr_state );
68 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg);
69 static int app_define_pipe_names(void);
70 static int snapc_full_app_notify_reopen_files(void);
71 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp);
72 static int snapc_full_app_ckpt_handshake_end(int cr_state);
73
74 static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t pid);
75 static int snapc_full_app_finished_msg(int cr_state);
76
77 static int app_notify_resp_inc_prep_only(int cr_state);
78
/* Paths of the two named pipes; (re)built by app_define_pipe_names(). */
79 static char *app_comm_pipe_r = NULL;
80 static char *app_comm_pipe_w = NULL;
/* Descriptors for the pipes above; opened in
 * snapc_full_app_notify_reopen_files() and reset to -1 in
 * app_notify_resp_stage_3(). */
81 static int app_comm_pipe_r_fd = -1;
82 static int app_comm_pipe_w_fd = -1;
83
/* Snapshot object allocated in app_notify_resp_stage_1() and passed to
 * opal_cr_inc_core_ckpt() by the notify response routine. */
84 static opal_crs_base_snapshot_t *local_snapshot = NULL;
85
/* Set to true once app_notify_resp_inc_prep_only() has written its
 * acknowledgement to the write pipe. */
86 static bool app_notif_processed = false;
87
/* Migration flags read from the pipe during the initial handshake and
 * cleared again in app_notify_resp_stage_3(). */
88 static bool currently_migrating = false;
89 static bool currently_all_migrating = false;
90
/* Set by the signal handler when a checkpoint request is accepted; cleared
 * at the end of snapc_full_app_notify_response(). */
91 static bool currently_checkpointing = false;
/* Suffix used in the pipe names; incremented by app_define_pipe_names() and
 * reset to 0 in app_notify_resp_stage_3() when restarting. */
92 static int current_unique_id = 0;
93
/* Copy of the cr_state tracked by snapc_full_app_notify_response(). */
94 static int current_cr_state = OPAL_CRS_NONE;
95
/* current_ss_handle is read from the pipe during the handshake;
 * app_notify_resp_stage_2() saves it into last_ss_handle. */
96 static orte_sstore_base_handle_t current_ss_handle = ORTE_SSTORE_HANDLE_INVALID, last_ss_handle = ORTE_SSTORE_HANDLE_INVALID;
/* Checkpoint options; allocated on demand by
 * snapc_full_app_notify_response() and filled in by the handshake. */
97 static opal_crs_base_ckpt_options_t *current_options = NULL;
98
99 /************************
100 * Function Definitions
101 ************************/
102
app_coord_init()103 int app_coord_init()
104 {
105 int ret, exit_status = ORTE_SUCCESS;
106 opal_cr_notify_callback_fn_t prev_notify_func;
107 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
108 orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_INIT;
109 opal_buffer_t *buffer = NULL;
110
111 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
112 "App) Initalized for Application %s\n",
113 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME)));
114
115 /*
116 * Register the INC notification callback
117 */
118 opal_cr_reg_notify_callback(snapc_full_app_notify_response, &prev_notify_func);
119
120 /*
121 * Set the pipe names
122 */
123 current_unique_id = 0;
124 app_define_pipe_names();
125
126 /*
127 * Setup a signal handler to catch and start the proper thread
128 * to handle the checkpoint
129 */
130 if( SIG_ERR == signal(opal_cr_entry_point_signal, snapc_full_app_signal_handler) ) {
131 opal_output(mca_snapc_full_component.super.output_handle,
132 "App) init: Error: Failed to register signal %d\n",
133 opal_cr_entry_point_signal);
134 ORTE_ERROR_LOG(OPAL_ERROR);
135 exit_status = OPAL_ERROR;
136 goto cleanup;
137 }
138
139 OPAL_OUTPUT_VERBOSE((15, mca_snapc_full_component.super.output_handle,
140 "app) Named Pipes (%s) (%s), Signal (%d)",
141 app_comm_pipe_r, app_comm_pipe_w, opal_cr_entry_point_signal));
142
143 /*
144 * All processes must sync here, so the Global coordinator can know that
145 * it is safe to checkpoint now.
146 * Rank 0: Sends confirmation message to the Global Coordinator
147 */
148 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
149 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
150 "app) Startup Barrier..."));
151 }
152
153 if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
154 ORTE_ERROR_LOG(ret);
155 exit_status = ret;
156 goto cleanup;
157 }
158
159 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
160 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
161 "app) Startup Barrier: Send INIT to HNP...!"));
162
163 buffer = OBJ_NEW(opal_buffer_t);
164
165 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
166 ORTE_ERROR_LOG(ret);
167 exit_status = ret;
168 OBJ_RELEASE(buffer);
169 return ORTE_ERROR;
170 }
171 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
172 ORTE_ERROR_LOG(ret);
173 exit_status = ret;
174 OBJ_RELEASE(buffer);
175 return ORTE_ERROR;
176 }
177
178 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
179 ORTE_ERROR_LOG(ret);
180 exit_status = ret;
181 OBJ_RELEASE(buffer);
182 goto cleanup;
183 }
184
185 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
186 ORTE_RML_TAG_SNAPC_FULL,
187 orte_rml_send_callback, 0))) {
188 ORTE_ERROR_LOG(ret);
189 exit_status = ret;
190 OBJ_RELEASE(buffer);
191 return ORTE_ERROR;
192 }
193 }
194
195 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
196 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
197 "app) Startup Barrier: Done!"));
198 }
199
200 cleanup:
201 return exit_status;
202 }
203
app_coord_finalize()204 int app_coord_finalize()
205 {
206 int ret, exit_status = ORTE_SUCCESS;
207 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
208 orte_snapc_base_request_op_event_t op_event = ORTE_SNAPC_OP_FIN;
209 opal_buffer_t *buffer = NULL;
210 orte_std_cntr_t count;
211 orte_rml_recv_cb_t *rb = NULL;
212
213 /*
214 * All processes must sync here, so the Global coordinator can know that
215 * it is no longer safe to checkpoint.
216 * Rank 0: Sends confirmation message to the Global Coordinator
217 */
218 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
219 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
220 "app) Shutdown Barrier..."));
221 }
222
223 if (ORTE_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
224 ORTE_ERROR_LOG(ret);
225 exit_status = ret;
226 goto cleanup;
227 }
228
229 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
230 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
231 "app) Shutdown Barrier: Send FIN to HNP...!"));
232
233 /* Tell HNP that we are finalizing */
234 buffer = OBJ_NEW(opal_buffer_t);
235
236 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
237 ORTE_ERROR_LOG(ret);
238 exit_status = ret;
239 OBJ_RELEASE(buffer);
240 goto cleanup;
241 }
242 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
243 ORTE_ERROR_LOG(ret);
244 exit_status = ret;
245 OBJ_RELEASE(buffer);
246 goto cleanup;
247 }
248
249 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(op_event), 1, OPAL_INT))) {
250 ORTE_ERROR_LOG(ret);
251 exit_status = ret;
252 OBJ_RELEASE(buffer);
253 goto cleanup;
254 }
255
256 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer,
257 ORTE_RML_TAG_SNAPC_FULL,
258 orte_rml_send_callback, 0))) {
259 ORTE_ERROR_LOG(ret);
260 exit_status = ret;
261 OBJ_RELEASE(buffer);
262 goto cleanup;
263 }
264
265 /* buffer should not be released here; the callback releases it */
266 buffer = NULL;
267
268 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
269 "app) Shutdown Barrier: Waiting on FIN_ACK...!"));
270
271 /* Wait for HNP to tell us that it is ok to finish finalization.
272 * We could have been checkpointing just as we entered finalize, so we
273 * need to wait until the checkpoint is finished before finishing.
274 */
275 rb = OBJ_NEW(orte_rml_recv_cb_t);
276 rb->active = true;
277 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
278 ORTE_WAIT_FOR_COMPLETION(rb->active);
279
280 count = 1;
281 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
282 ORTE_ERROR_LOG(ret);
283 exit_status = ret;
284 goto cleanup;
285 }
286
287 count = 1;
288 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
289 ORTE_ERROR_LOG(ret);
290 exit_status = ret;
291 goto cleanup;
292 }
293
294 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
295 "app) Shutdown Barrier: Waiting on barrier...!"));
296 }
297
298 if( 0 == ORTE_PROC_MY_NAME->vpid ) {
299 OPAL_OUTPUT_VERBOSE((3, mca_snapc_full_component.super.output_handle,
300 "app) Shutdown Barrier, Done!"));
301 }
302
303 cleanup:
304 /* cleanup */
305 if (NULL != buffer) {
306 OBJ_RELEASE(buffer);
307 buffer = NULL;
308 }
309 if (NULL != rb) {
310 OBJ_RELEASE(rb);
311 rb = NULL;
312 }
313
314 /*
315 * Cleanup named pipes
316 */
317 if( NULL != app_comm_pipe_r) {
318 free(app_comm_pipe_r);
319 app_comm_pipe_r = NULL;
320 }
321
322 if( NULL != app_comm_pipe_w) {
323 free(app_comm_pipe_w);
324 app_comm_pipe_w = NULL;
325 }
326
327 return exit_status;
328 }
329
330 /******************
331 * Local functions
332 ******************/
snapc_full_app_signal_handler(int signo)333 static void snapc_full_app_signal_handler (int signo)
334 {
335 if( opal_cr_entry_point_signal != signo ) {
336 OPAL_OUTPUT_VERBOSE((1, mca_snapc_full_component.super.output_handle,
337 "App) signal_handler: Received unknown signal %d",
338 signo));
339 /* Not our signal */
340 return;
341 }
342 if( currently_checkpointing ) {
343 opal_output(0, "snapc:full:(app) Error: Received a signal to checkpoint, but Already checkpointing. Ignoring request!");
344 }
345 else {
346 currently_checkpointing = true;
347 /*
348 * Signal thread to start checkpoint handshake
349 */
350 opal_cr_checkpoint_request = OPAL_CR_STATUS_REQUESTED;
351
352 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
353 "App) signal_handler: Receive Checkpoint Request."));
354 }
355 }
356
357 /*
358 * Respond to an asynchronous checkpoint request
359 */
snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)360 int snapc_full_app_notify_response(opal_cr_ckpt_cmd_state_t resp)
361 {
362 static int cr_state;
363 int app_pid;
364 int ret, exit_status = ORTE_SUCCESS;
365
366 /*
367 * Clear the options set
368 */
369 if( NULL == current_options ) {
370 current_options = OBJ_NEW(opal_crs_base_ckpt_options_t);
371 }
372
373 if( opal_cr_currently_stalled ) {
374 goto STAGE_1;
375 }
376
377 /* Default: use the fast way */
378 opal_cr_continue_like_restart = false;
379 orte_cr_flush_restart_files = true;
380
381 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
382 "App) notify_response: Stage 1..."));
383 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_1(resp) ) ) {
384 ORTE_ERROR_LOG(ret);
385 exit_status = ret;
386 goto ckpt_cleanup;
387 }
388
389 cr_state = OPAL_CRS_RUNNING;
390 current_cr_state = cr_state;
391
392 #if OPAL_ENABLE_CRDEBUG == 1
393 if( current_options->attach_debugger ) {
394 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
395 "App) notify_response: C/R Debug: Wait for debugger..."));
396 MPIR_debug_with_checkpoint = true;
397 }
398 if( current_options->detach_debugger ) {
399 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
400 "App) notify_response: C/R Debug: Do not wait for debugger..."));
401 MPIR_debug_with_checkpoint = false;
402 }
403 #endif
404
405 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
406 "App) notify_response: Start checkpoint..."));
407 STAGE_1:
408 opal_cr_currently_stalled = false;
409
410 app_pid = getpid();
411 if( orte_snapc_full_skip_app ) {
412 OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
413 "App) notify_response: Skipping App. (%d)\n",
414 app_pid));
415 ret = ORTE_SUCCESS;
416 cr_state = OPAL_CRS_CONTINUE;
417 }
418 else {
419 /*
420 * INC: Prepare stack using the registered coordination routine
421 */
422 if(OPAL_SUCCESS != (ret = opal_cr_inc_core_prep() ) ) {
423 if( OPAL_EXISTS == ret ) {
424 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
425 "App) notify_response: Stalling the checkpoint progress until state is stable again (PID = %d)\n",
426 app_pid));
427 opal_cr_currently_stalled = true;
428 return exit_status;
429 }
430 else {
431 opal_output(mca_snapc_full_component.super.output_handle,
432 "App) notify_response: Error: checkpoint notification failed. %d\n", ret);
433 ORTE_ERROR_LOG(ret);
434 exit_status = ret;
435 goto ckpt_cleanup;
436 }
437 }
438
439 /*
440 * If this is a quiesce_start operation then we can stop here after calling
441 * the INC prep. Need to keep the connection open for the quiesce_end()
442 * operation though.
443 */
444 if( current_options->inc_prep_only ) {
445 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
446 "App) notify_response: INC Prep Only..."));
447 return app_notify_resp_inc_prep_only(cr_state);
448 } else {
449 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
450 "App) notify_response: Normal operation..."));
451 }
452
453 /*
454 * INC: Take the checkpoint
455 *
456 * If migrating, only checkpoint if you are the target process
457 * otherwise just continue.
458 */
459 if( currently_all_migrating ) {
460 opal_cr_continue_like_restart = true;
461 orte_cr_flush_restart_files = false;
462 }
463 if( !currently_migrating && currently_all_migrating ) {
464 OPAL_OUTPUT_VERBOSE((2, mca_snapc_full_component.super.output_handle,
465 "App) notify_response: Skipping App. (%d) - This process is not migrating \n",
466 app_pid));
467 ret = ORTE_SUCCESS;
468 cr_state = OPAL_CRS_CONTINUE;
469 }
470 else {
471 ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state);
472 }
473 current_cr_state = cr_state;
474
475 /*
476 * Tell Local Coordinator that we are done with local checkpoint
477 * (only if not restarting, on restart we are not attached to the Local
478 * Coordinator. )
479 */
480 if( OPAL_CRS_RESTART != cr_state ) {
481 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
482 "App) notify_response: Stage 2..."));
483 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
484 ORTE_ERROR_LOG(ret);
485 exit_status = ret;
486 goto ckpt_cleanup;
487 }
488 }
489
490 /*
491 * INC: Recover stack using the registered coordination routine
492 */
493 if( !currently_all_migrating ) {
494 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
495 ORTE_ERROR_LOG(ret);
496 exit_status = ret;
497 goto ckpt_cleanup;
498 }
499 }
500 /*
501 * If this is a migrating target process, then do not recover the stack, but terminate.
502 * All non-migrating processes will wait in the recovery until the target processes are
503 * restarted on the target nodes.
504 */
505 else {
506 /*
507 * If we are one of the processes migrating, then terminate after checkpointing
508 */
509 if( currently_migrating ) {
510 if( OPAL_CRS_RESTART != cr_state ) {
511 current_options->term = true;
512 }
513 else {
514 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state)) ) {
515 ORTE_ERROR_LOG(ret);
516 exit_status = ret;
517 goto ckpt_cleanup;
518 }
519 }
520 }
521 /*
522 * If we are not one of the processes migrating, then wait for release.
523 * Need to act like we are restarting during recovery, since the migrating processes
524 * will expect this logic.
525 */
526 else {
527 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(OPAL_CRS_RESTART)) ) {
528 ORTE_ERROR_LOG(ret);
529 exit_status = ret;
530 goto ckpt_cleanup;
531 }
532 }
533 }
534 }
535
536 /* Don't stall any longer */
537 opal_cr_stall_check = false;
538
539 if(OPAL_CRS_RESTART == cr_state) {
540 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
541 "App) notify_response: Restarting... (%s : %d)\n",
542 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
543
544 current_options->term = false;
545 /* Do not respond to the non-existent command line tool */
546 goto ckpt_cleanup;
547 }
548 else if(cr_state == OPAL_CRS_CONTINUE) {
549 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
550 "App) notify_response: Continuing...(%s : %d)\n",
551 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), app_pid));
552 ; /* Don't need to do anything here */
553 }
554 else if(cr_state == OPAL_CRS_TERM ) {
555 ; /* Don't need to do anything here */
556 }
557 else {
558 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
559 "App) notify_response: Unknown cr_state(%d) [%d]",
560 cr_state, app_pid));
561 }
562
563 ckpt_cleanup:
564 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
565 "App) notify_response: Stage 3..."));
566 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
567 ORTE_ERROR_LOG(ret);
568 exit_status = ret;
569 goto ckpt_cleanup;
570 }
571
572 if( current_options->term ) {
573 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
574 "App) notify_response: User has asked to terminate the application"));
575 /* Wait here for termination.
576 * If we call 'exit' then the job will fail in an ugly way, instead just
577 * wait for the Global coordinator to terminate us.
578 */
579 while(1) {
580 opal_progress();
581 sleep(1);
582 }
583 }
584
585 if( NULL != current_options ) {
586 OBJ_RELEASE(current_options);
587 current_options = NULL;
588 }
589
590 currently_checkpointing = false;
591
592 return exit_status;
593 }
594
/**
 * Stage 1 of the checkpoint response.
 *
 * Resets the C/R timers, opens the named pipes, performs the initial
 * handshake, registers the received SStore handle and allocates the local
 * snapshot object, whose directory and metadata file name are fetched from
 * SStore. The migration status is reported (verbose) on every exit path.
 *
 * @param resp  Response state forwarded to the initial handshake.
 *
 * @return ORTE_SUCCESS on success, otherwise the error code of the step
 *         that failed.
 */
static int app_notify_resp_stage_1(opal_cr_ckpt_cmd_state_t resp)
{
    int rc;
    int status = ORTE_SUCCESS;

    OPAL_CR_CLEAR_TIMERS();
    opal_cr_timing_my_rank = ORTE_PROC_MY_NAME->vpid;
    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY0);

    /* Bring up the named pipes */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Open Communication Channels."));
    rc = snapc_full_app_notify_reopen_files();
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto report;
    }

    /* Exchange the options and the response over the pipes */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Initial Handshake."));
    rc = snapc_full_app_ckpt_handshake_start(resp);
    if( ORTE_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto report;
    }

    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY1);

    /* Attach to the storage handle received during the handshake */
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Register with SStore..."));
    rc = orte_sstore.register_handle(current_ss_handle);
    if( OPAL_SUCCESS != rc ) {
        ORTE_ERROR_LOG(rc);
        status = rc;
        goto report;
    }

    local_snapshot = OBJ_NEW(opal_crs_base_snapshot_t);

    /* A migration is running but this process is not part of it */
    if( currently_all_migrating && !currently_migrating ) {
        orte_sstore.set_attr(current_ss_handle,
                             SSTORE_METADATA_LOCAL_SKIP_CKPT,
                             "1");
    }

    orte_sstore.get_attr(current_ss_handle,
                         SSTORE_METADATA_LOCAL_SNAP_LOC,
                         &(local_snapshot->snapshot_directory));
    orte_sstore.get_attr(current_ss_handle,
                         SSTORE_METADATA_LOCAL_SNAP_META,
                         &(local_snapshot->metadata_filename));

    OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY2);

    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Start checkpoint... (%d)", (int)current_ss_handle));

 report:
    OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
                         "App) notify_response: Are we migrating [%5s]. Am I migrating [%5s]",
                         (currently_all_migrating ? "True" : "False"),
                         (currently_migrating     ? "True" : "False") ));

    return status;
}
666
app_notify_resp_inc_prep_only(int cr_state)667 static int app_notify_resp_inc_prep_only(int cr_state)
668 {
669 int ret, exit_status = ORTE_SUCCESS;
670
671 /*
672 * Tell the local coordinator that we are done with the INC prep
673 */
674 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
675 opal_output(mca_snapc_full_component.super.output_handle,
676 "App) notify_response: Error: Unable to write cr_state to named pipe (%s).\n",
677 app_comm_pipe_w);
678 ORTE_ERROR_LOG(ret);
679 exit_status = ret;
680 goto cleanup;
681 }
682
683 app_notif_processed = true;
684
685 cleanup:
686 return exit_status;
687 }
688
app_notify_resp_stage_2(int cr_state)689 static int app_notify_resp_stage_2(int cr_state )
690 {
691 int ret;
692
693 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY3);
694
695 /*
696 * Sync SStore
697 * If we stopped the process, then we already did this
698 */
699 if( !(current_options->stop) ) {
700 if( currently_migrating || !currently_all_migrating ) {
701 orte_sstore.set_attr(current_ss_handle,
702 SSTORE_METADATA_LOCAL_CRS_COMP,
703 local_snapshot->component_name);
704 }
705
706 orte_sstore.sync(current_ss_handle);
707 }
708 last_ss_handle = current_ss_handle;
709 current_ss_handle = 0;
710
711 /*
712 * Final Handshake
713 */
714 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
715 "App) notify_response: Waiting for final handshake."));
716 if( ORTE_SUCCESS != (ret = snapc_full_app_ckpt_handshake_end(cr_state ) ) ) {
717 ORTE_ERROR_LOG(ret);
718 return ret;
719 }
720
721 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
722 "App) notify_response: Final Handshake complete."));
723
724 return ORTE_SUCCESS;
725 }
726
app_define_pipe_names(void)727 static int app_define_pipe_names(void)
728 {
729 if( NULL != app_comm_pipe_r ) {
730 free(app_comm_pipe_r);
731 app_comm_pipe_r = NULL;
732 }
733
734 if( NULL != app_comm_pipe_w ) {
735 free(app_comm_pipe_w);
736 app_comm_pipe_w = NULL;
737 }
738
739 asprintf(&app_comm_pipe_r, "%s/%s.%d_%d",
740 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_R,
741 (int)getpid(), current_unique_id);
742 asprintf(&app_comm_pipe_w, "%s/%s.%d_%d",
743 opal_cr_pipe_dir, OPAL_CR_NAMED_PROG_W,
744 (int)getpid(), current_unique_id);
745
746 ++current_unique_id;
747
748 return ORTE_SUCCESS;
749 }
750
app_notify_resp_stage_3(int cr_state,bool skip_fin_msg)751 static int app_notify_resp_stage_3(int cr_state, bool skip_fin_msg)
752 {
753 /*
754 * Send a message to the local daemon letting it know that we are done
755 */
756 if( !skip_fin_msg ) {
757 snapc_full_app_finished_msg(cr_state);
758 }
759
760 /*
761 * Close and cleanup pipes
762 */
763 if( 0 <= app_comm_pipe_r_fd ) {
764 close(app_comm_pipe_r_fd);
765 app_comm_pipe_r_fd = -1;
766 }
767 if( 0 <= app_comm_pipe_w_fd ) {
768 close(app_comm_pipe_w_fd);
769 app_comm_pipe_w_fd = -1;
770 }
771
772 remove(app_comm_pipe_r);
773 remove(app_comm_pipe_w);
774
775 app_comm_pipe_r_fd = -1;
776 app_comm_pipe_w_fd = -1;
777
778 if( OPAL_CRS_RESTART == cr_state ) {
779 current_unique_id = 0;
780 }
781
782 app_define_pipe_names();
783
784 /* Prepare to wait for another checkpoint action */
785 opal_cr_checkpointing_state = OPAL_CR_STATUS_NONE;
786 opal_cr_currently_stalled = false;
787
788 currently_all_migrating = false;
789 currently_migrating = false;
790
791 OPAL_CR_SET_TIMER(OPAL_CR_TIMER_ENTRY4);
792 if(OPAL_CRS_RESTART != cr_state) {
793 OPAL_CR_DISPLAY_ALL_TIMERS();
794 }
795
796 return ORTE_SUCCESS;
797 }
798
snapc_full_app_finished_msg(int cr_state)799 static int snapc_full_app_finished_msg(int cr_state) {
800 int ret, exit_status = ORTE_SUCCESS;
801 opal_buffer_t *buffer = NULL;
802 orte_snapc_cmd_flag_t command = ORTE_SNAPC_LOCAL_FINISH_CMD;
803
804 buffer = OBJ_NEW(opal_buffer_t);
805
806 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_CMD))) {
807 ORTE_ERROR_LOG(ret);
808 exit_status = ret;
809 goto cleanup;
810 }
811
812 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &cr_state, 1, OPAL_INT))) {
813 ORTE_ERROR_LOG(ret);
814 exit_status = ret;
815 goto cleanup;
816 }
817
818 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, buffer,
819 ORTE_RML_TAG_SNAPC,
820 orte_rml_send_callback, 0))) {
821 ORTE_ERROR_LOG(ret);
822 exit_status = ret;
823 goto cleanup;
824 }
825
826 return ORTE_SUCCESS;
827 cleanup:
828 OBJ_RELEASE(buffer);
829
830 return exit_status;
831 }
832
snapc_full_app_notify_reopen_files(void)833 static int snapc_full_app_notify_reopen_files(void)
834 {
835 int ret = OPAL_ERR_NOT_IMPLEMENTED;
836
837 #ifndef HAVE_MKFIFO
838 return ret;
839 #else
840 /*
841 * Open up the read pipe
842 */
843 if( (ret = mkfifo(app_comm_pipe_r, 0660)) < 0) {
844 if(EEXIST == ret || -1 == ret ) {
845 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
846 "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
847 app_comm_pipe_r, ret));
848 }
849 else {
850 opal_output(mca_snapc_full_component.super.output_handle,
851 "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
852 app_comm_pipe_r, ret);
853 return ORTE_ERROR;
854 }
855 }
856
857 app_comm_pipe_r_fd = open(app_comm_pipe_r, O_RDWR);
858 if(app_comm_pipe_r_fd < 0) {
859 opal_output(mca_snapc_full_component.super.output_handle,
860 "App) init: Error: open failed to open the named pipe (%s). %d\n",
861 app_comm_pipe_r, app_comm_pipe_r_fd);
862 return ORTE_ERROR;
863 }
864
865 /*
866 * Open up the write pipe
867 */
868 if( (ret = mkfifo(app_comm_pipe_w, 0660)) < 0) {
869 if(EEXIST == ret || -1 == ret ) {
870 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
871 "App) notify_reopen_files: mkfifo failed because file (%s) already exists, attempting to use this pipe. (%d)",
872 app_comm_pipe_w, ret));
873 }
874 else {
875 opal_output(mca_snapc_full_component.super.output_handle,
876 "App) notify_reopen_files: Error: mkfifo failed to make named pipe (%s). (%d)\n",
877 app_comm_pipe_w, ret);
878 return ORTE_ERROR;
879 }
880 }
881
882 app_comm_pipe_w_fd = open(app_comm_pipe_w, O_WRONLY);
883 if(app_comm_pipe_w_fd < 0) {
884 opal_output(mca_snapc_full_component.super.output_handle,
885 "App) notify_reopen_files: Error: open failed to open the named pipe (%s). (%d)\n",
886 app_comm_pipe_w, app_comm_pipe_w_fd);
887 return ORTE_ERROR;
888 }
889
890 return ORTE_SUCCESS;
891 #endif /* HAVE_MKFIFO */
892 }
893
snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp)894 static int snapc_full_app_ckpt_handshake_start(opal_cr_ckpt_cmd_state_t resp)
895 {
896 int ret, exit_status = ORTE_SUCCESS;
897 int tmp_resp, opt_rep;
898
899 /*
900 * Get the initial handshake command:
901 * - Migrating option [all, me]
902 * - Term argument
903 * - Stop argument
904 */
905 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
906 opal_output(mca_snapc_full_component.super.output_handle,
907 "App) notify_response: Error: Unable to read the all_migrating option from named pipe (%s). %d\n",
908 app_comm_pipe_r, ret);
909 ORTE_ERROR_LOG(ret);
910 goto cleanup;
911 }
912 currently_all_migrating = OPAL_INT_TO_BOOL(opt_rep);
913
914 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
915 opal_output(mca_snapc_full_component.super.output_handle,
916 "App) notify_response: Error: Unable to read the migrating option from named pipe (%s). %d\n",
917 app_comm_pipe_r, ret);
918 ORTE_ERROR_LOG(ret);
919 goto cleanup;
920 }
921 currently_migrating = OPAL_INT_TO_BOOL(opt_rep);
922
923 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
924 opal_output(mca_snapc_full_component.super.output_handle,
925 "App) notify_response: Error: Unable to read the 'term' from named pipe (%s). %d\n",
926 app_comm_pipe_r, ret);
927 ORTE_ERROR_LOG(ret);
928 goto cleanup;
929 }
930 current_options->term = OPAL_INT_TO_BOOL(opt_rep);
931
932 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
933 opal_output(mca_snapc_full_component.super.output_handle,
934 "App) notify_response: Error: Unable to read the 'stop' from named pipe (%s). %d\n",
935 app_comm_pipe_r, ret);
936 ORTE_ERROR_LOG(ret);
937 goto cleanup;
938 }
939 current_options->stop = OPAL_INT_TO_BOOL(opt_rep);
940
941 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
942 opal_output(mca_snapc_full_component.super.output_handle,
943 "App) notify_response: Error: Unable to read the 'inc_prep_only' from named pipe (%s). %d\n",
944 app_comm_pipe_r, ret);
945 ORTE_ERROR_LOG(ret);
946 goto cleanup;
947 }
948 current_options->inc_prep_only = OPAL_INT_TO_BOOL(opt_rep);
949
950 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
951 opal_output(mca_snapc_full_component.super.output_handle,
952 "App) notify_response: Error: Unable to read the 'inc_recover_only' from named pipe (%s). %d\n",
953 app_comm_pipe_r, ret);
954 ORTE_ERROR_LOG(ret);
955 goto cleanup;
956 }
957 current_options->inc_recover_only = OPAL_INT_TO_BOOL(opt_rep);
958
959 #if OPAL_ENABLE_CRDEBUG == 1
960 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
961 opal_output(mca_snapc_full_component.super.output_handle,
962 "App) notify_response: Error: Unable to read the 'attach_debugger' from named pipe (%s). %d\n",
963 app_comm_pipe_r, ret);
964 ORTE_ERROR_LOG(ret);
965 goto cleanup;
966 }
967 current_options->attach_debugger = OPAL_INT_TO_BOOL(opt_rep);
968
969 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
970 opal_output(mca_snapc_full_component.super.output_handle,
971 "App) notify_response: Error: Unable to read the 'detach_debugger' from named pipe (%s). %d\n",
972 app_comm_pipe_r, ret);
973 ORTE_ERROR_LOG(ret);
974 goto cleanup;
975 }
976 current_options->detach_debugger = OPAL_INT_TO_BOOL(opt_rep);
977 #endif
978
979 /*
980 * Get SStore Handle
981 */
982 if( sizeof(orte_sstore_base_handle_t) != (ret = read(app_comm_pipe_r_fd, ¤t_ss_handle, sizeof(orte_sstore_base_handle_t))) ) {
983 opal_output(mca_snapc_full_component.super.output_handle,
984 "App) notify_response: Error: Unable to read the sstore handle from named pipe (%s). %d\n",
985 app_comm_pipe_r, ret);
986 ORTE_ERROR_LOG(ret);
987 goto cleanup;
988 }
989
990 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
991 "App) %s Received Options... Responding with %d\n",
992 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), (int)resp));
993
994 /*
995 * Write back the response to the request (message printed below)
996 */
997 tmp_resp = (int)resp;
998 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &tmp_resp, sizeof(int)) ) ) {
999 opal_output(mca_snapc_full_component.super.output_handle,
1000 "App) notify_response: %d: Error: Unable to write to pipe (%s) ret = %d [Line %d]\n",
1001 tmp_resp, app_comm_pipe_w, ret, __LINE__);
1002 ORTE_ERROR_LOG(ret);
1003 goto cleanup;
1004 }
1005
1006 /*
1007 * Respond that the checkpoint is currently in progress
1008 */
1009 if( OPAL_CHECKPOINT_CMD_IN_PROGRESS == resp ) {
1010 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1011 "App) notify_response: Checkpoint in progress, cannot start (%d)",
1012 getpid()));
1013 ORTE_ERROR_LOG(ret);
1014 goto cleanup;
1015 }
1016 /*
1017 * Respond that the application is unable to be checkpointed
1018 */
1019 else if( OPAL_CHECKPOINT_CMD_NULL == resp ) {
1020 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1021 "App) notify_response: Non-checkpointable application, cannot start (%d)",
1022 getpid()));
1023 ORTE_ERROR_LOG(ret);
1024 goto cleanup;
1025 }
1026 /*
1027 * Respond that some error has occurred such that the application is
1028 * not able to be checkpointed
1029 */
1030 else if( OPAL_CHECKPOINT_CMD_ERROR == resp ) {
1031 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1032 "App) notify_response: Error generated, cannot start (%d)",
1033 getpid()));
1034 ORTE_ERROR_LOG(ret);
1035 goto cleanup;
1036 }
1037
1038 /*
     * Respond signaling that we wish to respond to this request
1040 */
1041 OPAL_OUTPUT_VERBOSE((10, mca_snapc_full_component.super.output_handle,
1042 "App) notify_response: Starting checkpoint request (%d)",
1043 getpid()));
1044
1045 /*
1046 * Get the sentinel value indicating that we can start now
1047 * JJH: Check for an error here indicating that even though this process is
1048 * OK to checkpoint others might not be in which case we should cleanup
1049 * properly.
1050 */
1051 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &opt_rep, sizeof(int))) ) {
1052 opal_output(mca_snapc_full_component.super.output_handle,
1053 "App) notify_response: Error: Unable to read from named pipe (%s). %d\n",
1054 app_comm_pipe_r, ret);
1055 ORTE_ERROR_LOG(ret);
1056 goto cleanup;
1057 }
1058
1059 cleanup:
1060 return exit_status;
1061 }
1062
snapc_full_app_ckpt_handshake_end(int cr_state)1063 static int snapc_full_app_ckpt_handshake_end(int cr_state)
1064 {
1065 int ret, exit_status = ORTE_SUCCESS;
1066 int last_cmd = 0;
1067 int err;
1068
1069 /*
1070 * Return the final checkpoint state to the local coordinator
1071 */
1072 if( sizeof(int) != (ret = write(app_comm_pipe_w_fd, &cr_state, sizeof(int))) ) {
1073 err = errno;
1074 opal_output(mca_snapc_full_component.super.output_handle,
1075 "App) notify_response: Error: Unable to write cr_state to named pipe (%s). %d/%d/%s\n",
1076 app_comm_pipe_w, ret, err, strerror(err));
1077 ORTE_ERROR_LOG(ret);
1078 exit_status = ret;
1079 goto cleanup;
1080 }
1081
1082 if( currently_all_migrating && currently_migrating ) {
1083 app_notify_resp_stage_3(cr_state, true);
1084 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1085 "App) handshake_end: Waiting for termination (%d)",
1086 getpid()));
1087 /* Wait here for termination, do not terminate ourselves.
1088 * JJH: We cannot terminate ourselves without killing the job...
1089 */
1090 while(1) {
1091 opal_progress();
1092 sleep(1);
1093 }
1094 }
1095
1096 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1097 "App) handshake_end: Waiting for release (%d)",
1098 getpid()));
1099
1100 /*
1101 * Wait for the local coordinator to release us
1102 */
1103 if( sizeof(int) != (ret = read(app_comm_pipe_r_fd, &last_cmd, sizeof(int))) ) {
1104 opal_output(mca_snapc_full_component.super.output_handle,
1105 "App) notify_response: Error: Unable to read the 'last_cmd' from named pipe (%s). %d\n",
1106 app_comm_pipe_r, ret);
1107 ORTE_ERROR_LOG(ret);
1108 exit_status = ret;
1109 goto cleanup;
1110 }
1111
1112 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1113 "App) handshake_end: Released... (%d)",
1114 getpid()));
1115
1116 cleanup:
1117 return exit_status;
1118 }
1119
app_coord_ft_event(int state)1120 int app_coord_ft_event(int state) {
1121 int ret, exit_status = ORTE_SUCCESS;
1122
1123 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1124 "App) In ft_event(%d)", state));
1125
1126 /******** Checkpoint Prep ********/
1127 if(OPAL_CRS_CHECKPOINT == state) {
1128 /*
1129 * Record the job session directory
1130 * This way we will recreate it on restart so that any components that
1131 * have old references to it (like btl/sm) can reference their files
1132 * (to close the fd's to them) on restart. We will remove it before we
1133 * create the new session directory.
1134 */
1135 orte_sstore.set_attr(orte_sstore_handle_current,
1136 SSTORE_METADATA_LOCAL_MKDIR,
1137 orte_process_info.job_session_dir);
1138
1139 /*
1140 * If stopping then sync early
1141 */
1142 if( current_options->stop ) {
1143 orte_sstore.set_attr(current_ss_handle,
1144 SSTORE_METADATA_LOCAL_CRS_COMP,
1145 opal_crs_base_selected_component.base_version.mca_component_name);
1146
1147 orte_sstore.sync(current_ss_handle);
1148 }
1149 }
1150 /******** Continue Recovery ********/
1151 else if (OPAL_CRS_CONTINUE == state ) {
1152 #if OPAL_ENABLE_CRDEBUG == 1
1153 /*
1154 * Send PID to HNP/daemon if debugging as an indicator that we have
1155 * finished the checkpoint operation.
1156 */
1157 if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1158 ORTE_ERROR_LOG(ret);
1159 exit_status = ret;
1160 goto cleanup;
1161 }
1162 #endif
1163 ; /* Nothing */
1164 }
1165 /******** Restart Pre-Recovery ********/
1166 else if (OPAL_CRS_RESTART_PRE == state ) {
1167 ; /* Nothing */
1168 }
1169 /******** Restart Recovery ********/
1170 else if (OPAL_CRS_RESTART == state ) {
1171 OPAL_OUTPUT_VERBOSE((20, mca_snapc_full_component.super.output_handle,
1172 "App) Initalized for Application %s (Restart) (%5d)\n",
1173 ORTE_NAME_PRINT(ORTE_PROC_MY_NAME), getpid()));
1174
1175 /*
1176 * Send new PID to HNP/daemon
1177 * The checkpointer could have used a proxy program to boot us
1178 * so the pid that the orted got from fork() may not be the
1179 * PID of this application.
1180 * - Note: BLCR does this because it tries to preseve the PID
1181 * of the program across checkpointes
1182 */
1183 if( ORTE_SUCCESS != (ret = snapc_full_app_ft_event_update_process_info(orte_process_info.my_name, getpid())) ) {
1184 ORTE_ERROR_LOG(ret);
1185 exit_status = ret;
1186 goto cleanup;
1187 }
1188
1189 /*
1190 * JJH: Optionally the non-migrating processes can wait here in stage_2
1191 * JJH: This will delay the initial checkpoint, but potentially speed up
1192 * JJH: restart.
1193 */
1194 }
1195 /******** Termination ********/
1196 else if (OPAL_CRS_TERM == state ) {
1197 ; /* Nothing */
1198 }
1199 /******** Error State ********/
1200 else {
1201 ; /* Nothing */
1202 }
1203
1204 cleanup:
1205 return exit_status;
1206 }
1207
/*
 * Send a "local update" message carrying a process name and PID to this
 * process' daemon.
 *
 * The message is packed as: ORTE_SNAPC_LOCAL_UPDATE_CMD, 'proc',
 * 'proc_pid' and, with OPAL_ENABLE_CRDEBUG, MPIR_debug_with_checkpoint.
 * It is then posted with a non-blocking send on ORTE_RML_TAG_SNAPC.
 *
 * The buffer is released here only when a step fails; after a successful
 * post this function does not touch it again.
 *
 * Returns ORTE_SUCCESS, or the error code of the first failing pack/send.
 */
static int snapc_full_app_ft_event_update_process_info(orte_process_name_t proc, pid_t proc_pid)
{
    int rc;
    orte_snapc_cmd_flag_t cmd = ORTE_SNAPC_LOCAL_UPDATE_CMD;
    opal_buffer_t *msg = OBJ_NEW(opal_buffer_t);

    rc = opal_dss.pack(msg, &cmd, 1, ORTE_SNAPC_CMD);
    if (ORTE_SUCCESS != rc) {
        goto failed;
    }

    /* JJH CLEANUP: Do we really need this, it is equal to sender */
    rc = opal_dss.pack(msg, &proc, 1, ORTE_NAME);
    if (ORTE_SUCCESS != rc) {
        goto failed;
    }

    rc = opal_dss.pack(msg, &proc_pid, 1, OPAL_PID);
    if (ORTE_SUCCESS != rc) {
        goto failed;
    }

#if OPAL_ENABLE_CRDEBUG == 1
    rc = opal_dss.pack(msg, &MPIR_debug_with_checkpoint, 1, OPAL_BOOL);
    if (ORTE_SUCCESS != rc) {
        goto failed;
    }
#endif

    rc = orte_rml.send_buffer_nb(ORTE_PROC_MY_DAEMON, msg,
                                 ORTE_RML_TAG_SNAPC,
                                 orte_rml_send_callback, 0);
    if (ORTE_SUCCESS != rc) {
        goto failed;
    }

    return ORTE_SUCCESS;

 failed:
    /* Shared failure exit: log once, drop our buffer reference, report */
    ORTE_ERROR_LOG(rc);
    OBJ_RELEASE(msg);

    return rc;
}
1257
app_coord_request_op(orte_snapc_base_request_op_t * datum)1258 int app_coord_request_op(orte_snapc_base_request_op_t *datum)
1259 {
1260 int ret, exit_status = ORTE_SUCCESS;
1261 orte_snapc_full_cmd_flag_t command = ORTE_SNAPC_FULL_REQUEST_OP_CMD;
1262 opal_buffer_t *buffer = NULL;
1263 orte_std_cntr_t count;
1264 orte_rml_recv_cb_t *rb = NULL;
1265 int op_event, op_state;
1266 char *seq_str = NULL, *tmp_str = NULL;
1267 int cr_state = OPAL_CRS_CONTINUE;
1268 int app_pid, i;
1269
1270 /*
1271 * Quiesce_end recovers the library before talking to the Global coord.
1272 */
1273 if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1274 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1275 "App) Quiesce_end: Recovering the stack..."));
1276
1277 /*
1278 * INC: Recover the stack
1279 */
1280 if( NULL == local_snapshot->component_name ) {
1281 local_snapshot->component_name = strdup("");
1282 }
1283 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_2(cr_state) ) ) {
1284 exit_status = ret;
1285 ORTE_ERROR_LOG(ret);
1286 goto cleanup;
1287 }
1288
1289 if(OPAL_SUCCESS != (ret = opal_cr_inc_core_recover(cr_state) ) ) {
1290 exit_status = ret;
1291 ORTE_ERROR_LOG(ret);
1292 goto cleanup;
1293 }
1294
1295 if( ORTE_SUCCESS != (ret = app_notify_resp_stage_3(cr_state, false) )) {
1296 exit_status = ret;
1297 ORTE_ERROR_LOG(ret);
1298 goto cleanup;
1299 }
1300
1301 currently_checkpointing = false;
1302 app_notif_processed = false;
1303
1304 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1305 "App) Quiesce_end: Recovered."));
1306 }
1307 else if( ORTE_SNAPC_OP_QUIESCE_CHECKPOINT == datum->event) {
1308 app_pid = getpid();
1309 cr_state = OPAL_CRS_RUNNING;
1310 if( OPAL_SUCCESS != (ret = opal_cr_inc_core_ckpt(app_pid, local_snapshot, current_options, &cr_state)) ) {
1311 ORTE_ERROR_LOG(ret);
1312 exit_status = ret;
1313 }
1314
1315 if( OPAL_CRS_RESTART != cr_state ) {
1316 orte_sstore.sync(current_ss_handle);
1317 }
1318
1319 orte_sstore.get_attr(current_ss_handle,
1320 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1321 &seq_str);
1322 if( NULL != seq_str ) {
1323 datum->seq_num = atoi(seq_str);
1324 } else {
1325 datum->seq_num = -1;
1326 }
1327
1328 orte_sstore.get_attr(current_ss_handle,
1329 SSTORE_METADATA_GLOBAL_SNAP_REF,
1330 &(datum->global_handle));
1331 if( NULL == datum->global_handle ) {
1332 datum->global_handle = strdup("Unknown");
1333 }
1334
1335 return exit_status;
1336 }
1337
1338 /*
1339 * Leader: Send the info to the head node
1340 */
1341 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1342 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1343 "App) Request_op: Sending request (%3d)...",
1344 datum->event));
1345 /*
1346 * Send request to HNP
1347 */
1348 buffer = OBJ_NEW(opal_buffer_t);
1349
1350 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &command, 1, ORTE_SNAPC_FULL_CMD))) {
1351 ORTE_ERROR_LOG(ret);
1352 exit_status = ret;
1353 goto cleanup;
1354 }
1355 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(ORTE_PROC_MY_NAME->jobid), 1, ORTE_JOBID))) {
1356 ORTE_ERROR_LOG(ret);
1357 exit_status = ret;
1358 goto cleanup;
1359 }
1360
1361 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->event), 1, OPAL_INT))) {
1362 ORTE_ERROR_LOG(ret);
1363 exit_status = ret;
1364 goto cleanup;
1365 }
1366
1367 if( ORTE_SNAPC_OP_RESTART == datum->event) {
1368 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->seq_num), 1, OPAL_INT))) {
1369 ORTE_ERROR_LOG(ret);
1370 exit_status = ret;
1371 goto cleanup;
1372 }
1373 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->global_handle), 1, OPAL_STRING))) {
1374 ORTE_ERROR_LOG(ret);
1375 exit_status = ret;
1376 goto cleanup;
1377 }
1378 }
1379 else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1380 /*
1381 * Check information
1382 * Rank | Hostname | cr_off_node | Meaning
1383 * -------+-----------+--------------+---------
1384 * self | home/same | false | Do not move this process
1385 * | | true | ERROR
1386 * | NULL | false | Move wherever
1387 * | | true | Move off of this node
1388 * | other | false/true | Move to the 'other' node
1389 * -------+-----------+--------------+---------
1390 * peer | home/same | false | Move 'peer' to me
1391 * | | true | ERROR
1392 * | NULL | false | Move wherever (Default: Move 'peer' to me)
1393 * | | true | Move with peer to some other node
1394 * | other | false/true | Move with peer to 'other' node
1395 * -------+-----------+--------------+---------
1396 * If 'rank' is set to a peer other than self, and the peer sets
1397 * conflicting 'hostname' or 'cr_off_node' preferences, then that
1398 * is an error. In which case the migration should fail.
1399 */
1400 currently_all_migrating = true;
1401
1402 /*
1403 * Send information
1404 */
1405 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &(datum->mig_num), 1, OPAL_INT))) {
1406 ORTE_ERROR_LOG(ret);
1407 exit_status = ret;
1408 goto cleanup;
1409 }
1410
1411 for( i = 0; i < datum->mig_num; ++i ) {
1412 OPAL_OUTPUT_VERBOSE((30, mca_snapc_full_component.super.output_handle,
1413 "App) Migration %3d/%3d: Sending Rank %3d - Requested <%s> (%3d) %c\n",
1414 datum->mig_num, i,
1415 (datum->mig_vpids)[i],
1416 (datum->mig_host_pref)[i],
1417 (datum->mig_vpid_pref)[i],
1418 (OPAL_INT_TO_BOOL((datum->mig_off_node)[i]) ? 'T' : 'F')
1419 ));
1420
1421 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpids)[i]), 1, OPAL_INT))) {
1422 ORTE_ERROR_LOG(ret);
1423 exit_status = ret;
1424 goto cleanup;
1425 }
1426 tmp_str = strdup((datum->mig_host_pref)[i]);
1427 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &tmp_str, 1, OPAL_STRING))) {
1428 ORTE_ERROR_LOG(ret);
1429 exit_status = ret;
1430 goto cleanup;
1431 }
1432 if( NULL != tmp_str ) {
1433 free(tmp_str);
1434 tmp_str = NULL;
1435 }
1436
1437 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_vpid_pref)[i]), 1, OPAL_INT))) {
1438 ORTE_ERROR_LOG(ret);
1439 exit_status = ret;
1440 goto cleanup;
1441 }
1442 if (ORTE_SUCCESS != (ret = opal_dss.pack(buffer, &((datum->mig_off_node)[i]), 1, OPAL_INT))) {
1443 ORTE_ERROR_LOG(ret);
1444 exit_status = ret;
1445 goto cleanup;
1446 }
1447 }
1448 }
1449
1450 if (ORTE_SUCCESS != (ret = orte_rml.send_buffer_nb(ORTE_PROC_MY_HNP, buffer, ORTE_RML_TAG_SNAPC_FULL,
1451 orte_rml_send_callback, 0))) {
1452 ORTE_ERROR_LOG(ret);
1453 exit_status = ret;
1454 goto cleanup;
1455 }
1456 /* buffer should not be released here; the callback releases it */
1457 buffer = NULL;
1458 }
1459
1460 /*
1461 * Wait for the response
1462 */
1463 if( ORTE_SNAPC_OP_CHECKPOINT == datum->event) {
1464 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1465 /*
1466 * Wait for local completion (need to check to see if we are restarting)
1467 */
1468 while(OPAL_CRS_CONTINUE != current_cr_state &&
1469 OPAL_CRS_RESTART != current_cr_state &&
1470 OPAL_CRS_ERROR != current_cr_state ) {
1471 opal_progress();
1472 OPAL_CR_TEST_CHECKPOINT_READY();
1473 }
1474
1475 /* Do not wait for a response if we are restarting (it will never arrive) */
1476 if( OPAL_CRS_RESTART == current_cr_state ) {
1477 orte_sstore.get_attr(current_ss_handle,
1478 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1479 &seq_str);
1480 if( NULL != seq_str ) {
1481 datum->seq_num = atoi(seq_str);
1482 } else {
1483 datum->seq_num = -1;
1484 }
1485
1486 orte_sstore.get_attr(current_ss_handle,
1487 SSTORE_METADATA_GLOBAL_SNAP_REF,
1488 &(datum->global_handle));
1489 if( NULL == datum->global_handle ) {
1490 datum->global_handle = strdup("Unknown");
1491 }
1492
1493 current_cr_state = OPAL_CRS_NONE;
1494
1495 exit_status = ORTE_SUCCESS;
1496 goto cleanup;
1497 }
1498
1499 /*
1500 * Wait for a response regarding completion
1501 */
1502 rb = OBJ_NEW(orte_rml_recv_cb_t);
1503 rb->active = true;
1504 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1505 ORTE_WAIT_FOR_COMPLETION(rb->active);
1506
1507 count = 1;
1508 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1509 ORTE_ERROR_LOG(ret);
1510 exit_status = ret;
1511 goto cleanup;
1512 }
1513
1514 count = 1;
1515 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1516 ORTE_ERROR_LOG(ret);
1517 exit_status = ret;
1518 goto cleanup;
1519 }
1520
1521 count = 1;
1522 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1523 ORTE_ERROR_LOG(ret);
1524 exit_status = ret;
1525 goto cleanup;
1526 }
1527
1528 orte_sstore.get_attr(last_ss_handle,
1529 SSTORE_METADATA_GLOBAL_SNAP_SEQ,
1530 &seq_str);
1531 datum->seq_num = atoi(seq_str);
1532
1533 orte_sstore.get_attr(last_ss_handle,
1534 SSTORE_METADATA_GLOBAL_SNAP_REF,
1535 &(datum->global_handle));
1536 }
1537 }
1538 /*
1539 * Restart will terminate this process, so just wait...
1540 */
1541 else if( ORTE_SNAPC_OP_RESTART == datum->event) {
1542 while( 1 ) {
1543 opal_progress();
1544 OPAL_CR_TEST_CHECKPOINT_READY();
1545 sleep(1);
1546 }
1547 }
1548 /*
1549 * Leader waits for response
1550 */
1551 else if( ORTE_SNAPC_OP_MIGRATE == datum->event) {
1552 if( datum->leader == (int)ORTE_PROC_MY_NAME->vpid ) {
1553 while( currently_all_migrating ) {
1554 opal_progress();
1555 OPAL_CR_TEST_CHECKPOINT_READY();
1556 sleep(1);
1557 }
1558
1559 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1560 "App) Request_op: Leader waiting for Migrate release (%3d)...",
1561 datum->event));
1562
1563
1564 /*
1565 * Wait for a response regarding completion
1566 */
1567 rb = OBJ_NEW(orte_rml_recv_cb_t);
1568 rb->active = true;
1569 orte_rml.recv_buffer_nb(ORTE_PROC_MY_HNP, ORTE_RML_TAG_SNAPC_FULL, 0, orte_rml_recv_callback, rb);
1570 ORTE_WAIT_FOR_COMPLETION(rb->active);
1571
1572 count = 1;
1573 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &command, &count, ORTE_SNAPC_FULL_CMD))) {
1574 ORTE_ERROR_LOG(ret);
1575 exit_status = ret;
1576 goto cleanup;
1577 }
1578
1579 count = 1;
1580 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_event, &count, OPAL_INT))) {
1581 ORTE_ERROR_LOG(ret);
1582 exit_status = ret;
1583 goto cleanup;
1584 }
1585
1586 count = 1;
1587 if (ORTE_SUCCESS != (ret = opal_dss.unpack(&rb->data, &op_state, &count, OPAL_INT))) {
1588 ORTE_ERROR_LOG(ret);
1589 exit_status = ret;
1590 goto cleanup;
1591 }
1592
1593 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1594 "App) Request_op: Leader continuing from Migration (%3d)...",
1595 datum->event));
1596 }
1597 }
1598 /*
1599 * Everyone waits here for completion of Quiesce start
1600 */
1601 else if( ORTE_SNAPC_OP_QUIESCE_START == datum->event) {
1602 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1603 "App) Quiesce_start: Waiting for release..."));
1604
1605 while( !app_notif_processed ) {
1606 opal_progress();
1607 OPAL_CR_TEST_CHECKPOINT_READY();
1608 }
1609
1610 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1611 "App) Quiesce_start: Released"));
1612 }
1613 /*
1614 * No waiting for Quiesce end (barrier occurs in protocol)
1615 */
1616 else if( ORTE_SNAPC_OP_QUIESCE_END == datum->event) {
1617 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1618 "App) Quiesce_end: Waiting for release..."));
1619
1620 OPAL_OUTPUT_VERBOSE((5, mca_snapc_full_component.super.output_handle,
1621 "App) Quiesce_end: Released"));
1622 }
1623
1624
1625 cleanup:
1626 if (NULL != buffer) {
1627 OBJ_RELEASE(buffer);
1628 buffer = NULL;
1629 }
1630 if (NULL != rb) {
1631 OBJ_RELEASE(rb);
1632 rb = NULL;
1633 }
1634
1635 if( NULL != seq_str ) {
1636 free(seq_str);
1637 seq_str = NULL;
1638 }
1639
1640 if( NULL != tmp_str ) {
1641 free(tmp_str);
1642 tmp_str = NULL;
1643 }
1644
1645 return exit_status;
1646 }
1647