1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3 * Copyright (c) 2004-2011 The Trustees of Indiana University.
4 * All rights reserved.
5 * Copyright (c) 2010-2013 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2010-2012 Oracle and/or its affiliates. All rights reserved.
9 * Copyright (c) 2012-2015 Los Alamos National Security, LLC. All rights
10 * reserved.
11 * Copyright (c) 2015 Intel, Inc. All rights reserved.
12 * $COPYRIGHT$
13 *
14 * Additional copyrights may follow
15 *
16 * $HEADER$
17 */
18
19 /*
20 *
21 */
22 #include "ompi_config.h"
23
24 #include <sys/types.h>
25 #ifdef HAVE_UNISTD_H
26 #include <unistd.h>
27 #endif /* HAVE_UNISTD_H */
28
29 #include "opal/dss/dss.h"
30 #include "opal/runtime/opal_cr.h"
31 #include "opal/mca/event/event.h"
32 #include "opal/util/output.h"
33
34 #include "opal/util/opal_environ.h"
35 #include "ompi/mca/mca.h"
36 #include "opal/mca/pmix/pmix.h"
37
38 #include "ompi/request/request.h"
39 #include "ompi/mca/rte/rte.h"
40 #include "ompi/mca/pml/pml.h"
41 #include "ompi/mca/pml/base/base.h"
42 #include "ompi/mca/pml/base/pml_base_request.h"
43 #include "ompi/mca/crcp/crcp.h"
44 #include "ompi/mca/crcp/base/base.h"
45
46 #include "opal/class/opal_free_list.h"
47 #include "ompi/runtime/ompi_cr.h"
48 #include "orte/runtime/orte_wait.h"
49
50 #include "crcp_bkmrk.h"
51 #include "crcp_bkmrk_pml.h"
52
53 /************************************
54 * Locally Global vars
55 ************************************/
56 #define PROBE_ANY_SIZE ((size_t) 0)
57 #define PROBE_ANY_COUNT ((size_t) 0)
58
59 #define PERSIST_MARKER ((int) -1)
60
61 #define RECV_MATCH_RESP_DONE 0
62 #define RECV_MATCH_RESP_MORE 1
63 #define RECV_MATCH_RESP_ERROR 2
64
65 #define INVALID_INT -123456789
66
67 #define FIND_MSG_TRUE 0
68 #define FIND_MSG_FALSE 1
69 #define FIND_MSG_UNKNOWN 2
70
71 /* Pointers to the 'real' PML */
72 static mca_pml_base_component_t *wrapped_pml_component = NULL;
73 static mca_pml_base_module_t *wrapped_pml_module = NULL;
74
75 /* A unique ID for each message signature in the system */
76 static uint64_t message_seq_num = 1;
77 static uint64_t content_ref_seq_num = 1;
78
79 /* The current message being worked on */
80 static uint64_t current_msg_id = 0;
81 static ompi_crcp_bkmrk_pml_message_type_t current_msg_type = 0;
82
83 /* If we need to stall the C/R coordination until the current
84 * operation is complete */
85 static bool stall_for_completion;
86
87 /*
88 * State of the ft_event
89 */
90 static int ft_event_state = OPAL_CRS_RUNNING;
91
92 /*
93 * List of known peers
94 */
95 opal_list_t ompi_crcp_bkmrk_pml_peer_refs;
96
97 /*
98 * MPI_ANY_SOURCE recv lists
99 */
100 opal_list_t unknown_recv_from_list;
101 opal_list_t unknown_persist_recv_list;
102
103 /*
104 * List of pending drain acks
105 */
106 opal_list_t drained_msg_ack_list;
107
108 /*
109 * Free lists
110 */
111 opal_free_list_t coord_state_free_list;
112 opal_free_list_t content_ref_free_list;
113 opal_free_list_t peer_ref_free_list;
114 opal_free_list_t traffic_msg_ref_free_list;
115 opal_free_list_t drain_msg_ref_free_list;
116 opal_free_list_t drain_ack_msg_ref_free_list;
117
118 /*
119 * Quiescence requests to wait on
120 */
121 ompi_request_t ** quiesce_requests = NULL;
122 ompi_status_public_t ** quiesce_statuses = NULL;
123 int quiesce_request_count = 0;
124
125 /************************************
126 * Local Function Decls.
127 ************************************/
128
129 static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request);
130 static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request);
131 static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain);
132
133 static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
134 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
135 int src, int tag, int tmp_ddt_size);
136 static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
137 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
138 int src, int tag, int tmp_ddt_size);
139 static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
140 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
141 int src, int tag, int tmp_ddt_size);
142 static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
143 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
144 int src, int tag, int tmp_ddt_size);
145
146 /*
147 * Traffic Message: Append
148 */
149 static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
150 opal_list_t * append_list,
151 ompi_crcp_bkmrk_pml_message_type_t msg_type,
152 size_t count,
153 ompi_datatype_t *datatype,
154 size_t ddt_size,
155 int tag,
156 int dest,
157 struct ompi_communicator_t* comm,
158 ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref);
159
160 /*
161 * Traffic Message: Start a persistent send/recv
162 */
163 static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
164 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
165 ompi_request_t **request,
166 opal_list_t * peer_list,
167 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
168
169 /*
170 * Traffic Message: Move a message from one list to another
171 * - useful when moving messages from the unknown lists
172 */
173 static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
174 ompi_crcp_bkmrk_pml_message_type_t msg_type,
175 ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
176 opal_list_t * from_list,
177 ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
178 opal_list_t * to_list,
179 ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
180 bool keep_active, /* If you have to create a new context, should it be initialized to active? */
181 bool remove); /* Remove the original? - false = copy() */
182
183 /*
184 * Traffic Message: Strip off the first matching request
185 */
186 static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
187 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
188 bool remove,
189 bool already_drained);
190
191 /*
192 * Traffic Message: Find a persistent message, and mark it appropriately
193 */
194 static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
195 ompi_request_t **request,
196 bool cur_active,
197 bool set_is_active,
198 ompi_crcp_bkmrk_pml_message_content_ref_t **content_ref);
199
200 /*
201 * Traffic Message: Find a message that matches the given signature
202 */
203 static int traffic_message_find(opal_list_t * search_list,
204 size_t count, int tag, int peer, uint32_t comm_id,
205 size_t ddt_size,
206 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
207 int active);
208
209 /*
210 * Traffic Message: Determine if we have received a message matching this signature.
211 * Return a reference to the message on all matching lists.
212 */
213 static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
214 int rank, uint32_t comm_id, int tag,
215 size_t count, size_t datatype_size,
216 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
217 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
218 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
219 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
220 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref);
221
222 /*
223 * Traffic Message: For all of the 'active' recvs, create a drain message
224 */
225 static int traffic_message_create_drain_message(bool post_drain,
226 int max_post,
227 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
228 ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
229 int *num_posted);
230
231 /*
232 * Drain Message: Append
233 */
234 static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
235 ompi_crcp_bkmrk_pml_message_type_t msg_type,
236 size_t count, size_t ddt_size,
237 int tag,int dest,
238 struct ompi_communicator_t* comm,
239 ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref);
240
241 /*
242 * Drain Message: Remove
243 */
244 static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
245 ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
246 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
247
248 /*
249 * Drain Message: Check if this receive has been drained
250 */
251 static int drain_message_check_recv(void **buf, size_t count,
252 ompi_datatype_t *datatype,
253 int *src, int *tag,
254 struct ompi_communicator_t* comm,
255 struct ompi_request_t **request,
256 ompi_status_public_t** status,
257 bool *found_drain);
258
259 /*
260 * Drain Message: Find a message matching the given signature on this peer list
261 */
262 static int drain_message_find(opal_list_t * search_list,
263 size_t count, int tag, int peer,
264 uint32_t comm_id, size_t ddt_size,
265 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
266 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
267
268 /*
269 * Drain Message: Find a message matching the given signature on any list from any peer
270 */
271 static int drain_message_find_any(size_t count, int tag, int peer,
272 struct ompi_communicator_t* comm, size_t ddt_size,
273 ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
274 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
275 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
276
277 /*
278 * Drain Message: Grab a content reference, do not remove
279 */
280 static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
281 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref);
282
283 /*
284 * Drain Message: Copy this drain message to the signature provided, remove drain message
285 */
286 static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
287 ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref,
288 int *src, int *tag,
289 struct ompi_request_t **request,
290 ompi_status_public_t **status,
291 ompi_datatype_t *datatype, int count, void **buf,
292 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
293
294 /*
295 * Drain Message: Copy this persistent drain message to the signature provided, remove drain message
296 */
297 static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
298 ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
299 ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
300 ompi_request_t *request,
301 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref);
302
303 /*
304 * Peer List: Find the peer reference matching the ORTE process name
305 */
306 static ompi_crcp_bkmrk_pml_peer_ref_t* find_peer(ompi_process_name_t proc);
307
308 /*
309 * Peer List: Find the peer reference matching the index into the communicator
310 */
311 static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
312 ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref);
313
314 /*
315 * Coordinate Peers
316 * - Quiet channels
317 */
318 static int ft_event_coordinate_peers(void);
319
320 /*
321 * Finalize the coordination of peers.
322 * - Mostly cleanup.
323 */
324 static int ft_event_finalize_exchange(void);
325
326 /*
327 * Exchange the bookmarks
328 * - Staggered All-to-All
329 * LAM/MPI used a staggered all-to-all algorithm for bookmark exchange
330 * http://www.lam-mpi.org/papers/lacsi2003/
331 */
332 static int ft_event_exchange_bookmarks(void);
333
334 /*
335 * Send Bookmarks to peer
336 */
337 static int send_bookmarks(int peer_idx);
338
339 /*
340 * Recv Bookmarks from peer
341 */
342 static int recv_bookmarks(int peer_idx);
343
344 /*
345 * Callback to receive the bookmarks from a peer
346 */
347 static void recv_bookmarks_cbfunc(int status,
348 ompi_process_name_t* sender,
349 opal_buffer_t *buffer,
350 ompi_rml_tag_t tag,
351 void* cbdata);
352 static int total_recv_bookmarks = 0;
353
354 /*
355 * Now that we have all the bookmarks, check them to see if we need to
356 * drain any messages.
357 */
358 static int ft_event_check_bookmarks(void);
359
360 /*
361 * Send message details to peer
362 * - matched with recv_msg_details()
363 */
364 static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
365 int total_sent, int total_matched);
366
367 /*
368 * Send a single message reference to a peer.
369 * found_match = true if peer found a message to drain.
370 */
371 static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
372 ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
373 int *num_matches,
374 int *total_found,
375 bool *finished);
376 /*
377 * Recv message details from peer
378 * - matched with send_msg_details()
379 */
380 static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
381 int total_recv, int total_matched);
382
383 /*
384 * Receive a single message reference from a peer.
385 */
386 static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
387 int *rank, uint32_t *comm_id, int *tag,
388 size_t *count, size_t *datatype_size,
389 int *p_num_sent);
390
391 /*
392 * Check the message reference to determine if:
393 * - We have received this message already, or
394 * - We need to post this message
395 */
396 static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
397 int rank, uint32_t comm_id, int tag,
398 size_t count, size_t datatype_size,
399 int p_num_sent,
400 int *num_resolved);
401
402 /*
403 * Respond to peer regarding a received message detail
404 */
405 static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
406 int resp,
407 int num_resolv,
408 int total_found);
409
410 /*
411 * Post the Drain Message Acks
412 * - These are sent once the receiver has finished receiving
413 * all of the messages it needed to drain off the wire.
414 */
415 static int ft_event_post_drain_acks(void);
416
417 /*
418 * Callback to service drain message acks.
419 */
420 static void drain_message_ack_cbfunc(int status,
421 ompi_process_name_t* sender,
422 opal_buffer_t *buffer,
423 ompi_rml_tag_t tag,
424 void* cbdata);
425
426 /*
427 * Post the Drain Messages
428 * - These are irecvs to be completed in any order.
429 */
430 static int ft_event_post_drained(void);
431
432 static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
433 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref);
434
435 /*
436 * Wait for all drained messages and acks to complete.
437 * - Once this is finished, all channels associated
438 * with this process have been drained.
439 */
440 static int ft_event_wait_quiesce(void);
441
442 /*
443 * Wait for all the posted drain messages to finish
444 */
445 static int wait_quiesce_drained(void);
446
447 /*
448 * An optimized local version of waitall.
449 * - Remove some unnecessary logic
450 * - Remove logic to 'free' the request
451 */
452 static int coord_request_wait_all( size_t count,
453 ompi_request_t ** requests,
454 ompi_status_public_t ** statuses);
455
456 /*
457 * An optimized local version of wait.
458 * - Remove some unnecessary logic
459 * - Remove logic to 'free' the request
460 * - Allow it to return if we need to stop waiting
461 */
462 static int coord_request_wait( ompi_request_t * request,
463 ompi_status_public_t * status);
464
465 /*
466 * Wait for all the drain ACKs to be received
467 */
468 static int wait_quiesce_drain_ack(void);
469
470 /************************************
471 * A few timing structures
472 *
473 * CRCP Timing Information
474 * -----------------------
475 * Pi Pj | Variable
476 * ---- -----+----------
477 * exchange_bookmark() | CRCP_TIMER_CKPT_EX_B
478 * -------------> | CRCP_TIMER_CKPT_EX_PEER_S
479 * <------------- | CRCP_TIMER_CKPT_EX_PEER_R
480 * -> wait_for_bk_done() |
481 * -- -- | CRCP_TIMER_CKPT_EX_WAIT
482 * check_bookmarks() |
483 * -- -- | CRCP_TIMER_CKPT_CHECK_B
484 * -> exchange_details (*) |
485 * -------------> | CRCP_TIMER_CKPT_CHECK_PEER_S
486 * <------------- | CRCP_TIMER_CKPT_CHECK_PEER_R
487 * post_drain[ack]() |
488 * -- -- | CRCP_TIMER_CKPT_POST_DRAIN
489 * wait_quiescence() |
490 * -- -- | CRCP_TIMER_CKPT_WAIT_QUI
491 * Finish checkpoint | -- Total Pre-Checkpoint
492 * -- -- | CRCP_TIMER_TOTAL_CKPT
493 * finalize_exchange() | -- Total Continue / Restart
494 * -- -- | CRCP_TIMER_TOTAL_CONT / _RST
495 *-----------------------------+
496 * (*) If needed.
497 *
498 * timing_enabled:
499 * < 0 : Off
500 * 1 : Summary only
501 * 2 : Per Peer messages + Barrier
502 * 3 : Messages from all procs
503 *
504 ************************************/
505 #define CRCP_TIMER_TOTAL_CKPT 0
506 #define CRCP_TIMER_CKPT_EX_B 1
507 #define CRCP_TIMER_CKPT_EX_PEER_S 2
508 #define CRCP_TIMER_CKPT_EX_PEER_R 3
509 #define CRCP_TIMER_CKPT_EX_WAIT 4
510 #define CRCP_TIMER_CKPT_CHECK_B 5
511 #define CRCP_TIMER_CKPT_CHECK_PEER_S 6
512 #define CRCP_TIMER_CKPT_CHECK_PEER_R 7
513 #define CRCP_TIMER_CKPT_POST_DRAIN 8
514 #define CRCP_TIMER_CKPT_WAIT_QUI 9
515 #define CRCP_TIMER_TOTAL_CONT 10
516 #define CRCP_TIMER_TOTAL_RST 11
517 #define CRCP_TIMER_MAX 12
518
519 static double get_time(void);
520 static void start_time(int idx);
521 static void end_time(int idx);
522 static void display_indv_timer(int idx, int proc, int msgs);
523 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct);
524 static void display_all_timers(int state);
525 static void clear_timers(void);
526
527 double timer_start[CRCP_TIMER_MAX];
528 double timer_end[CRCP_TIMER_MAX];
529 char * timer_label[CRCP_TIMER_MAX];
530
531 #define START_TIMER(idx) \
532 { \
533 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
534 start_time(idx); \
535 } \
536 }
537
538 #define END_TIMER(idx) \
539 { \
540 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
541 end_time(idx); \
542 } \
543 }
544
545 #define DISPLAY_INDV_TIMER(idx, proc, msg) \
546 { \
547 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
548 display_indv_timer(idx, proc, msg); \
549 } \
550 }
551
552 #define DISPLAY_ALL_TIMERS(var) \
553 { \
554 if(OPAL_UNLIKELY(timing_enabled > 0)) { \
555 display_all_timers(var); \
556 } \
557 }
558
559 /************************************
560 * Additional Debugging dumps
561 ************************************/
562 #if OPAL_ENABLE_DEBUG
563 static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only);
564 static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain);
565 static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort);
566 static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref);
567
568 static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort);
569
570 #define TRAFFIC_MSG_DUMP_PEER(lv, a) { \
571 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
572 traffic_message_dump_peer a; \
573 } \
574 }
575 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a) { \
576 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
577 traffic_message_dump_msg_list a; \
578 } \
579 }
580 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a) { \
581 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
582 traffic_message_dump_msg_indv a; \
583 } \
584 }
585 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) { \
586 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
587 traffic_message_dump_msg_content_indv a; \
588 } \
589 }
590 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a) { \
591 if( lv <= mca_crcp_bkmrk_component.super.verbose ) { \
592 traffic_message_dump_drain_msg_indv a; \
593 } \
594 }
595 #else
596 #define TRAFFIC_MSG_DUMP_PEER(lv, a) ;
597 #define TRAFFIC_MSG_DUMP_MSG_LIST(lv, a) ;
598 #define TRAFFIC_MSG_DUMP_MSG_INDV(lv, a) ;
599 #define TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(lv, a) ;
600 #define TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(lv, a) ;
601 #endif
602
603 #define ERROR_SHOULD_NEVER_HAPPEN(msg) { \
604 opal_output(0, msg \
605 " ---------- This should never happen ---------- (%s:%d)", \
606 __FILE__, __LINE__); \
607 }
608
609 #define ERROR_SHOULD_NEVER_HAPPEN_ARG(msg, arg) { \
610 opal_output(0, msg \
611 " ---------- This should never happen ---------- (%s:%d)", \
612 arg, __FILE__, __LINE__); \
613 }
614
615 /************************************
616 * Declare/Define Object Structures
617 ************************************/
618 /*
619 * Free List Maintenance
620 */
621 #define HOKE_PEER_REF_ALLOC(peer_ref) \
622 do { \
623 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t *) \
624 opal_free_list_wait (&peer_ref_free_list); \
625 } while(0)
626
627 #define HOKE_PEER_REF_RETURN(peer_ref) \
628 do { \
629 opal_free_list_return (&peer_ref_free_list, \
630 (opal_free_list_item_t*)peer_ref); \
631 } while(0)
632
633
634 #define HOKE_CONTENT_REF_ALLOC(content_ref) \
635 do { \
636 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*) \
637 opal_free_list_wait (&content_ref_free_list); \
638 content_ref->msg_id = content_ref_seq_num; \
639 content_ref_seq_num++; \
640 } while(0)
641
642 #define HOKE_CONTENT_REF_RETURN(content_ref) \
643 do { \
644 opal_free_list_return (&content_ref_free_list, \
645 (opal_free_list_item_t*)content_ref); \
646 } while(0)
647
648
649 #define HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref) \
650 do { \
651 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*) \
652 opal_free_list_wait (&traffic_msg_ref_free_list); \
653 } while(0)
654
655 #define HOKE_TRAFFIC_MSG_REF_RETURN(msg_ref) \
656 do { \
657 opal_free_list_return (&traffic_msg_ref_free_list, \
658 (opal_free_list_item_t*)msg_ref); \
659 } while(0)
660
661
662 #define HOKE_DRAIN_MSG_REF_ALLOC(msg_ref) \
663 do { \
664 msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t *) \
665 opal_free_list_wait (&drain_msg_ref_free_list); \
666 } while(0)
667
668 #define HOKE_DRAIN_MSG_REF_RETURN(msg_ref) \
669 do { \
670 opal_free_list_return (&drain_msg_ref_free_list, \
671 (opal_free_list_item_t*)msg_ref); \
672 } while(0)
673
674
675 #define HOKE_DRAIN_ACK_MSG_REF_ALLOC(msg_ref) \
676 do { \
677 msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *) \
678 opal_free_list_wait (&drain_ack_msg_ref_free_list); \
679 } while(0)
680
681 #define HOKE_DRAIN_ACK_MSG_REF_RETURN(msg_ref) \
682 do { \
683 opal_free_list_return (&drain_ack_msg_ref_free_list, \
684 (opal_free_list_item_t*)msg_ref); \
685 } while(0)
686
687
688 /*
689 * Peer reference
690 */
691 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_peer_ref_t,
692 opal_list_item_t,
693 ompi_crcp_bkmrk_pml_peer_ref_construct,
694 ompi_crcp_bkmrk_pml_peer_ref_destruct);
695
/*
 * Construct a peer reference: invalidate the process name, create the
 * per-direction message tracking lists, and zero the bookmark counters.
 */
void ompi_crcp_bkmrk_pml_peer_ref_construct(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
    peer_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    peer_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    /* Send-side tracking lists (blocking, non-blocking, persistent) */
    OBJ_CONSTRUCT(&peer_ref->send_list,      opal_list_t);
    OBJ_CONSTRUCT(&peer_ref->isend_list,     opal_list_t);
    OBJ_CONSTRUCT(&peer_ref->send_init_list, opal_list_t);

    /* Receive-side tracking lists (blocking, non-blocking, persistent) */
    OBJ_CONSTRUCT(&peer_ref->recv_list,      opal_list_t);
    OBJ_CONSTRUCT(&peer_ref->irecv_list,     opal_list_t);
    OBJ_CONSTRUCT(&peer_ref->recv_init_list, opal_list_t);

    /* Messages drained off the wire during coordination */
    OBJ_CONSTRUCT(&peer_ref->drained_list, opal_list_t);

    /* Bookmark exchange counters start at zero */
    peer_ref->total_msgs_sent    = 0;
    peer_ref->matched_msgs_sent  = 0;

    peer_ref->total_msgs_recvd   = 0;
    peer_ref->matched_msgs_recvd = 0;

    peer_ref->total_drained_msgs = 0;

    peer_ref->ack_required = false;
}
720
/*
 * Destruct a peer reference: return every tracked message reference to
 * its free list, tear down all lists, and reset the bookmark counters.
 */
void ompi_crcp_bkmrk_pml_peer_ref_destruct( ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref) {
    opal_list_item_t* item = NULL;
    opal_list_t* traffic_lists[6];
    int i;

    peer_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    peer_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    /* All six traffic lists hold traffic message references, so they
     * can be drained uniformly. Order matches the construct order. */
    traffic_lists[0] = &peer_ref->send_list;
    traffic_lists[1] = &peer_ref->isend_list;
    traffic_lists[2] = &peer_ref->send_init_list;
    traffic_lists[3] = &peer_ref->recv_list;
    traffic_lists[4] = &peer_ref->irecv_list;
    traffic_lists[5] = &peer_ref->recv_init_list;

    for( i = 0; i < 6; ++i ) {
        while( NULL != (item = opal_list_remove_first(traffic_lists[i])) ) {
            HOKE_TRAFFIC_MSG_REF_RETURN(item);
        }
        OBJ_DESTRUCT(traffic_lists[i]);
    }

    /* The drained list holds drain message references instead */
    while( NULL != (item = opal_list_remove_first(&peer_ref->drained_list)) ) {
        HOKE_DRAIN_MSG_REF_RETURN(item);
    }
    OBJ_DESTRUCT(&peer_ref->drained_list);

    peer_ref->total_msgs_sent    = 0;
    peer_ref->matched_msgs_sent  = 0;

    peer_ref->total_msgs_recvd   = 0;
    peer_ref->matched_msgs_recvd = 0;

    peer_ref->total_drained_msgs = 0;

    peer_ref->ack_required = false;
}
768
769 /*
770 * Message Content Structure
771 */
772 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_message_content_ref_t,
773 opal_list_item_t,
774 ompi_crcp_bkmrk_pml_message_content_ref_construct,
775 ompi_crcp_bkmrk_pml_message_content_ref_destruct);
776
/*
 * Construct a message content reference: no buffer or request attached
 * yet, all state flags cleared, and no message id assigned (the id is
 * set by HOKE_CONTENT_REF_ALLOC when the reference is handed out).
 */
void ompi_crcp_bkmrk_pml_message_content_ref_construct(ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
{
    content_ref->buffer  = NULL;
    content_ref->request = NULL;

    /* State flags: nothing pending, posted, or drained yet */
    content_ref->active          = false;
    content_ref->done            = false;
    content_ref->already_posted  = false;
    content_ref->already_drained = false;

    content_ref->msg_id = 0;
}
790
/*
 * Destruct a message content reference: release the buffered payload
 * and any outstanding request, then reset all state flags.
 */
void ompi_crcp_bkmrk_pml_message_content_ref_destruct( ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
{
    /* free(NULL) is a no-op, so no guard is needed */
    free(content_ref->buffer);
    content_ref->buffer = NULL;

    if( NULL != content_ref->request ) {
        OBJ_RELEASE(content_ref->request);
        content_ref->request = NULL;
    }

    /* Reset state flags */
    content_ref->active          = false;
    content_ref->done            = false;
    content_ref->already_posted  = false;
    content_ref->already_drained = false;

    content_ref->msg_id = 0;
}
812
813 /*
814 * Traffic Message
815 */
816 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_traffic_message_ref_t,
817 opal_list_item_t,
818 ompi_crcp_bkmrk_pml_traffic_message_ref_construct,
819 ompi_crcp_bkmrk_pml_traffic_message_ref_destruct);
820
/*
 * Construct a traffic message reference: clear the message signature,
 * create the content list, and set the state counters to the sentinel
 * INVALID_INT until CREATE_NEW_MSG() fills them in.
 */
void ompi_crcp_bkmrk_pml_traffic_message_ref_construct(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
    /* Message signature */
    msg_ref->msg_id   = 0;
    msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;

    msg_ref->count    = 0;
    msg_ref->ddt_size = 0;
    msg_ref->tag      = 0;
    msg_ref->rank     = 0;
    msg_ref->comm     = NULL;

    /* Per-request content references */
    OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);

    msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    msg_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    /* State counters are sentinel-invalid until explicitly set */
    msg_ref->matched      = INVALID_INT;
    msg_ref->done         = INVALID_INT;
    msg_ref->active       = INVALID_INT;
    msg_ref->posted       = INVALID_INT;
    msg_ref->active_drain = INVALID_INT;
}
842
/*
 * Destruct a traffic message reference: clear the message signature,
 * return all attached content references to the free list, and reset
 * the state counters to the sentinel INVALID_INT.
 */
void ompi_crcp_bkmrk_pml_traffic_message_ref_destruct( ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref) {
    opal_list_item_t* content_item = NULL;

    /* Reset the message signature */
    msg_ref->msg_id   = 0;
    msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;

    msg_ref->count    = 0;
    msg_ref->ddt_size = 0;
    msg_ref->tag      = 0;
    msg_ref->rank     = 0;
    msg_ref->comm     = NULL;

    /* Return all content references to the free list */
    while( NULL != (content_item = opal_list_remove_first(&(msg_ref->msg_contents))) ) {
        HOKE_CONTENT_REF_RETURN(content_item);
    }
    OBJ_DESTRUCT(&(msg_ref->msg_contents));

    msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    msg_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    msg_ref->matched      = INVALID_INT;
    msg_ref->done         = INVALID_INT;
    msg_ref->active       = INVALID_INT;
    msg_ref->posted       = INVALID_INT;
    msg_ref->active_drain = INVALID_INT;
}
869
870 /*
871 * Drain Message
872 */
873 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ref_t,
874 opal_list_item_t,
875 ompi_crcp_bkmrk_pml_drain_message_ref_construct,
876 ompi_crcp_bkmrk_pml_drain_message_ref_destruct);
877
/*
 * Construct a drain message reference: clear the message signature
 * (including the datatype handle, which drain messages keep so the
 * payload can be unpacked later), create the content list, and set
 * the state counters to the sentinel INVALID_INT.
 */
void ompi_crcp_bkmrk_pml_drain_message_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
    msg_ref->msg_id   = 0;
    msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;

    msg_ref->count    = 0;

    msg_ref->datatype = NULL;
    msg_ref->ddt_size = 0;

    msg_ref->tag  = 0;
    msg_ref->rank = 0;
    msg_ref->comm = NULL;

    /* Per-request content references */
    OBJ_CONSTRUCT(&msg_ref->msg_contents, opal_list_t);

    msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    msg_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    msg_ref->done           = INVALID_INT;
    msg_ref->active         = INVALID_INT;
    msg_ref->already_posted = INVALID_INT;
}
900
/*
 * Destruct a drain message reference: drop the datatype reference,
 * return all attached content references to the free list, and reset
 * the signature and state counters.
 */
void ompi_crcp_bkmrk_pml_drain_message_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref) {
    opal_list_item_t* content_item = NULL;

    msg_ref->msg_id   = 0;
    msg_ref->msg_type = COORD_MSG_TYPE_UNKNOWN;

    msg_ref->count    = 0;

    /* Drop our reference to the datatype, if one was retained */
    if( NULL != msg_ref->datatype ) {
        OBJ_RELEASE(msg_ref->datatype);
        msg_ref->datatype = NULL;
    }
    msg_ref->ddt_size = 0;

    msg_ref->tag  = 0;
    msg_ref->rank = 0;
    msg_ref->comm = NULL;

    /* Return all content references to the free list */
    while( NULL != (content_item = opal_list_remove_first(&(msg_ref->msg_contents))) ) {
        HOKE_CONTENT_REF_RETURN(content_item);
    }
    OBJ_DESTRUCT(&(msg_ref->msg_contents));

    msg_ref->proc_name.jobid = ORTE_JOBID_INVALID;
    msg_ref->proc_name.vpid  = ORTE_VPID_INVALID;

    msg_ref->done           = INVALID_INT;
    msg_ref->active         = INVALID_INT;
    msg_ref->already_posted = INVALID_INT;
}
931
932 /*
933 * Drain Ack Message
934 */
935 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t,
936 opal_list_item_t,
937 ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct,
938 ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct);
939
/*
 * Construct a drain ack reference: not yet complete, peer unknown.
 */
void ompi_crcp_bkmrk_pml_drain_message_ack_ref_construct(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
    msg_ack_ref->complete   = false;

    msg_ack_ref->peer.jobid = ORTE_JOBID_INVALID;
    msg_ack_ref->peer.vpid  = ORTE_VPID_INVALID;
}
946
/*
 * Destruct a drain ack reference: reset to the just-constructed state.
 */
void ompi_crcp_bkmrk_pml_drain_message_ack_ref_destruct( ompi_crcp_bkmrk_pml_drain_message_ack_ref_t *msg_ack_ref) {
    msg_ack_ref->complete   = false;

    msg_ack_ref->peer.jobid = ORTE_JOBID_INVALID;
    msg_ack_ref->peer.vpid  = ORTE_VPID_INVALID;
}
953
954
955 /*
956 * PML state
957 */
958 OBJ_CLASS_INSTANCE(ompi_crcp_bkmrk_pml_state_t,
959 ompi_crcp_base_pml_state_t,
960 NULL,
961 NULL
962 );
963
/************************************
 * Some Macro shortcuts
 ************************************/
/* Take one coordination-state object off the shared free list.
 * opal_free_list_wait() blocks until an item is available. */
#define CRCP_COORD_STATE_ALLOC(state_ref)                      \
do {                                                           \
    state_ref = (ompi_crcp_bkmrk_pml_state_t *)                \
        opal_free_list_wait (&coord_state_free_list);          \
} while(0)

/* Give a coordination-state object back to the free list. */
#define CRCP_COORD_STATE_RETURN(state_ref)                     \
do {                                                           \
    opal_free_list_return (&coord_state_free_list,             \
                           (opal_free_list_item_t *)state_ref); \
} while(0)

/* Allocate a coord state, copy the incoming pml_state into its embedded
 * p_super (so the wrapped PML sees an equivalent state object), remember
 * the original pml_state in prev_ptr for later EXTRACT, and stash the
 * peer/message refs for the POST phase.
 * NOTE(review): plain { } block, not do/while(0) — callers must not use
 * it as a bare statement before an 'else'. */
#define CREATE_COORD_STATE(coord_state, pml_state, v_peer_ref, v_msg_ref)    \
 {                                                                           \
   CRCP_COORD_STATE_ALLOC(coord_state);                                      \
                                                                             \
   coord_state->prev_ptr                  = pml_state;                       \
   coord_state->p_super.super             = pml_state->super;                \
   coord_state->p_super.state             = pml_state->state;                \
   coord_state->p_super.error_code        = pml_state->error_code;           \
   coord_state->p_super.wrapped_pml_component = pml_state->wrapped_pml_component; \
   coord_state->p_super.wrapped_pml_module    = pml_state->wrapped_pml_module;    \
                                                                             \
   coord_state->peer_ref         = v_peer_ref;                               \
   coord_state->msg_ref          = v_msg_ref;                                \
 }

/* Inverse of CREATE_COORD_STATE: recover the coord state from the
 * pml_state pointer handed back by the wrapped PML, and pull out the
 * original state plus the saved peer/message refs. */
#define EXTRACT_COORD_STATE(pml_state, v_coord_state, v_rtn_state, v_peer_ref, v_msg_ref) \
 {                                                                           \
   v_coord_state = (ompi_crcp_bkmrk_pml_state_t*)pml_state;                  \
   v_rtn_state   = v_coord_state->prev_ptr;                                  \
   v_peer_ref    = v_coord_state->peer_ref;                                  \
   v_msg_ref     = v_coord_state->msg_ref;                                   \
 }
1001
1002
/* Allocate and initialize a traffic message reference from the free list.
 * Stamps it with a globally increasing sequence number (message_seq_num,
 * a file-scope counter), records the message signature (type, count,
 * datatype size, tag, rank, communicator) and the peer's process name,
 * and zeroes all bookkeeping counters (matched/done/active/posted/
 * active_drain). */
#define CREATE_NEW_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
 {                                                             \
   HOKE_TRAFFIC_MSG_REF_ALLOC(msg_ref);                        \
                                                               \
   msg_ref->msg_id = message_seq_num;                          \
   message_seq_num++;                                          \
                                                               \
   msg_ref->msg_type = v_type;                                 \
                                                               \
   msg_ref->count = v_count;                                   \
                                                               \
   msg_ref->ddt_size = v_ddt_size;                             \
                                                               \
   msg_ref->tag     = v_tag;                                   \
   msg_ref->rank    = v_rank;                                  \
   msg_ref->comm    = v_comm;                                  \
                                                               \
   msg_ref->proc_name.jobid  = p_jobid;                        \
   msg_ref->proc_name.vpid   = p_vpid;                         \
                                                               \
   msg_ref->matched        = 0;                                \
   msg_ref->done           = 0;                                \
   msg_ref->active         = 0;                                \
   msg_ref->posted         = 0;                                \
   msg_ref->active_drain   = 0;                                \
 }
1029
/* Allocate and initialize a drain message reference from the free list.
 * Same sequence-number stamping and signature fields as CREATE_NEW_MSG,
 * but the datatype pointer starts NULL (only its size is recorded).
 *
 * Bug fix: the body previously assigned 'msg_ref->ddt_size = ddt_size;',
 * silently capturing a caller-local variable named 'ddt_size' instead of
 * the macro parameter 'v_ddt_size'.  That only worked by accident of the
 * call sites' local naming; it now uses the parameter as intended. */
#define CREATE_NEW_DRAIN_MSG(msg_ref, v_type, v_count, v_ddt_size, v_tag, v_rank, v_comm, p_jobid, p_vpid) \
 {                                                             \
   HOKE_DRAIN_MSG_REF_ALLOC(msg_ref);                          \
                                                               \
   msg_ref->msg_id = message_seq_num;                          \
   message_seq_num++;                                          \
                                                               \
   msg_ref->msg_type = v_type;                                 \
                                                               \
   msg_ref->count = v_count;                                   \
                                                               \
   msg_ref->datatype = NULL;                                   \
   msg_ref->ddt_size = v_ddt_size;                             \
                                                               \
   msg_ref->tag     = v_tag;                                   \
   msg_ref->rank    = v_rank;                                  \
   msg_ref->comm    = v_comm;                                  \
                                                               \
   msg_ref->proc_name.jobid  = p_jobid;                        \
   msg_ref->proc_name.vpid   = p_vpid;                         \
 }
1051
1052
/* Pack 'count' items of 'var' (as dss 'type') into 'buffer'.
 * On failure: logs 'error_msg', stores the error in the caller's
 * 'exit_status', and jumps to the caller's 'cleanup' label — so every
 * user must declare 'int ret', 'int exit_status' and a 'cleanup:' label.
 *
 * Consistency fix: opal_dss.pack() returns OPAL error codes, so compare
 * against OPAL_SUCCESS exactly as the sibling UNPACK_BUFFER macro does
 * (the two constants are numerically identical, but the OPAL name states
 * the actual contract). */
#define PACK_BUFFER(buffer, var, count, type, error_msg)                     \
 {                                                                           \
    if (OPAL_SUCCESS != (ret = opal_dss.pack(buffer, &(var), count, type)) ) { \
        opal_output(mca_crcp_bkmrk_component.super.output_handle,            \
                    "%s (Return %d)", error_msg, ret);                       \
        exit_status = ret;                                                   \
        goto cleanup;                                                        \
    }                                                                        \
 }
1062
/* Unpack 'count' items into 'var' (as dss 'type') from 'buffer'.
 * The element count must be passed by pointer, hence the local 'n'.
 * On failure: logs 'error_msg', stores the error in the caller's
 * 'exit_status', and jumps to the caller's 'cleanup' label — so every
 * user must declare 'int ret', 'int exit_status' and a 'cleanup:' label. */
#define UNPACK_BUFFER(buffer, var, count, type, error_msg)                   \
 {                                                                           \
    int32_t n = count;                                                       \
    if (OPAL_SUCCESS != (ret = opal_dss.unpack(buffer, &(var), &n, type)) ) { \
        opal_output(mca_crcp_bkmrk_component.super.output_handle,            \
                    "%s (Return %d)", error_msg, ret);                       \
        exit_status = ret;                                                   \
        goto cleanup;                                                        \
    }                                                                        \
 }
1073
1074 /****************
1075 * PML Wrapper Init/Finalize
1076 ****************/
ompi_crcp_bkmrk_pml_init(void)1077 int ompi_crcp_bkmrk_pml_init(void) {
1078 message_seq_num = 1;
1079 current_msg_id = 0;
1080 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1081 stall_for_completion = false;
1082 ft_event_state = OPAL_CRS_RUNNING;
1083
1084 OBJ_CONSTRUCT(&ompi_crcp_bkmrk_pml_peer_refs, opal_list_t);
1085
1086 OBJ_CONSTRUCT(&unknown_recv_from_list, opal_list_t);
1087 OBJ_CONSTRUCT(&unknown_persist_recv_list, opal_list_t);
1088
1089 OBJ_CONSTRUCT(&drained_msg_ack_list, opal_list_t);
1090
1091 /* Create free lists for
1092 * - Coord State
1093 * - Peer Refs
1094 * - Traffic Message Refs
1095 * - Drain Message Refs
1096 * - Drain ACK Messsage Refs
1097 * - Message Contents?
1098 */
1099 OBJ_CONSTRUCT(&coord_state_free_list, opal_free_list_t);
1100 opal_free_list_init (&coord_state_free_list,
1101 sizeof(ompi_crcp_bkmrk_pml_state_t),
1102 opal_cache_line_size,
1103 OBJ_CLASS(ompi_crcp_bkmrk_pml_state_t),
1104 0,opal_cache_line_size,
1105 4, /* Initial number */
1106 -1, /* Max = Unlimited */
1107 4, /* Increment by */
1108 NULL, 0, NULL, NULL, NULL);
1109
1110 OBJ_CONSTRUCT(&content_ref_free_list, opal_free_list_t);
1111 opal_free_list_init (&content_ref_free_list,
1112 sizeof(ompi_crcp_bkmrk_pml_message_content_ref_t),
1113 opal_cache_line_size,
1114 OBJ_CLASS(ompi_crcp_bkmrk_pml_message_content_ref_t),
1115 0,opal_cache_line_size,
1116 80, /* Initial number */
1117 -1, /* Max = Unlimited */
1118 32, /* Increment by */
1119 NULL, 0, NULL, NULL, NULL);
1120
1121 OBJ_CONSTRUCT(&peer_ref_free_list, opal_free_list_t);
1122 opal_free_list_init (&peer_ref_free_list,
1123 sizeof(ompi_crcp_bkmrk_pml_peer_ref_t),
1124 opal_cache_line_size,
1125 OBJ_CLASS(ompi_crcp_bkmrk_pml_peer_ref_t),
1126 0,opal_cache_line_size,
1127 16, /* Initial number */
1128 -1, /* Max = Unlimited */
1129 16, /* Increment by */
1130 NULL, 0, NULL, NULL, NULL);
1131
1132 OBJ_CONSTRUCT(&traffic_msg_ref_free_list, opal_free_list_t);
1133 opal_free_list_init (&traffic_msg_ref_free_list,
1134 sizeof(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1135 opal_cache_line_size,
1136 OBJ_CLASS(ompi_crcp_bkmrk_pml_traffic_message_ref_t),
1137 0,opal_cache_line_size,
1138 32, /* Initial number */
1139 -1, /* Max = Unlimited */
1140 64, /* Increment by */
1141 NULL, 0, NULL, NULL, NULL);
1142
1143 OBJ_CONSTRUCT(&drain_msg_ref_free_list, opal_free_list_t);
1144 opal_free_list_init (&drain_msg_ref_free_list,
1145 sizeof(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1146 opal_cache_line_size,
1147 OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ref_t),
1148 0,opal_cache_line_size,
1149 32, /* Initial number */
1150 -1, /* Max = Unlimited */
1151 64, /* Increment by */
1152 NULL, 0, NULL, NULL, NULL);
1153
1154 OBJ_CONSTRUCT(&drain_ack_msg_ref_free_list, opal_free_list_t);
1155 opal_free_list_init (&drain_ack_msg_ref_free_list,
1156 sizeof(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1157 opal_cache_line_size,
1158 OBJ_CLASS(ompi_crcp_bkmrk_pml_drain_message_ack_ref_t),
1159 0,opal_cache_line_size,
1160 16, /* Initial number */
1161 -1, /* Max = Unlimited */
1162 16, /* Increment by */
1163 NULL, 0, NULL, NULL, NULL);
1164
1165 clear_timers();
1166
1167 if( timing_enabled > 0 ) {
1168 timer_label[CRCP_TIMER_TOTAL_CKPT] = strdup("Total Ckpt.");
1169 timer_label[CRCP_TIMER_CKPT_EX_B] = strdup("Exchange Bookmarks");
1170 timer_label[CRCP_TIMER_CKPT_EX_PEER_S] = strdup(" Ex.Bk. Send Peer");
1171 timer_label[CRCP_TIMER_CKPT_EX_PEER_R] = strdup(" Ex.Bk. Recv Peer");
1172 timer_label[CRCP_TIMER_CKPT_EX_WAIT] = strdup(" Ex.Bk. Wait");
1173
1174 timer_label[CRCP_TIMER_CKPT_CHECK_B] = strdup("Check Bookmarks");
1175 timer_label[CRCP_TIMER_CKPT_CHECK_PEER_S] = strdup(" Ck.Bk. Send Peer");
1176 timer_label[CRCP_TIMER_CKPT_CHECK_PEER_R] = strdup(" Ck.Bk. Recv Peer");
1177
1178 timer_label[CRCP_TIMER_CKPT_POST_DRAIN] = strdup("Post Drain Msgs.");
1179 timer_label[CRCP_TIMER_CKPT_WAIT_QUI] = strdup("Wait for Quiescence");
1180
1181 timer_label[CRCP_TIMER_TOTAL_CONT] = strdup("Total Continue");
1182
1183 timer_label[CRCP_TIMER_TOTAL_RST] = strdup("Total Restart");
1184 }
1185
1186 return OMPI_SUCCESS;
1187 }
1188
ompi_crcp_bkmrk_pml_finalize(void)1189 int ompi_crcp_bkmrk_pml_finalize(void) {
1190 int i;
1191
1192 current_msg_id = 0;
1193 current_msg_type = COORD_MSG_TYPE_UNKNOWN;
1194 stall_for_completion = false;
1195 ft_event_state = OPAL_CRS_RUNNING;
1196
1197 OBJ_DESTRUCT(&ompi_crcp_bkmrk_pml_peer_refs);
1198
1199 OBJ_DESTRUCT(&unknown_recv_from_list);
1200 OBJ_DESTRUCT(&unknown_persist_recv_list);
1201
1202 OBJ_DESTRUCT(&drained_msg_ack_list);
1203
1204 /* Destroy All Free Lists */
1205 OBJ_DESTRUCT(&peer_ref_free_list);
1206 OBJ_DESTRUCT(&traffic_msg_ref_free_list);
1207 OBJ_DESTRUCT(&drain_msg_ref_free_list);
1208 OBJ_DESTRUCT(&drain_ack_msg_ref_free_list);
1209 OBJ_DESTRUCT(&content_ref_free_list);
1210
1211 if( timing_enabled > 0 ) {
1212 for(i = 0; i < CRCP_TIMER_MAX; ++i) {
1213 free(timer_label[i]);
1214 timer_label[i] = NULL;
1215 }
1216 }
1217
1218 return OMPI_SUCCESS;
1219 }
1220
1221 /****************
1222 * PML Wrapper
1223 ****************/
1224 /**************** Enable *****************/
/**
 * pml_enable wrapper hook.  Not wired up: crcp_bkmrk_module.c registers
 * NULL for this slot, so the framework never calls it.  Logs at verbose
 * level 30 and reports success unconditionally.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_enable(
                                  bool enable,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_enable()"));

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1236
1237 /**************** Progress *****************/
/**
 * pml_progress wrapper hook.  Not wired up: crcp_bkmrk_module.c registers
 * NULL for this slot, so the framework never calls it.  Logs at verbose
 * level 35 and reports success unconditionally.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_progress(
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    OPAL_OUTPUT_VERBOSE((35, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_progress()"));

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1249
1250 /**************** Probe *****************/
1251 /* JJH - Code reuse: Combine iprobe and probe logic */
/**
 * iprobe wrapper.  In the PRE phase only, check whether a message that
 * matches (tag, dst, comm) was already drained during a checkpoint; if
 * so, satisfy the probe from the drained list (copy out the saved
 * status, set *matched, and mark the state DONE so the wrapped PML is
 * skipped).  Otherwise report not-matched and let the real PML probe.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_iprobe(
                                  int dst, int tag,
                                  struct ompi_communicator_t* comm,
                                  int *matched,
                                  ompi_status_public_t* status,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_drain_message_ref_t   *drained_msg     = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *drained_content = NULL;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_iprobe(%d, %d)", dst, tag));

    /* Only the pre-PML phase does any work here. */
    if( OMPI_CRCP_PML_PRE != pml_state->state ) {
        pml_state->error_code = OMPI_SUCCESS;
        return pml_state;
    }

    /* Look for a matching message in the drained message list. */
    ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
                                 comm, PROBE_ANY_SIZE,
                                 &drained_msg,
                                 &drained_content,
                                 NULL);
    if( OMPI_SUCCESS != ret ) {
        ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_iprobe(): Failed trying to find a drained message.");
        pml_state->error_code = ret;
        return pml_state;
    }

    /* Common case: nothing drained matches — hand off to the real PML. */
    if( NULL == drained_msg ) {
        *matched = 0;
        pml_state->error_code = OMPI_SUCCESS;
        return pml_state;
    }

    OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_iprobe(): Matched a drained message..."));

    /* Hand the saved status back to the caller, flag the match, and tell
     * the wrapper this probe is already complete. */
    if( MPI_STATUS_IGNORE != status ) {
        memcpy(status, &drained_content->status, sizeof(ompi_status_public_t));
    }
    *matched = 1;

    pml_state->state      = OMPI_CRCP_PML_DONE;
    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1321
/**
 * Blocking-probe wrapper.  In the PRE phase only, check whether a message
 * matching (tag, dst, comm) was already drained during a checkpoint; if
 * so, satisfy the probe from the drained list (copy out the saved status
 * and mark the state DONE so the wrapped PML is skipped).  Otherwise fall
 * through and let the real PML probe.
 *
 * Bug fix: the "Matched a drained message" verbose log wrongly said
 * "pml_iprobe()" — a copy-paste from the iprobe wrapper above; it now
 * correctly identifies pml_probe().
 *
 * JJH - Code reuse: Combine iprobe and probe logic.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_probe(
                                  int dst, int tag,
                                  struct ompi_communicator_t* comm,
                                  ompi_status_public_t* status,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_drain_message_ref_t   *drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_probe(%d, %d)", dst, tag));

    /*
     * Before PML Call
     * - Determine if this can be satisfied from the drained list
     * - Otherwise let the PML handle it
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state) {
        /*
         * Check to see if this message is in the drained message list
         */
        if( OMPI_SUCCESS != (ret = drain_message_find_any(PROBE_ANY_COUNT, tag, dst,
                                                          comm, PROBE_ANY_SIZE,
                                                          &drain_msg_ref,
                                                          &content_ref,
                                                          NULL) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_probe(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }

        /*
         * If the message is a drained message
         * - Copy of the status structure to pass back to the user
         */
        if( NULL != drain_msg_ref ) {
            OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
                                "crcp:bkmrk: pml_probe(): Matched a drained message..."));

            /* Copy the status information */
            if( MPI_STATUS_IGNORE != status ) {
                memcpy(status, &content_ref->status, sizeof(ompi_status_public_t));
            }

            /* This will identify to the wrapper that this message is complete */
            pml_state->state = OMPI_CRCP_PML_DONE;
            pml_state->error_code = OMPI_SUCCESS;
            return pml_state;
        }
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
1379
1380 /**************** Dump *****************/
/**
 * pml_dump wrapper hook.  Performs no dumping of its own: it logs at
 * verbose level 30 and reports success, leaving the actual dump to the
 * wrapped PML.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_dump(
                                  struct ompi_communicator_t* comm,
                                  int verbose,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_dump()"));

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1392
1393
1394 /**************** Communicator *****************/
/**
 * pml_add_comm wrapper hook.  Not wired up: crcp_bkmrk_module.c registers
 * NULL for this slot, so the framework never calls it.  Logs at verbose
 * level 30 and reports success unconditionally.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_comm(
                                  struct ompi_communicator_t* comm,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_add_comm()"));

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1407
/**
 * pml_del_comm wrapper hook.  Not wired up: crcp_bkmrk_module.c registers
 * NULL for this slot, so the framework never calls it.  Logs at verbose
 * level 30 and reports success unconditionally.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_comm(
                                  struct ompi_communicator_t* comm,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_del_comm()"));

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1420
1421 /**************** Processes *****************/
/**
 * add_procs wrapper (PRE phase only).  Captures the wrapped PML's
 * component/module pointers the first time through, then creates one
 * peer_ref tracking object per newly added process and appends it to
 * the global ompi_crcp_bkmrk_pml_peer_refs list.  Always reports
 * OMPI_SUCCESS.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_add_procs(
                                  struct ompi_proc_t **procs,
                                  size_t nprocs,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        size_t idx;

        OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                            "crcp:bkmrk: pml_add_procs()"));

        /* Remember the PML this component is wrapping. */
        wrapped_pml_component = pml_state->wrapped_pml_component;
        wrapped_pml_module    = pml_state->wrapped_pml_module;

        /* One tracking object per added peer. */
        for( idx = 0; idx < nprocs; ++idx ) {
            ompi_crcp_bkmrk_pml_peer_ref_t *new_peer_ref;

            HOKE_PEER_REF_ALLOC(new_peer_ref);

            new_peer_ref->proc_name.jobid = OMPI_CAST_RTE_NAME(&procs[idx]->super.proc_name)->jobid;
            new_peer_ref->proc_name.vpid  = OMPI_CAST_RTE_NAME(&procs[idx]->super.proc_name)->vpid;

            opal_list_append(&ompi_crcp_bkmrk_pml_peer_refs, &(new_peer_ref->super));
        }
    }

    pml_state->error_code = OMPI_SUCCESS;
    return pml_state;
}
1459
/**
 * del_procs wrapper (PRE phase only).  For each departing process, look
 * up its peer_ref tracking object, unlink it from the global peer list,
 * and return it to the peer-ref free list.  If a peer cannot be found,
 * logs an error and reports OMPI_ERROR (remaining peers are not
 * processed).
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_del_procs(
                                  struct ompi_proc_t **procs,
                                  size_t nprocs,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    int exit_status = OMPI_SUCCESS;

    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        size_t idx;

        OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                            "crcp:bkmrk: pml_del_procs()"));

        for( idx = 0; idx < nprocs; ++idx ) {
            opal_list_item_t *item =
                (opal_list_item_t*)find_peer(*(ompi_process_name_t*)&procs[idx]->super.proc_name);

            if( NULL == item ) {
                opal_output(mca_crcp_bkmrk_component.super.output_handle,
                            "crcp:bkmrk: del_procs: Unable to find peer %s\n",
                            OMPI_NAME_PRINT(&procs[idx]->super.proc_name));
                exit_status = OMPI_ERROR;
                break;
            }

            /* Remove the found peer from the list and recycle it. */
            opal_list_remove_item(&ompi_crcp_bkmrk_pml_peer_refs, item);
            {
                ompi_crcp_bkmrk_pml_peer_ref_t *old_peer_ref =
                    (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
                HOKE_PEER_REF_RETURN(old_peer_ref);
            }
        }
    }

    pml_state->error_code = exit_status;
    return pml_state;
}
1497
1498 /**************** Send *****************/
/**
 * isend_init (persistent send) wrapper.
 * PRE phase: record the message signature on the peer's send_init_list
 * and stash the peer/message refs in a coord state so the POST phase can
 * find them again.  POST phase: attach the now-created request to the
 * message as a content ref (retained), then hand back the original
 * pml_state.  Note: the send is not counted in total_msgs_sent here —
 * that happens when the persistent request is started
 * (ompi_crcp_bkmrk_pml_start_isend_init).
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend_init(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int dst, int tag,
                                  mca_pml_base_send_mode_t mode,
                                  struct ompi_communicator_t* comm,
                                  struct ompi_request_t **request,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
    ompi_crcp_bkmrk_pml_state_t               *coord_state = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_isend_init()"));

    /*
     * Before the PML gets the message:
     * - Setup structure to track the message
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        /*
         * Find the peer reference
         */
        if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: isend: Failed to find peer_ref\n");
            exit_status = ret;
            goto DONE;
        }

        /*
         * Archive the message Message Object
         */
        traffic_message_append(peer_ref, &(peer_ref->send_init_list),
                               COORD_MSG_TYPE_P_SEND,
                               count, datatype, 0, tag, dst, comm,
                               &msg_ref);

        /* Save the pointers so the POST phase can recover peer/msg refs */
        CREATE_COORD_STATE(coord_state, pml_state,
                           peer_ref, msg_ref);

        coord_state->p_super.error_code = OMPI_SUCCESS;
        return &coord_state->p_super;
    }
    /*
     * After PML is done, update message reference
     */
    else if( OMPI_CRCP_PML_POST == pml_state->state ) {
        ompi_crcp_base_pml_state_t *rtn_state = NULL;
        ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;

        EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
                            peer_ref,  msg_ref);

        /*
         * Update Message: attach the persistent request to the tracked
         * message.  It is 'already_posted' but not yet 'active' — the
         * request has only been created, not started.
         */
        HOKE_CONTENT_REF_ALLOC(new_content);
        new_content->buffer  =  buf;
        new_content->request = *request;
        new_content->done    =  false;
        new_content->active  =  false;
        new_content->already_posted  = true;
        new_content->already_drained = false;
        /* Hold a reference for the lifetime of the content ref */
        OBJ_RETAIN(*request);
        opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );

        CRCP_COORD_STATE_RETURN(coord_state);

        rtn_state->error_code = OMPI_SUCCESS;
        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
1580
/**
 * Handle MPI_Start on a persistent send request: locate the tracked
 * message on the peer's send_init_list (PERSIST_MARKER match mode),
 * mark one of its contents active via traffic_message_start(), and —
 * unless that content was already satisfied by a checkpoint drain —
 * count the send toward peer_ref->total_msgs_sent.
 * Returns OMPI_SUCCESS, or the error from peer/message lookup.
 */
static int ompi_crcp_bkmrk_pml_start_isend_init(ompi_request_t **request)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    mca_pml_base_request_t *breq = NULL;
    size_t tmp_ddt_size = 0;

    /* Recover the message signature from the PML base request */
    breq = (mca_pml_base_request_t *)(*request);
    ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);

    /*
     * Find the peer reference
     */
    if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
                                                 breq->req_peer,
                                                 &peer_ref) ) ){
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: req_start(): Failed to find peer_ref\n");
        exit_status = ret;
        goto DONE;
    }

    /* Check the send_init list for the matching persistent message */
    if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
                                                    breq->req_count,
                                                    breq->req_tag,
                                                    breq->req_peer,
                                                    breq->req_comm->c_contextid,
                                                    tmp_ddt_size,
                                                    &msg_ref,
                                                    PERSIST_MARKER
                                                    ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: pml_start(): Unable to find the proper (send_init) message ref for this recv\n");
        exit_status = ret;
        goto DONE;
    }

    if( NULL == msg_ref ) {
        ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
        exit_status = OMPI_ERROR;
        goto DONE;
    } else {
        /* Activate one content of the persistent message */
        traffic_message_start(msg_ref,
                              peer_ref,
                              request,
                              &(peer_ref->send_init_list),
                              &content_ref);

        if( !content_ref->already_drained ) {
            /* Account for this inflight send */
            peer_ref->total_msgs_sent += 1;
        }
    }

 DONE:
    return exit_status;
}
1641
/**
 * Completion handler for a started persistent send.  Finds the tracked
 * message on the peer's send_init_list, marks the matching active
 * content inactive, and updates the done/active (or active_drain)
 * counters.  A "not found" lookup is treated as success because the
 * request-wait/test paths can deliver duplicate completion callbacks
 * for the same request (see comment below).
 */
static int ompi_crcp_bkmrk_request_complete_isend_init(struct ompi_request_t *request,
                                                       ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                                       int src, int tag, int tmp_ddt_size)
{
    int ret, exit_status = OMPI_SUCCESS;
    mca_pml_base_request_t *breq = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    breq = (mca_pml_base_request_t *)request;

    /* Check the isend_init list */
    if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->send_init_list),
                                                    breq->req_count,
                                                    tag, src,
                                                    breq->req_comm->c_contextid,
                                                    tmp_ddt_size,
                                                    &msg_ref,
                                                    FIND_MSG_TRUE
                                                    ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: req_complete: Unable to find the proper (send_init) message ref for this complete\n");
        exit_status = ret;
        goto DONE;
    }

    if( NULL == msg_ref ) {
        /*
         * It is possible that we did not 'find' the message because
         * we could have previously marked it as done. Due to the
         * logic in the Request Wait/Test routines we could
         * receive multiple request complete calls for the
         * same request.
         *
         * It is possible that we have 'completed' this message previously,
         * so this case can occur during normal operation.
         * This is caused by us checking for completeness twice in ompi_request_wait_all.
         */
        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
                             peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
                             breq->req_peer, src, breq->req_comm->c_contextid));
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

    /* Mark request as inactive */
    traffic_message_find_mark_persistent(msg_ref, &request,
                                         true,  /* Find currently active */
                                         false, /* Mark as inactive */
                                         &content_ref);

    TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Send_init) --", true));

    /* NOTE(review): content_ref is dereferenced without a NULL check —
     * this assumes find_mark_persistent always finds an active content
     * when msg_ref was found; confirm against its implementation. */
    if( !content_ref->already_drained ) {
        msg_ref->done++;
        msg_ref->active--;
    } else {
        /* This completion was satisfied by a checkpoint drain */
        msg_ref->active_drain--;
        content_ref->already_drained = false;
    }

    OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
                        peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
 DONE:
    return exit_status;
}
1710
1711
/**
 * isend (non-blocking send) wrapper.
 * PRE phase: record the message on the peer's isend_list, bump
 * total_msgs_sent immediately (the send is in flight from this point),
 * and stash the peer/message refs in a coord state for the POST phase.
 * POST phase: attach the created request to the tracked message as an
 * active content ref (retained), then hand back the original pml_state.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_isend(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int dst, int tag,
                                  mca_pml_base_send_mode_t mode,
                                  struct ompi_communicator_t* comm,
                                  struct ompi_request_t **request,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
    ompi_crcp_bkmrk_pml_state_t               *coord_state = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_isend()"));

    /*
     * Before the PML gets the message:
     * - Setup structure to track the message
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        /*
         * Find the peer reference
         */
        if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: isend: Failed to find peer_ref\n");
            exit_status = ret;
            goto DONE;
        }

        /*
         * Archive the message Message Object
         */
        traffic_message_append(peer_ref, &(peer_ref->isend_list),
                               COORD_MSG_TYPE_I_SEND,
                               count, datatype, 0, tag, dst, comm,
                               &msg_ref);

        /* Bookkeeping: counted at post time, unlike persistent sends */
        peer_ref->total_msgs_sent += 1;

        /* Save the pointers for the POST phase */
        CREATE_COORD_STATE(coord_state, pml_state,
                           peer_ref, msg_ref);

        coord_state->p_super.error_code = OMPI_SUCCESS;
        return &coord_state->p_super;
    }
    /*
     * After PML is done, update message reference
     */
    else if( OMPI_CRCP_PML_POST == pml_state->state ) {
        ompi_crcp_base_pml_state_t *rtn_state = NULL;
        ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;

        EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
                            peer_ref,  msg_ref);

        /*
         * Update Message: attach the request; content is active now
         */
        HOKE_CONTENT_REF_ALLOC(new_content);
        new_content->buffer  =  NULL; /* Not tracked for isend */
        new_content->request = *request;
        new_content->done    =  false;
        new_content->active  =  true;
        new_content->already_posted  = true;
        new_content->already_drained = false;
        /* Hold a reference for the lifetime of the content ref */
        OBJ_RETAIN(*request);
        opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );

        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (isend) --", true));

        CRCP_COORD_STATE_RETURN(coord_state);

        rtn_state->error_code = OMPI_SUCCESS;
        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
1798
/**
 * Completion handler for a non-blocking send.  Finds the tracked message
 * on the peer's isend_list, strips one content ref off it (preferring an
 * already-drained one), updates the done/active (or active_drain)
 * counters, and recycles the content ref.  A "not found" lookup is
 * treated as success because duplicate completion callbacks can occur
 * (see comment below).
 */
static int ompi_crcp_bkmrk_request_complete_isend(struct ompi_request_t *request,
                                                  ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                                  int src, int tag, int tmp_ddt_size)
{
    int ret, exit_status = OMPI_SUCCESS;
    mca_pml_base_request_t *breq = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    breq = (mca_pml_base_request_t *)request;

    /* Check the isend list */
    if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->isend_list),
                                                    breq->req_count,
                                                    tag, src,
                                                    breq->req_comm->c_contextid,
                                                    tmp_ddt_size,
                                                    &msg_ref,
                                                    FIND_MSG_TRUE
                                                    ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: req_complete: Unable to find the proper (isend) message ref for this complete\n");
        exit_status = ret;
        goto DONE;
    }

    if( NULL == msg_ref ) {
        /*
         * It is possible that we did not 'find' the message because
         * we could have previously marked it as done. Due to the
         * logic in the Request Wait/Test routines we could
         * receive multiple request complete calls for the
         * same request.
         *
         * It is possible that we have 'completed' this message previously,
         * so this case can occur during normal operation.
         * This is caused by us checking for completeness twice in ompi_request_wait_all.
         */
        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
                             peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
                             breq->req_peer, src, breq->req_comm->c_contextid));
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: req_complete: Matched an iSend: total = %d",
                         peer_ref->total_msgs_sent));

    /* Strip off an isend request */
    traffic_message_grab_content(msg_ref, &content_ref, true, true); /* Remove, prefer already_drained */

    /* NOTE(review): content_ref is dereferenced without a NULL check —
     * assumes a found msg_ref always still holds at least one content;
     * confirm against traffic_message_grab_content(). */
    if( !content_ref->already_drained ) {
        msg_ref->done++;
        msg_ref->active--;
    } else {
        /* This completion was satisfied by a checkpoint drain */
        msg_ref->active_drain--;
        content_ref->already_drained = false;
    }
    HOKE_CONTENT_REF_RETURN(content_ref);

    TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iSend) --", true));

    OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
                        peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
 DONE:
    return exit_status;
}
1869
1870
/**
 * Blocking send wrapper.
 * PRE phase: record the message on the peer's send_list, bump
 * total_msgs_sent, and publish the in-progress send in the file-scope
 * current_msg_id/current_msg_type (so a checkpoint taken mid-send can
 * see it), stashing peer/message refs in a coord state.
 * POST phase: the blocking send has finished — mark the message done,
 * clear the current-message markers, and hand back the original
 * pml_state.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_send(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int dst, int tag,
                                  mca_pml_base_send_mode_t mode,
                                  struct ompi_communicator_t* comm,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref  = NULL;
    ompi_crcp_bkmrk_pml_state_t               *coord_state = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_send()"));

    /*
     * Before the PML gets the message:
     * - Setup structure to track the message
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        /*
         * Find the peer reference
         */
        if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, dst, &peer_ref) ) ){
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: send: Failed to find peer_ref\n");
            exit_status = ret;
            goto DONE;
        }

        /*
         * Archive the message Message Object
         */
        traffic_message_append(peer_ref, &(peer_ref->send_list),
                               COORD_MSG_TYPE_B_SEND,
                               count, datatype, 0, tag, dst, comm,
                               &msg_ref);

        /* Bookkeeping: count the send and expose it as 'in progress' */
        peer_ref->total_msgs_sent += 1;
        current_msg_id = msg_ref->msg_id;
        current_msg_type = COORD_MSG_TYPE_B_SEND;

        /* Save the pointers for the POST phase */
        CREATE_COORD_STATE(coord_state, pml_state,
                           peer_ref, msg_ref);
        coord_state->p_super.error_code = OMPI_SUCCESS;

        return &coord_state->p_super;
    }
    /*
     * After PML is done, update message reference
     */
    else if( OMPI_CRCP_PML_POST == pml_state->state ) {
        ompi_crcp_base_pml_state_t *rtn_state = NULL;

        EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
                            peer_ref,  msg_ref);

        /*
         * Update Message: the blocking send is now finished
         */
        msg_ref->done++;
        msg_ref->active--;

        /* No send is in progress any more */
        current_msg_id = 0;
        current_msg_type = COORD_MSG_TYPE_UNKNOWN;

        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Send done", true));

        CRCP_COORD_STATE_RETURN(coord_state);
        rtn_state->error_code = OMPI_SUCCESS;

        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
1953
1954 /**************** Recv *****************/
/*
 * Persistent receive construction (MPI_Recv_init) wrapper.
 *
 * PRE  - Record the persistent request so later start()/complete calls
 *        can be matched against it.  Nothing is received here, so the
 *        drained list is never consulted in this function; that happens
 *        in start() and at request completion.
 * POST - Attach the newly created PML request to the recorded message
 *        as an inactive content entry.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv_init(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int src, int tag,
                                  struct ompi_communicator_t* comm,
                                  struct ompi_request_t **request,
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer   = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg    = NULL;
    ompi_crcp_bkmrk_pml_state_t               *cstate = NULL;
    int rc, exit_status = OMPI_SUCCESS;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_irecv_init()"));

    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        if( MPI_ANY_SOURCE == src || src < 0 ) {
            /* Wildcard source: track on the global unknown-peer
             * persistent recv list until the source is resolved. */
            traffic_message_append(NULL, &(unknown_persist_recv_list),
                                   COORD_MSG_TYPE_P_RECV,
                                   count, datatype, 0, tag, src, comm,
                                   &msg);
            CREATE_COORD_STATE(cstate, pml_state, NULL, msg);
        } else {
            /* Known source: track on that peer's persistent recv list */
            if( OMPI_SUCCESS != (rc = find_peer_in_comm(comm, src, &peer)) ) {
                opal_output(mca_crcp_bkmrk_component.super.output_handle,
                            "crcp:bkmrk: recv: Failed to find peer_ref\n");
                exit_status = rc;
                goto DONE;
            }

            traffic_message_append(peer, &(peer->recv_init_list),
                                   COORD_MSG_TYPE_P_RECV,
                                   count, datatype, 0, tag, src, comm,
                                   &msg);
            CREATE_COORD_STATE(cstate, pml_state, peer, msg);
        }

        cstate->p_super.error_code = OMPI_SUCCESS;
        return &cstate->p_super;
    }

    if( OMPI_CRCP_PML_POST == pml_state->state ) {
        ompi_crcp_base_pml_state_t               *rtn_state = NULL;
        ompi_crcp_bkmrk_pml_message_content_ref_t *content  = NULL;

        EXTRACT_COORD_STATE(pml_state, cstate, rtn_state, peer, msg);

        /* Remember the request backing this persistent recv; it stays
         * inactive until start() is invoked on it. */
        HOKE_CONTENT_REF_ALLOC(content);
        content->buffer          = buf;
        content->request         = *request;
        content->done            = false;
        content->active          = false;
        content->already_posted  = true;
        content->already_drained = false;
        OBJ_RETAIN(*request);
        opal_list_append(&(msg->msg_contents), &(content->super) );

        CRCP_COORD_STATE_RETURN(cstate);

        rtn_state->error_code = OMPI_SUCCESS;
        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
2057
/*
 * Check whether a persistent receive being start()ed can be satisfied
 * from the list of messages drained at the last checkpoint.
 *
 * On return, *found_drain is true iff a drained message matched the
 * request; in that case the drained payload has already been copied
 * into the user buffer and no network activity is needed.
 *
 * Returns OMPI_SUCCESS or an OMPI error code.
 *
 * Fix vs. original: a failure of drain_message_copy_remove_persistent()
 * was only logged; it is now propagated in exit_status, and the drained
 * message count is no longer decremented on that failure path.
 */
static int ompi_crcp_bkmrk_pml_start_drain_irecv_init(ompi_request_t **request, bool *found_drain)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
    ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    mca_pml_base_request_t *breq = NULL;
    size_t tmp_ddt_size = 0;

    *found_drain = false;

    breq = (mca_pml_base_request_t *)(*request);
    ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);

    /*
     * If the peer rank is known, look the persistent message up on that
     * peer's recv_init list.
     */
    if( 0 <= breq->req_peer ) {
        if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm,
                                                     breq->req_peer,
                                                     &peer_ref) ) ){
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
            exit_status = ret;
            goto DONE;
        }

        if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
                                                        breq->req_count,
                                                        breq->req_tag,
                                                        breq->req_peer,
                                                        breq->req_comm->c_contextid,
                                                        tmp_ddt_size,
                                                        &msg_ref,
                                                        PERSIST_MARKER
                                                        ) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
            exit_status = ret;
            goto DONE;
        }
    }
    /*
     * Otherwise (wildcard source) search the unknown-peer persistent list.
     */
    else {
        if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
                                                        breq->req_count,
                                                        breq->req_tag,
                                                        INVALID_INT,
                                                        breq->req_comm->c_contextid,
                                                        tmp_ddt_size,
                                                        &msg_ref,
                                                        PERSIST_MARKER
                                                        ) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
            exit_status = ret;
            goto DONE;
        }
    }

    /*
     * No message found :(
     */
    if( NULL == msg_ref ) {
        ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
        exit_status = OMPI_ERROR;
        goto DONE;
    }

    /*
     * See if this message was already drained at checkpoint time.
     * With a known peer search just that peer's drained list; otherwise
     * search all peers (which also resolves peer_ref on a match).
     */
    if( NULL != peer_ref ) {
        if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
                                                      msg_ref->count, msg_ref->tag, msg_ref->rank,
                                                      msg_ref->comm->c_contextid, msg_ref->ddt_size,
                                                      &drain_msg_ref,
                                                      &content_ref) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }
    } else {
        if( OMPI_SUCCESS != (ret = drain_message_find_any(msg_ref->count, msg_ref->tag, msg_ref->rank,
                                                          msg_ref->comm, msg_ref->ddt_size,
                                                          &drain_msg_ref,
                                                          &content_ref,
                                                          &peer_ref) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }
    }

    /*
     * Found a drained message: copy its payload out and account for it.
     */
    if( NULL != drain_msg_ref ) {
        *found_drain = true;
        OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: pml_start(): Matched a drained message..."));

        if( OMPI_SUCCESS != (ret = drain_message_copy_remove_persistent(drain_msg_ref,
                                                                        content_ref,
                                                                        msg_ref,
                                                                        *request,
                                                                        peer_ref) ) ) {
            opal_output( mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_start(): Datatype copy failed (%d)",
                         ret);
            /* Propagate the failure: the user buffer does not contain
             * the drained message, so the caller must not treat this
             * request as satisfied. */
            exit_status = ret;
            goto DONE;
        }

        peer_ref->total_drained_msgs -= 1;
    }

 DONE:
    return exit_status;
}
2179
/*
 * start() handling for a persistent receive that was NOT satisfied from
 * the drained list: locate the recorded persistent message (by peer, or
 * on the unknown-peer list for wildcard sources) and mark one of its
 * content entries as active.
 */
static int ompi_crcp_bkmrk_pml_start_irecv_init(ompi_request_t **request)
{
    int rc, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg  = NULL;
    mca_pml_base_request_t *breq = (mca_pml_base_request_t *)(*request);
    size_t ddt_size = 0;

    ompi_datatype_type_size(breq->req_datatype, &ddt_size);

    if( 0 <= breq->req_peer ) {
        /* Known peer: search that peer's persistent recv list */
        if( OMPI_SUCCESS != (rc = find_peer_in_comm(breq->req_comm,
                                                    breq->req_peer,
                                                    &peer)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Failed to find peer_ref\n");
            exit_status = rc;
            goto DONE;
        }

        rc = traffic_message_find(&(peer->recv_init_list),
                                  breq->req_count,
                                  breq->req_tag,
                                  breq->req_peer,
                                  breq->req_comm->c_contextid,
                                  ddt_size,
                                  &msg,
                                  PERSIST_MARKER);
        if( OMPI_SUCCESS != rc ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
            exit_status = rc;
            goto DONE;
        }

        if( NULL != msg ) {
            traffic_message_start(msg, peer, request,
                                  &(peer->recv_init_list), NULL);
        }
    } else {
        /* Wildcard peer: search the unknown-peer persistent list */
        rc = traffic_message_find(&(unknown_persist_recv_list),
                                  breq->req_count,
                                  breq->req_tag,
                                  INVALID_INT,
                                  breq->req_comm->c_contextid,
                                  ddt_size,
                                  &msg,
                                  PERSIST_MARKER);
        if( OMPI_SUCCESS != rc ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Unable to find the proper (recv) message ref for this recv\n");
            exit_status = rc;
            goto DONE;
        }

        if( NULL != msg ) {
            traffic_message_start(msg, NULL, request,
                                  &(unknown_persist_recv_list), NULL);
        }
    }

    /* The message should always have been recorded at init time */
    if( NULL == msg ) {
        ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_start(): Could not find message ref");
        exit_status = OMPI_ERROR;
    }

 DONE:
    return exit_status;
}
2264
/*
 * Completion bookkeeping for a persistent receive request
 * (MPI_Recv_init + MPI_Start).
 *
 * Looks the message up on the peer's recv_init list; if it is not there
 * (posted with MPI_ANY_SOURCE) it is searched for on the unknown-peer
 * list and migrated to the now-resolved peer.  The matching content
 * entry is then marked inactive and the receive counters are updated.
 *
 * Note: content entries are deliberately NOT returned to the free list
 * here because persistent requests re-use them across start() cycles.
 *
 * Returns OMPI_SUCCESS (including the already-completed case) or an
 * OMPI error code.
 */
static int ompi_crcp_bkmrk_request_complete_irecv_init(struct ompi_request_t *request,
                                                       ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                                       int src, int tag, int tmp_ddt_size)
{
    int ret, exit_status = OMPI_SUCCESS;
    mca_pml_base_request_t *breq = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    breq = (mca_pml_base_request_t *)request;

    /*
     * Check the irecv_init list of the (known) peer first
     */
    if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->recv_init_list),
                                                    breq->req_count,
                                                    tag, src,
                                                    breq->req_comm->c_contextid,
                                                    tmp_ddt_size,
                                                    &msg_ref,
                                                    FIND_MSG_TRUE
                                                    ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
        exit_status = ret;
        goto DONE;
    }

    /*
     * If not found, check the unknown_irecv_list (request was posted with
     * MPI_ANY_SOURCE and the source has only now been resolved)
     */
    if( NULL == msg_ref ) {
        if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_persist_recv_list),
                                                        breq->req_count,
                                                        tag,
                                                        INVALID_INT,
                                                        breq->req_comm->c_contextid,
                                                        tmp_ddt_size,
                                                        &msg_ref,
                                                        FIND_MSG_TRUE
                                                        ) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: requ_complete: Unable to find the proper (recv_init) message ref for this complete\n");
            exit_status = ret;
            goto DONE;
        }

        /* Migrate the message from the unknown-peer list to the
         * resolved peer's recv_init list, and continue with the moved
         * message reference. */
        if( NULL != msg_ref ) {
            traffic_message_move(msg_ref,
                                 COORD_MSG_TYPE_P_RECV,
                                 NULL, &(unknown_persist_recv_list),
                                 peer_ref, &(peer_ref->recv_init_list),
                                 &new_msg_ref,
                                 true,
                                 false);
            msg_ref = new_msg_ref;
        }
    }

    /*
     * If still not found, then we must have completed this already
     */
    if( NULL == msg_ref ) {
        /*
         * It is possible that we did not 'find' the message because
         * we could have previously marked it as done. Due to the
         * logic in the Request Wait/Test routines we could
         * receive multiple request complete calls for the
         * same request.
         *
         * It is possible that we have 'completed' this message previously,
         * so this case can occur during normal operation.
         * This is caused by us checking for completeness twice in ompi_request_wait_all.
         */
        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
                             peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
                             breq->req_peer, src, breq->req_comm->c_contextid));
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

    /*
     * Mark request as inactive
     * Only increment the total count if this was not accounted for in the last checkpoint
     */
    traffic_message_find_mark_persistent(msg_ref, &request,
                                         true,  /* Find currently active */
                                         false, /* Mark as inactive */
                                         &content_ref);
    if( NULL == content_ref ) {
        /* An active persistent recv must have a matching content entry */
        exit_status = OMPI_ERROR;
        goto DONE;
    }

    /* Drained messages were already counted at the last checkpoint, so
     * only undrained completions bump the received-message counter. */
    if( !content_ref->already_drained ) {
        peer_ref->total_msgs_recvd += 1;
        msg_ref->done++;
        msg_ref->active--;
    } else {
        msg_ref->active_drain--;
        content_ref->already_drained = false;
    }

    /* Do not return the content_ref, persistent sends re-use these */

    if( NULL == new_msg_ref ) {
        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_Init) --", true));
    } else {
        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (Recv_init - Unknown) --", true));
    }

    OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: req_complete: Marked Message... ( %d, %d )\n",
                         peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd));
 DONE:
    return exit_status;
}
2383
/*
 * Non-blocking receive (MPI_Irecv) wrapper.
 *
 * PRE  - If a matching message was drained at the last checkpoint,
 *        satisfy the receive from the drained data and tell the wrapper
 *        the request is already complete (OMPI_CRCP_PML_DONE).
 *        Otherwise archive the message (on the peer's irecv list, or on
 *        the unknown-peer list for MPI_ANY_SOURCE) so completion can be
 *        matched later.
 * POST - Attach the newly created PML request to the archived message
 *        as an active content entry.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_irecv(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int src, int tag,
                                  struct ompi_communicator_t* comm,
                                  struct ompi_request_t **request,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL;
    ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
    bool found_drain = false;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_irecv()"));

    /*
     * Before PML Call
     * - Determine if this can be satisfied from the drained list
     * - Otherwise create a new reference to it so we can track it
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state) {
        /*
         * Check to see if this message is in the drained message list
         */
        found_drain = false;
        if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
                                                            &src, &tag, comm, request, NULL,
                                                            &found_drain) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }

        if( found_drain ) {
            /* Do *not* increment:
             *   peer_ref->total_msgs_recvd += 1;
             * Because we accounted for this message during the last checkpoint.
             */

            /* This will identify to the wrapper that this message is complete */
            pml_state->state = OMPI_CRCP_PML_DONE;
            pml_state->error_code = OMPI_SUCCESS;
            return pml_state;
        }
        /*
         * Otherwise the message is not drained (common case)
         */
        else {
            /*
             * Find the Peer.  A wildcard source cannot be resolved yet,
             * so such messages go on the global unknown-peer list.
             */
            if( MPI_ANY_SOURCE == src || src < 0) {
                /*
                 * Archive the message on the unknown-peer list
                 */
                traffic_message_append(NULL, &(unknown_recv_from_list),
                                       COORD_MSG_TYPE_I_RECV,
                                       count, datatype, 0, tag, src, comm,
                                       &msg_ref);

                CREATE_COORD_STATE(coord_state, pml_state,
                                   NULL, msg_ref);
            }
            else {
                if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
                    opal_output(mca_crcp_bkmrk_component.super.output_handle,
                                "crcp:bkmrk: pml_irecv(): Failed to find peer_ref\n");
                    exit_status = ret;
                    goto DONE;
                }

                /*
                 * Archive the message on the peer's irecv list
                 */
                traffic_message_append(peer_ref, &(peer_ref->irecv_list),
                                       COORD_MSG_TYPE_I_RECV,
                                       count, datatype, 0, tag, src, comm,
                                       &msg_ref);

                CREATE_COORD_STATE(coord_state, pml_state,
                                   peer_ref, msg_ref);
            }

            coord_state->p_super.error_code = OMPI_SUCCESS;
            return &coord_state->p_super;
        }
    }
    /*
     * Post PML Call
     * - bookkeeping...
     */
    else if( OMPI_CRCP_PML_POST == pml_state->state) {
        ompi_crcp_base_pml_state_t *rtn_state = NULL;
        ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;

        EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
                            peer_ref, msg_ref);

        /*
         * Record the outstanding PML request against the archived
         * message so request-completion can find it later.
         */
        HOKE_CONTENT_REF_ALLOC(new_content);
        new_content->buffer  = NULL; /* Not tracked for irecv */
        new_content->request = *request;
        new_content->done    = false;
        new_content->active  = true;
        new_content->already_posted  = true;
        new_content->already_drained = false;
        OBJ_RETAIN(*request);
        opal_list_append(&(msg_ref->msg_contents), &(new_content->super) );

        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (irecv) --", true));

        CRCP_COORD_STATE_RETURN(coord_state);

        rtn_state->error_code = OMPI_SUCCESS;
        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
2509
/*
 * Completion bookkeeping for a non-persistent receive request
 * (MPI_Irecv).
 *
 * Looks the message up on the peer's irecv list; if not found (posted
 * with MPI_ANY_SOURCE) it is searched for on the unknown-peer list and
 * migrated to the now-resolved peer.  One content entry is stripped off
 * and the receive counters are updated.
 *
 * Returns OMPI_SUCCESS (including the already-completed case) or an
 * OMPI error code.
 *
 * Fix vs. original: guard against a NULL content_ref from
 * traffic_message_grab_content() before dereferencing it, matching the
 * check done in the recv_init variant of this function.
 */
static int ompi_crcp_bkmrk_request_complete_irecv(struct ompi_request_t *request,
                                                  ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                                  int src, int tag, int tmp_ddt_size)
{
    int ret, exit_status = OMPI_SUCCESS;
    mca_pml_base_request_t *breq = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    breq = (mca_pml_base_request_t *)request;

    /*
     * Check the irecv list of the (known) peer first
     */
    if( OMPI_SUCCESS != (ret = traffic_message_find(&(peer_ref->irecv_list),
                                                    breq->req_count,
                                                    tag, src,
                                                    breq->req_comm->c_contextid,
                                                    tmp_ddt_size,
                                                    &msg_ref,
                                                    FIND_MSG_TRUE
                                                    ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: req_complete: Unable to find the proper (irecv) message ref for this complete\n");
        exit_status = ret;
        goto DONE;
    }

    /*
     * If not found, try the unknown_irecv_list (request was posted with
     * MPI_ANY_SOURCE and the source has only now been resolved)
     */
    if( NULL == msg_ref ) {
        if( OMPI_SUCCESS != (ret = traffic_message_find(&(unknown_recv_from_list),
                                                        breq->req_count,
                                                        tag,
                                                        INVALID_INT,
                                                        breq->req_comm->c_contextid,
                                                        tmp_ddt_size,
                                                        &msg_ref,
                                                        FIND_MSG_TRUE
                                                        ) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: req_complete: Unable to find the proper (recv_init) message ref for this complete\n");
            exit_status = ret;
            goto DONE;
        }

        /* Migrate the message from the unknown-peer list to the
         * resolved peer's irecv list. */
        if( NULL != msg_ref ) {
            traffic_message_move(msg_ref,
                                 COORD_MSG_TYPE_I_RECV,
                                 NULL, &(unknown_recv_from_list),
                                 peer_ref, &(peer_ref->irecv_list),
                                 &new_msg_ref,
                                 true,
                                 true);
            msg_ref = new_msg_ref;
        }
    }

    /*
     * If still not found, then must have completed this twice
     */
    if( NULL == msg_ref ) {
        /*
         * It is possible that we did not 'find' the message because
         * we could have previously marked it as done. Due to the
         * logic in the Request Wait/Test routines we could
         * receive multiple request complete calls for the
         * same request.
         *
         * It is possible that we have 'completed' this message previously,
         * so this case can occur during normal operation.
         * This is caused by us checking for completeness twice in ompi_request_wait_all.
         */
        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: request_complete: No match found for this request :( %d, %d ): [%d/%d,%d]\n",
                             peer_ref->total_msgs_sent, peer_ref->total_msgs_recvd,
                             breq->req_peer, src, breq->req_comm->c_contextid));
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

    /* Strip off an irecv request
     * Only increment the total count if this was not accounted for in the last checkpoint
     */
    traffic_message_grab_content(msg_ref, &content_ref, true, true); /* Remove, prefer already_drained */
    if( NULL == content_ref ) {
        /* Should not happen: a matched active message must carry at
         * least one content entry (mirrors the recv_init variant). */
        exit_status = OMPI_ERROR;
        goto DONE;
    }

    if( !content_ref->already_drained ) {
        peer_ref->total_msgs_recvd += 1;
        msg_ref->done++;
        msg_ref->active--;
    } else {
        msg_ref->active_drain--;
        content_ref->already_drained = false;
    }

    /* Non-persistent content entries are returned to the free list */
    HOKE_CONTENT_REF_RETURN(content_ref);

    if( NULL == new_msg_ref ) {
        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv) --", true));
    } else {
        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Request Complete (iRecv - Unknown) --", true));
    }

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: req_complete: Matched an iRecv: total = %d",
                         peer_ref->total_msgs_recvd));

 DONE:
    return exit_status;
}
2621
/*
 * Blocking receive (MPI_Recv) wrapper.
 *
 * PRE  - If a matching message was drained at the last checkpoint,
 *        satisfy the receive from the drained data and tell the wrapper
 *        the operation is already complete (OMPI_CRCP_PML_DONE).
 *        Otherwise archive the message (on the peer's recv list, or on
 *        the unknown-peer list for MPI_ANY_SOURCE) and mark it as the
 *        currently-active message.
 * POST - Resolve MPI_ANY_SOURCE receives to the actual peer (via the
 *        returned status), retire the archived message, and update the
 *        receive counters.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_recv(
                                  void *buf, size_t count,
                                  ompi_datatype_t *datatype,
                                  int src, int tag,
                                  struct ompi_communicator_t* comm,
                                  ompi_status_public_t* status,
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref = NULL, *new_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_state_t *coord_state = NULL;
    bool found_drain = false;
    int exit_status = OMPI_SUCCESS;
    int ret;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_recv()"));

    /*
     * Before PML Call
     * - Determine if this can be satisfied from the drained list
     * - Otherwise create a new reference to it so we can track it
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state) {
        /*
         * Check to see if this message is in the drained message list
         */
        found_drain = false;
        if( OMPI_SUCCESS != (ret = drain_message_check_recv(buf, count, datatype,
                                                            &src, &tag, comm, NULL, &status,
                                                            &found_drain) ) ) {
            ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: pml_recv(): Failed trying to find a drained message.");
            exit_status = ret;
            goto DONE;
        }

        if( found_drain ) {
            /* Do *not* increment:
             *   peer_ref->total_msgs_recvd += 1;
             * Because we accounted for this message during the last checkpoint.
             */

            /* This will identify to the wrapper that this message is complete */
            pml_state->state = OMPI_CRCP_PML_DONE;
            pml_state->error_code = OMPI_SUCCESS;
            return pml_state;
        }
        /*
         * Otherwise the message is not drained (common case)
         */
        else {
            /*
             * Find the Peer.  A wildcard source cannot be resolved until
             * the receive completes, so archive it on the unknown-peer
             * list for now.
             */
            if( MPI_ANY_SOURCE == src || src < 0) {
                traffic_message_append(NULL, &(unknown_recv_from_list),
                                       COORD_MSG_TYPE_B_RECV,
                                       count, datatype, 0, tag, src, comm,
                                       &msg_ref);

                CREATE_COORD_STATE(coord_state, pml_state,
                                   NULL, msg_ref);
            }
            else {
                if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
                    opal_output(mca_crcp_bkmrk_component.super.output_handle,
                                "crcp:bkmrk: pml_recv(): Failed to find peer_ref\n");
                    exit_status = ret;
                    goto DONE;
                }

                traffic_message_append(peer_ref, &(peer_ref->recv_list),
                                       COORD_MSG_TYPE_B_RECV,
                                       count, datatype, 0, tag, src, comm,
                                       &msg_ref);

                CREATE_COORD_STATE(coord_state, pml_state,
                                   peer_ref, msg_ref);
            }

            /* Bookkeeping: remember the currently-active blocking recv */
            current_msg_id = msg_ref->msg_id;
            current_msg_type = COORD_MSG_TYPE_B_RECV;

            coord_state->p_super.error_code = OMPI_SUCCESS;
            return &coord_state->p_super;
        }
    }
    /*
     * Post PML Call
     * - bookkeeping...
     */
    else if( OMPI_CRCP_PML_POST == pml_state->state) {
        ompi_crcp_base_pml_state_t *rtn_state = NULL;

        EXTRACT_COORD_STATE(pml_state, coord_state, rtn_state,
                            peer_ref, msg_ref);

        /*
         * If MPI_ANY_SOURCE (peer_ref is NULL), then move the message
         * from the unknown list to the list associated with the process
         * resolved via status->MPI_SOURCE.
         */
        if( NULL == peer_ref ) {
            src = status->MPI_SOURCE;

            if( OMPI_SUCCESS != (ret = find_peer_in_comm(comm, src, &peer_ref) ) ){
                opal_output(mca_crcp_bkmrk_component.super.output_handle,
                            "crcp:bkmrk: pml_recv(): Failed to resolve peer_ref (rank %d)\n",
                            src);
                exit_status = ret;
                goto DONE;
            }

            traffic_message_move(msg_ref,
                                 COORD_MSG_TYPE_B_RECV,
                                 NULL, &(unknown_recv_from_list),
                                 peer_ref, &(peer_ref->recv_list),
                                 &new_msg_ref,
                                 false,
                                 true);
            /* Retire the moved message reference */
            new_msg_ref->done++;
            new_msg_ref->active--;
        } else {
            /*
             * Known peer: retire the original message reference
             */
            msg_ref->done++;
            msg_ref->active--;
        }

        peer_ref->total_msgs_recvd += 1;
        current_msg_id = 0;
        current_msg_type = COORD_MSG_TYPE_UNKNOWN;

        TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Done", true));

        CRCP_COORD_STATE_RETURN(coord_state);

        rtn_state->error_code = OMPI_SUCCESS;
        return rtn_state;
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
2768
2769
2770 /**************** Start *****************/
2771 /* Start is connected to irecv_start or isend_start */
2772 static ompi_request_type_t * coord_start_req_types = NULL;
2773
/*
 * start() wrapper for persistent requests (MPI_Start/MPI_Startall).
 *
 * Sends:    handled entirely in the POST phase (mark active).
 * Receives: PRE checks the drained list; a drained request is marked
 *           OMPI_REQUEST_NOOP (original type saved in
 *           coord_start_req_types) so the PML skips it, and POST
 *           restores the saved type or activates the recorded message.
 *
 * Fix vs. original: the malloc() of coord_start_req_types was used
 * unchecked; a NULL return now aborts the PRE phase with
 * OMPI_ERR_OUT_OF_RESOURCE instead of dereferencing NULL.
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_start(
                                  size_t count,
                                  ompi_request_t** requests,
                                  ompi_crcp_base_pml_state_t* pml_state )
{
    int ret, exit_status = OMPI_SUCCESS;
    mca_pml_base_request_t *breq = NULL;
    size_t tmp_ddt_size = 0;
    size_t iter_req;
    bool found_drain = false;

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_start()"));

    /*
     * Handle all start() on send requests
     */
    if( OMPI_CRCP_PML_POST == pml_state->state ) {
        for(iter_req = 0; iter_req < count; iter_req++) {
            breq = (mca_pml_base_request_t *)requests[iter_req];
            if(breq->req_type == MCA_PML_REQUEST_SEND ) {
                if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_isend_init(&(requests[iter_req]))) ) {
                    exit_status = ret;
                    goto DONE;
                }
            }
        }
    }

    /*
     * Handle all start() on recv requests
     * - Pre: Check drain queue for a match
     * - Post: Start the message, unless drained
     */
    if( OMPI_CRCP_PML_PRE == pml_state->state ) {
        /*
         * Save each request's type, defaulting to NOOP; drained recvs
         * get their real type stashed here so POST can restore it.
         */
        coord_start_req_types = (ompi_request_type_t *)malloc(sizeof(ompi_request_type_t) * count);
        if( NULL == coord_start_req_types && count > 0 ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_start(): Failed to allocate memory\n");
            exit_status = OMPI_ERR_OUT_OF_RESOURCE;
            goto DONE;
        }
        for(iter_req = 0; iter_req < count; iter_req++) {
            coord_start_req_types[iter_req] = OMPI_REQUEST_NOOP;
        }

        for(iter_req = 0; iter_req < count; iter_req++) {
            breq = (mca_pml_base_request_t *)requests[iter_req];
            ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);

            if( breq->req_type == MCA_PML_REQUEST_RECV ) {
                found_drain = false;
                if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_drain_irecv_init(&(requests[iter_req]), &found_drain)) ) {
                    exit_status = ret;
                    goto DONE;
                }

                if( found_drain ) {
                    /* Satisfied from the drained list: neutralize the
                     * request so the PML does not start it. */
                    coord_start_req_types[iter_req] = requests[iter_req]->req_type;
                    requests[iter_req]->req_type = OMPI_REQUEST_NOOP;
                    requests[iter_req]->req_complete = true;
                }
            }
        }
        goto DONE;
    }
    else if( OMPI_CRCP_PML_POST == pml_state->state) {
        for(iter_req = 0; iter_req < count; iter_req++) {
            breq = (mca_pml_base_request_t *)requests[iter_req];
            ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);

            if (breq->req_type == MCA_PML_REQUEST_RECV) {
                /*
                 * If this was a drained message it will have it's type set to
                 * OMPI_REQUEST_NOOP so the PML does not try to start it again.
                 * So we need to replace it with the original type, but can
                 * skip starting it.
                 */
                if( NULL != coord_start_req_types ) {
                    if( OMPI_REQUEST_NOOP != coord_start_req_types[iter_req] ) {
                        requests[iter_req]->req_type = coord_start_req_types[iter_req];
                        continue;
                    }
                }

                if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_start_irecv_init(&(requests[iter_req]))) ) {
                    exit_status = ret;
                    goto DONE;
                }
            }
        }

        /*
         * Clear out the temporary drain type structure.
         */
        if( NULL != coord_start_req_types ) {
            free(coord_start_req_types);
            coord_start_req_types = NULL;
        }
    }

 DONE:
    pml_state->error_code = exit_status;
    return pml_state;
}
2876
2877 /**************** Request Completed ********/
ompi_crcp_bkmrk_request_complete(struct ompi_request_t * request)2878 int ompi_crcp_bkmrk_request_complete(struct ompi_request_t *request)
2879 {
2880 int ret, exit_status = OMPI_SUCCESS;
2881 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref = NULL;
2882 mca_pml_base_request_t *breq;
2883 size_t tmp_ddt_size = 0;
2884 int src, tag;
2885
2886 OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
2887 "crcp:bkmrk: pml_request_complete()"));
2888
2889 /*
2890 * Extract & check the PML version of the request
2891 */
2892 breq = (mca_pml_base_request_t *)request;
2893
2894 if( (breq->req_type != MCA_PML_REQUEST_SEND &&
2895 breq->req_type != MCA_PML_REQUEST_RECV ) || /* JJH YYY -- req_state = OMPI_REQUEST_INACTIVE ??? */
2896 request->req_type == OMPI_REQUEST_NOOP ||
2897 request->req_type == OMPI_REQUEST_NULL) {
2898 exit_status = OMPI_SUCCESS;
2899 goto DONE;
2900 }
2901
2902 /* Extract source/tag/ddt_size */
2903 src = breq->req_peer;
2904 tag = breq->req_tag;
2905 ompi_datatype_type_size(breq->req_datatype, &tmp_ddt_size);
2906
2907 /*
2908 * Find the peer reference
2909 */
2910 if( MPI_ANY_SOURCE == src ) {
2911 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, request->req_status.MPI_SOURCE, &peer_ref) ) ){
2912 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2913 "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2914 exit_status = ret;
2915 goto DONE;
2916 }
2917 } else {
2918 if( OMPI_SUCCESS != (ret = find_peer_in_comm(breq->req_comm, src, &peer_ref) ) ){
2919 opal_output(mca_crcp_bkmrk_component.super.output_handle,
2920 "crcp:bkmrk: req_complete(): Failed to find peer_ref\n");
2921 exit_status = ret;
2922 goto DONE;
2923 }
2924 }
2925
2926 /*******************************
2927 * A send request is completing
2928 ******************************/
2929 if(breq->req_type == MCA_PML_REQUEST_SEND ) {
2930 /*
2931 * ISEND Case:
2932 */
2933 if( false == request->req_persistent ) {
2934 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend(request, peer_ref,
2935 src, tag, tmp_ddt_size) ) ) {
2936 exit_status = ret;
2937 goto DONE;
2938 }
2939 }
2940 /*
2941 * SEND_INIT/START Case
2942 */
2943 else {
2944 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_isend_init(request, peer_ref,
2945 src, tag, tmp_ddt_size) ) ) {
2946 exit_status = ret;
2947 goto DONE;
2948 }
2949 }
2950 }
2951 /***********************************
2952 * A receive request is completing
2953 ***********************************/
2954 else if(breq->req_type == MCA_PML_REQUEST_RECV) {
2955 /*
2956 * IRECV Case:
2957 */
2958 if( false == request->req_persistent ) {
2959 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv(request, peer_ref,
2960 src, tag, tmp_ddt_size) ) ) {
2961 exit_status = ret;
2962 goto DONE;
2963 }
2964 }
2965 /*
2966 * IRECV_INIT/START Case:
2967 */
2968 else {
2969 if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_request_complete_irecv_init(request, peer_ref,
2970 src, tag, tmp_ddt_size) ) ) {
2971 exit_status = ret;
2972 goto DONE;
2973 }
2974 }
2975 }
2976
2977 DONE:
2978 return exit_status;
2979 }
2980
2981 /**************** FT Event *****************/
int ompi_crcp_bkmrk_pml_quiesce_start(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
    /*
     * Flush the communication channels with all peers.  On successful
     * return there is no in-flight point-to-point traffic left to lose
     * at checkpoint time.  The coordination result is passed straight
     * back to the caller.
     */
    return ft_event_coordinate_peers();
}
2991
int ompi_crcp_bkmrk_pml_quiesce_end(ompi_crcp_bkmrk_pml_quiesce_tag_type_t tag ) {
    /*
     * Finish the coordination protocol (exchange of final bookmark
     * state with peers).  The exchange result is returned unchanged.
     */
    return ft_event_finalize_exchange();
}
3001
/*
 * Fault-tolerance event handler for the bookmark CRCP PML wrapper.
 *
 * Driven twice per event (OMPI_CRCP_PML_PRE / OMPI_CRCP_PML_POST via
 * pml_state->state) for each of the CRS states: CHECKPOINT, CONTINUE,
 * RESTART, TERM.  Returns pml_state with pml_state->error_code set to the
 * outcome (OMPI_SUCCESS, an error code, or OMPI_EXISTS when stalling).
 */
ompi_crcp_base_pml_state_t* ompi_crcp_bkmrk_pml_ft_event(
                                  int state,
                                  ompi_crcp_base_pml_state_t* pml_state)
{
    /* 'step_to_return_to' lets a stalled checkpoint re-enter at STEP_1 on
     * the next call; 'first_continue_pass' tracks the two-pass CONTINUE
     * handling used when a continue behaves like a restart. */
    static int step_to_return_to = 0;
    static bool first_continue_pass = false;
    opal_list_item_t* item = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    ft_event_state = state;

    /* Resume a previously stalled checkpoint coordination */
    if( step_to_return_to == 1 ) {
        goto STEP_1;
    }

    OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: pml_ft_event()"));

    /**************************
     * Prepare for a Checkpoint
     **************************/
    if(OPAL_CRS_CHECKPOINT == state) {
        /* Only act on the pre-checkpoint pass */
        if( OMPI_CRCP_PML_PRE != pml_state->state){
            goto DONE;
        }

        /* Optional timing barrier so all processes enter the protocol
         * together (timing instrumentation only). */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCPBR0);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CRCP0);

        START_TIMER(CRCP_TIMER_TOTAL_CKPT);
    STEP_1:
        step_to_return_to = 0;

        /* Coordinate Peers:
         * When we return from this function we know that all of our
         * channels have been flushed.
         */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_start(QUIESCE_TAG_CKPT)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: %s ft_event: Checkpoint Coordination Failed %d",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        ret);
            exit_status = ret;
            goto DONE;
        }

        /* The coordination requested a stall (e.g. a blocking recv must
         * complete first): remember to re-enter at STEP_1 next time and
         * leave via DONE_STALL so step_to_return_to stays set. */
        if( stall_for_completion ) {
            stall_for_completion = false;
            opal_cr_stall_check  = true;
            step_to_return_to = 1;

            exit_status = OMPI_EXISTS;
            goto DONE_STALL;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CKPT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*****************************
     * Continue after a checkpoint
     ******************************/
    else if(OPAL_CRS_CONTINUE == state) {
        /* Only act on the post-checkpoint pass */
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        first_continue_pass = !first_continue_pass;

        /* Only finalize the Protocol after the PML has been rebuilt */
        if (opal_cr_continue_like_restart && first_continue_pass) {
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_CONT);

        /*
         * Finish the coord protocol
         */
        if( OMPI_SUCCESS != (ret = ompi_crcp_bkmrk_pml_quiesce_end(QUIESCE_TAG_CONTINUE) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }
        END_TIMER(CRCP_TIMER_TOTAL_CONT);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();

        /* Optional timing barrier mirroring the one taken on entry */
        if( opal_cr_timing_barrier_enabled ) {
            OPAL_CR_SET_TIMER(OPAL_CR_TIMER_COREBR1);
            if( OMPI_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
                exit_status = ret;
                goto DONE;
            }
        }
        OPAL_CR_SET_TIMER(OPAL_CR_TIMER_CORE2);
    }
    /*****************************
     * Restart from a checkpoint
     *****************************/
    else if(OPAL_CRS_RESTART == state) {
        /* Only act on the post-restart pass */
        if( OMPI_CRCP_PML_POST != pml_state->state){
            goto DONE;
        }

        START_TIMER(CRCP_TIMER_TOTAL_RST);
        /*
         * Refresh the jobids: after a restart this process may be running
         * under a new jobid, so every cached peer name is updated.
         */
        for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
            item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
            item  = opal_list_get_next(item) ) {
            ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
            cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

            /* JJH - Assuming only one global jobid at the moment */
            cur_peer_ref->proc_name.jobid = OMPI_PROC_MY_NAME->jobid;
        }

        /*
         * Finish the coord protocol
         */
        if( OMPI_SUCCESS != (ret = ft_event_finalize_exchange() ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: pml_ft_event: Checkpoint Finalization Failed %d",
                        ret);
            exit_status = ret;
            goto DONE;
        }

        END_TIMER(CRCP_TIMER_TOTAL_RST);

        DISPLAY_ALL_TIMERS(state);
        clear_timers();
    }
    /*****************************
     * Terminating the process post checkpoint
     *****************************/
    else if(OPAL_CRS_TERM == state ) {
        goto DONE;
    }
    /****************************
     * Reached an error
     ****************************/
    else {
        goto DONE;
    }

 DONE:
    /* Normal exit: clear the resume marker; DONE_STALL deliberately
     * preserves step_to_return_to and ft_event_state. */
    step_to_return_to = 0;
    ft_event_state = OPAL_CRS_RUNNING;

 DONE_STALL:
    pml_state->error_code = exit_status;
    return pml_state;
}
3168
3169 /******************
3170 * Local Utility functions
3171 ******************/
3172
3173 /************************************************
3174 * Traffic Message Utility Functions
3175 ************************************************/
/*
 * Record one more posted/active message with the given signature on
 * 'append_list', creating a new traffic-message entry when the signature
 * has not been seen before.  '*msg_ref' returns the (possibly new) entry.
 */
static int traffic_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                  opal_list_t * append_list,
                                  ompi_crcp_bkmrk_pml_message_type_t msg_type,
                                  size_t count,
                                  ompi_datatype_t *datatype,
                                  size_t in_ddt_size,
                                  int tag,
                                  int dest,
                                  struct ompi_communicator_t* comm,
                                  ompi_crcp_bkmrk_pml_traffic_message_ref_t **msg_ref)
{
    int ret;
    size_t ddt_size = 0;
    bool is_persistent;

    /* Datatype size comes from the datatype itself when one is supplied,
     * otherwise the caller-provided value is trusted. */
    if( NULL == datatype ) {
        ddt_size = in_ddt_size;
    } else {
        ompi_datatype_type_size(datatype, &ddt_size);
    }

    /* Persistent operations track 'posted'; everything else tracks 'active' */
    is_persistent = (COORD_MSG_TYPE_P_SEND == msg_type ||
                     COORD_MSG_TYPE_P_RECV == msg_type);

    /*
     * Look for an existing entry with this signature:
     * - found   -> bump the matching counter
     * - missing -> create and append a fresh entry
     */
    ret = traffic_message_find(append_list,
                               count, tag, dest,
                               comm->c_contextid,
                               ddt_size,
                               msg_ref,
                               FIND_MSG_UNKNOWN /* Active? */ );
    if( OMPI_SUCCESS != ret ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: traffic_message_append: Unable to find the proper message reference.\n");
        return OMPI_ERROR;
    }

    if( NULL == *msg_ref ) {
        /* New signature: create an entry, tagged with the peer's name when
         * one is known, otherwise with invalid job/vpid placeholders. */
        if( NULL == peer_ref ) {
            CREATE_NEW_MSG((*msg_ref), msg_type,
                           count, ddt_size, tag, dest, comm,
                           ORTE_JOBID_INVALID, ORTE_VPID_INVALID);
        } else {
            CREATE_NEW_MSG((*msg_ref), msg_type,
                           count, ddt_size, tag, dest, comm,
                           peer_ref->proc_name.jobid,
                           peer_ref->proc_name.vpid);
        }

        (*msg_ref)->matched = 0;
        (*msg_ref)->done    = 0;
        (*msg_ref)->active  = (is_persistent ? 0 : 1);
        (*msg_ref)->posted  = (is_persistent ? 1 : 0);

        opal_list_append(append_list, &((*msg_ref)->super));
    } else {
        if( is_persistent ) {
            (*msg_ref)->posted++;
        } else {
            (*msg_ref)->active++;
        }
    }

    /* Debug dump of the peer's bookkeeping after the append */
    if( NULL != peer_ref ) {
        switch( msg_type ) {
        case COORD_MSG_TYPE_B_SEND:
            TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send) --", true));
            break;
        case COORD_MSG_TYPE_P_SEND:
            TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (send_init) --", true));
            break;
        case COORD_MSG_TYPE_B_RECV:
            TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv) --", true));
            break;
        case COORD_MSG_TYPE_P_RECV:
            TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (recv_init) --", true));
            break;
        case COORD_MSG_TYPE_I_SEND:
        case COORD_MSG_TYPE_I_RECV:
            /* Non-blocking ops are intentionally not dumped here */
            break;
        default:
            TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Append Message (Unknown) --", true));
            break;
        }
    }

    return OMPI_SUCCESS;
}
3273
/*
 * Activate one member of a persistent message signature (MPI_Start path).
 * The signature keeps a single 'posted' count; only the 'active' count and
 * the matching content entry change here.
 */
static int traffic_message_start(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
                                 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                 ompi_request_t **request,
                                 opal_list_t * peer_list,
                                 ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
{
    /* One more active member for this (already posted) signature */
    msg_ref->active += 1;

    /* Locate the matching currently-inactive content entry and flag it
     * active; the entry is handed back through 'content_ref'. */
    traffic_message_find_mark_persistent(msg_ref, request,
                                         false, /* find inactive entry */
                                         true,  /* mark it active */
                                         content_ref);

    return OMPI_SUCCESS;
}
3291
/*
 * Move one member of 'old_msg_ref' from 'from_list' to the matching
 * signature on 'to_list' (creating it there via traffic_message_append()
 * when needed), carrying over the underlying PML request and its
 * already-drained status.
 *
 * keep_active: for persistent ops, keep the moved member counted as active.
 * remove     : detach (and recycle) the content entry from the old message.
 *
 * Returns the result of the underlying append.
 */
static int traffic_message_move(ompi_crcp_bkmrk_pml_traffic_message_ref_t *old_msg_ref,
                                ompi_crcp_bkmrk_pml_message_type_t msg_type,
                                ompi_crcp_bkmrk_pml_peer_ref_t *from_peer_ref,
                                opal_list_t * from_list,
                                ompi_crcp_bkmrk_pml_peer_ref_t *to_peer_ref,
                                opal_list_t * to_list,
                                ompi_crcp_bkmrk_pml_traffic_message_ref_t **new_msg_ref,
                                bool keep_active,
                                bool remove)
{
    int ret;
    ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
    ompi_request_t *request = NULL;
    bool loc_already_drained = false;

    /* Append to the to_peer_ref */
    if( COORD_MSG_TYPE_B_RECV != msg_type ) {
        traffic_message_grab_content(old_msg_ref, &prev_content, remove, true); /* Remove, prefer already_drained */

        /*
         * Bug fix: grab_content() leaves 'prev_content' NULL when the old
         * message has no content entries; dereferencing it unconditionally
         * crashed.  With the guard, 'request' stays NULL and the
         * ERROR_SHOULD_NEVER_HAPPEN sanity check below reports the anomaly.
         */
        if( NULL != prev_content ) {
            request = prev_content->request;

            loc_already_drained = prev_content->already_drained;

            if( remove ) {
                prev_content->request = NULL;
                HOKE_CONTENT_REF_RETURN(prev_content);
            }
        }
    }

    ret = traffic_message_append(to_peer_ref, to_list,
                                 old_msg_ref->msg_type,
                                 old_msg_ref->count,
                                 NULL,
                                 old_msg_ref->ddt_size,
                                 old_msg_ref->tag,
                                 old_msg_ref->rank,
                                 old_msg_ref->comm,
                                 new_msg_ref);

    if( loc_already_drained ) {
        old_msg_ref->active_drain--;
        (*new_msg_ref)->active--; /* Undo the action from _append() */
        (*new_msg_ref)->active_drain++;
    } else {
        /* 'remove' from from_peer_ref */
        old_msg_ref->active--;
    }

    /* Persistent members may stay counted as active on the new signature */
    if( msg_type == COORD_MSG_TYPE_P_SEND ||
        msg_type == COORD_MSG_TYPE_P_RECV ) {
        if( keep_active ) {
            (*new_msg_ref)->active++;
        }
    }

    if( COORD_MSG_TYPE_B_RECV != msg_type && NULL == request ) {
        ERROR_SHOULD_NEVER_HAPPEN("Error: Must match a non-blocking send, and there is no matching request.");
    }

    /* Attach a content entry mirroring the moved request to the new message */
    if( NULL != request ) {
        HOKE_CONTENT_REF_ALLOC(new_content);
        new_content->buffer  = NULL;
        new_content->request = request;
        new_content->done    = false;
        new_content->active  = keep_active;
        new_content->already_posted  = true;
        new_content->already_drained = loc_already_drained;
        OBJ_RETAIN(request);
        opal_list_append(&((*new_msg_ref)->msg_contents), &(new_content->super) );
    }

    /* When the origin peer is unknown, adopt the destination peer's name */
    if( NULL == from_peer_ref && NULL != to_peer_ref ) {
        (*new_msg_ref)->proc_name.jobid = to_peer_ref->proc_name.jobid;
        (*new_msg_ref)->proc_name.vpid  = to_peer_ref->proc_name.vpid;
    }

    return ret;
}
3369
/*
 * Locate the content entry of 'msg_ref' whose PML sequence number matches
 * '*request', restricted to entries whose 'active' flag equals 'cur_active',
 * and set that entry's flag to 'set_is_active'.  When 'c_ref' is non-NULL
 * the matched entry is returned through it.
 */
static int traffic_message_find_mark_persistent(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
                                                ompi_request_t **request,
                                                bool cur_active,
                                                bool set_is_active,
                                                ompi_crcp_bkmrk_pml_message_content_ref_t **c_ref)
{
    mca_pml_base_request_t *target_req = (mca_pml_base_request_t *)(*request);
    opal_list_item_t *iter = NULL;

    for( iter  = opal_list_get_first(&(msg_ref->msg_contents));
         iter != opal_list_get_end(  &(msg_ref->msg_contents));
         iter  = opal_list_get_next(iter) ) {
        ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref =
            (ompi_crcp_bkmrk_pml_message_content_ref_t*)iter;
        mca_pml_base_request_t *cand_req =
            (mca_pml_base_request_t *)(content_ref->request);

        /* Only consider entries in the requested activation state */
        if( cur_active != content_ref->active ) {
            continue;
        }

        if( cand_req->req_sequence == target_req->req_sequence ) {
            OPAL_OUTPUT_VERBOSE((25, mca_crcp_bkmrk_component.super.output_handle,
                                 "%s %8s Request [%d] (%s) %d : %d",
                                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                 (set_is_active ? "Start" : (NULL != c_ref ? "Drain" : "Complete")),
                                 (int)msg_ref->msg_id,
                                 (content_ref->active ? "T" : "F"),
                                 (int)cand_req->req_sequence,
                                 (int)target_req->req_sequence));

            content_ref->active = set_is_active;
            if( NULL != c_ref ) {
                *c_ref = content_ref;
            }
            break;
        }
    }

    return OMPI_SUCCESS;
}
3413
/*
 * Grab a content entry from 'msg_ref', preferring one whose
 * 'already_drained' flag equals the 'already_drained' argument; falls back
 * to the first entry when preferring drained entries and none matches.
 *
 * remove      : detach the chosen entry from the message's content list.
 * content_ref : when non-NULL, receives the chosen entry (NULL when the
 *               list is empty or nothing suitable was found); when NULL and
 *               'remove' is set, the detached entry is recycled here.
 */
static int traffic_message_grab_content(ompi_crcp_bkmrk_pml_traffic_message_ref_t *msg_ref,
                                        ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
                                        bool remove,
                                        bool already_drained)
{
    ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *loc_content_ref = NULL;
    opal_list_item_t* item = NULL;

    /*
     * If there is no request list, return NULL
     */
    if( 0 >= opal_list_get_size(&msg_ref->msg_contents) ) {
        return OMPI_SUCCESS;
    }

    /*
     * Otherwise look though the list, and grab something 'already_drained' if
     * possible, otherwise just get the first element.
     */
    if( already_drained ) {
        item = opal_list_get_first(&(msg_ref->msg_contents));
        new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
    }

    for(item  = opal_list_get_first(&(msg_ref->msg_contents));
        item != opal_list_get_end(  &(msg_ref->msg_contents));
        item  = opal_list_get_next(item) ) {
        loc_content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;

        if( !already_drained ) {
            TRAFFIC_MSG_DUMP_MSG_CONTENT_INDV(10, (loc_content_ref));
        }

        if( loc_content_ref->already_drained == already_drained ) {
            new_content = (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;
            break;
        }
    }

    /*
     * Bug fix: when '!already_drained' and every entry is drained,
     * 'new_content' is still NULL here; removing unguarded would pass NULL
     * into opal_list_remove_item() and dereference it.  (The tail of the
     * function already anticipated a NULL result.)
     */
    if( remove && NULL != new_content ) {
        opal_list_remove_item(&msg_ref->msg_contents, &(new_content->super));
    }

    if( NULL != content_ref ) {
        *content_ref = new_content;
    } else if( remove && NULL != new_content ) {
        HOKE_CONTENT_REF_RETURN(new_content);
    }

    return OMPI_SUCCESS;
}
3466
/*
 * Create drain bookkeeping for the message signature in '*posted_msg_ref'
 * against 'peer_ref', appending content entries to the peer's drained list.
 *
 * post_drain = false : mark up to min(max_post, active) currently active
 *                      receives as already drained (receive already posted).
 * post_drain = true  : create up to 'max_post' passive drain entries with a
 *                      packed buffer, to be posted later by the drain logic.
 *
 * '*num_posted' returns the number of drain contents created.
 */
static int traffic_message_create_drain_message(bool post_drain,
                                                int max_post,
                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                                ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_msg_ref,
                                                int *num_posted)
{
    ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *new_content = NULL, *prev_content = NULL;
    int m_iter, m_total;

    *num_posted = 0;

    /*
     * Nothing to do here
     */
    if( NULL == (*posted_msg_ref) || max_post <= 0) {
        return OMPI_SUCCESS;
    }

    /*
     * For each active message or if not active message then max_post, create a drain message
     */
    m_total = max_post;
    if( !post_drain && max_post > (*posted_msg_ref)->active ) {
        m_total = (*posted_msg_ref)->active;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         " --> Create Drain Msg: %s %4d = min(%4d / %4d)",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         (post_drain ? "Posting" : "Not Posting"),
                         m_total, (*posted_msg_ref)->active, max_post ));

    TRAFFIC_MSG_DUMP_MSG_INDV(10, ((*posted_msg_ref), "Drain", true));

    /*
     * Get a drained message reference for this signature.
     * (find-or-create on the peer's drained list)
     */
    drain_message_append(peer_ref,
                         COORD_MSG_TYPE_I_RECV,
                         (*posted_msg_ref)->count,
                         (*posted_msg_ref)->ddt_size,
                         (*posted_msg_ref)->tag,
                         (*posted_msg_ref)->rank,
                         (*posted_msg_ref)->comm,
                         &drain_msg_ref);

    /*
     * Create a new message content for each message to be drained.
     */
    for(m_iter = 0; m_iter < m_total; ++m_iter) {
        new_content = NULL;

        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s <-- %s "
                             " \t--> Find Content: %s (%4d of %4d)",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                             (post_drain ? "Posting" : "Not Posting"),
                             m_iter, m_total));

        /* Grab a request if there are any
         * - if we are posting, then we created a dummy message which will not
         *   have any contents, so this is still valid.
         * - if we are not posting, and this is an iRecv, then we *must* find a content!
         */
        traffic_message_grab_content((*posted_msg_ref), &prev_content, false, false); /* Do not remove, No already drained */
        if( NULL != prev_content ) {
            prev_content->already_drained = true;
        }

        /* YYY JJH YYY - Is this needed? */
        if( !post_drain && (*posted_msg_ref)->msg_type != COORD_MSG_TYPE_B_RECV ) {
            assert( NULL != prev_content );
        }

        /* Decrementing active occurs when we stall in the Blocking Recv, do not do so here. */
        if( NULL != prev_content ) {
            (*posted_msg_ref)->active--;
        }
        (*posted_msg_ref)->active_drain++;

        /* Create a new content for the drained message */
        HOKE_CONTENT_REF_ALLOC(new_content);
        new_content->buffer = NULL;
        if( NULL == prev_content ) {
            new_content->request = NULL;
        } else {
            /* Share the underlying PML request with the original content */
            new_content->request = prev_content->request;
            if( NULL != new_content->request ) {
                OBJ_RETAIN(new_content->request);
            }
        }
        opal_list_append(&(drain_msg_ref->msg_contents), &(new_content->super) );

        if( !post_drain ) {
            /* Receive already posted: drain entry is active immediately */
            new_content->done           = false;
            new_content->active         = true;
            new_content->already_posted = true;
            new_content->already_drained = true;

            drain_msg_ref->active++;
            drain_msg_ref->already_posted++;
        } else {
            /* Passive drain entry: will be posted later by the drain logic */
            new_content->done           = false;
            new_content->active         = false;
            new_content->already_posted = false;
            new_content->already_drained = true;

            /*
             * Put the true count here so we can properly match the drain.
             * The post_drained() will properly handle the packed datatype
             * by changing the count to (count * ddt_size).
             */
            ompi_datatype_duplicate(&(ompi_mpi_packed.dt), &(drain_msg_ref->datatype));

            /* Create a buffer of the necessary type/size */
            if(drain_msg_ref->count > 0 ) {
                new_content->buffer = (void *) malloc(drain_msg_ref->count * drain_msg_ref->ddt_size);
            } else {
                new_content->buffer = (void *) malloc(1 * drain_msg_ref->ddt_size);
            }

            /* JJH - Performance Optimization? - Post drained messages right away? */
        }

        (*num_posted)++;
    }

    peer_ref->total_drained_msgs += *num_posted;

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Added %d messages to the drained list (size = %d)",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         (*num_posted),
                         (int)opal_list_get_size(&(peer_ref->drained_list)) ));

    return OMPI_SUCCESS;
}
3610
/*
 * Search every place a matching receive could have been recorded for the
 * signature (count, tag, comm_id, datatype_size): the peer's recv, irecv
 * and recv_init lists, plus the two global wildcard-source lists.  Each
 * out-parameter receives the match from its list, or NULL.
 */
static int traffic_message_find_recv(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                     int rank, uint32_t comm_id, int tag,
                                     size_t count, size_t datatype_size,
                                     ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_recv_msg_ref,
                                     ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_irecv_msg_ref,
                                     ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_precv_msg_ref,
                                     ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_recv_msg_ref,
                                     ompi_crcp_bkmrk_pml_traffic_message_ref_t ** posted_unknown_precv_msg_ref)
{
    int ret, i;
    opal_list_t *search_lists[5];
    ompi_crcp_bkmrk_pml_traffic_message_ref_t **found_refs[5];

    /* Table of (list, result slot) pairs, searched in the same order the
     * original hand-unrolled code used. */
    search_lists[0] = &(peer_ref->recv_list);       found_refs[0] = posted_recv_msg_ref;
    search_lists[1] = &(peer_ref->irecv_list);      found_refs[1] = posted_irecv_msg_ref;
    search_lists[2] = &(peer_ref->recv_init_list);  found_refs[2] = posted_precv_msg_ref;
    search_lists[3] = &(unknown_recv_from_list);    found_refs[3] = posted_unknown_recv_msg_ref;
    search_lists[4] = &(unknown_persist_recv_list); found_refs[4] = posted_unknown_precv_msg_ref;

    for( i = 0; i < 5; ++i ) {
        *(found_refs[i]) = NULL;
    }

    for( i = 0; i < 5; ++i ) {
        if( OMPI_SUCCESS != (ret = traffic_message_find(search_lists[i],
                                                        count, tag, INVALID_INT,
                                                        comm_id, datatype_size,
                                                        found_refs[i],
                                                        FIND_MSG_UNKNOWN) ) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: traffic_message_find_recv: Unable to find the proper message reference.\n");
            return OMPI_ERROR;
        }
    }

    /*
     * JJH -- Should we check the drained list?
     * If we checkpoint again before dimishing the drained list, then
     * the peer could be requesting that a drained send complete...
     */

    return OMPI_SUCCESS;
}
3701
/*
 * Search 'search_list' (newest entry first) for a message matching the
 * signature (count, tag, peer, communicator context id, datatype size).
 *
 * 'active' filters candidates before the signature check:
 *   FIND_MSG_UNKNOWN - any entry, ignore activity counters
 *   PERSIST_MARKER   - only entries with at least one posted persistent op
 *   FIND_MSG_TRUE    - only entries with active (or draining) members
 *   FIND_MSG_FALSE   - only entries with NO active members
 *
 * '*found_msg_ref' returns the match or NULL; the return code is only
 * non-success for invalid arguments (debug builds).
 */
static int traffic_message_find(opal_list_t * search_list,
                                size_t count, int tag, int peer,
                                uint32_t comm_id, size_t ddt_size,
                                ompi_crcp_bkmrk_pml_traffic_message_ref_t ** found_msg_ref,
                                int active )
{
    opal_list_item_t* item = NULL;

    *found_msg_ref = NULL;

#if OPAL_ENABLE_DEBUG == 1
    /*
     * Dummy checks:
     */
    if( NULL == search_list) {
        opal_output(0, "WARNING (Debug): Search_list NULL! (%s:%d)", __FILE__, __LINE__);
        return OMPI_ERROR;
    }
#endif

    /*
     * Check the search list
     */
    for(item  = opal_list_get_last(search_list);
        item != opal_list_get_begin(search_list);
        item  = opal_list_get_prev(item) ) {
        ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
        msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)item;

        if( active != FIND_MSG_UNKNOWN ) {
            if( active == PERSIST_MARKER ) {
                if( 0 >= msg_ref->posted ) {
                    continue;
                }
            }
            /*
             * Bug fix: the FIND_MSG_FALSE test must use a strict '<'.
             * With '0 <=' every entry whose active total is zero -- i.e.
             * exactly the inactive entries FIND_MSG_FALSE is meant to
             * find -- was skipped too, so the search could never match.
             */
            else if( (active == FIND_MSG_TRUE  && 0 >= (msg_ref->active + msg_ref->active_drain) ) ||
                     (active == FIND_MSG_FALSE && 0 <  (msg_ref->active + msg_ref->active_drain) ) ) {
                continue;
            }
        }

        /* Signature match: count, communicator, tag (MPI_ANY_TAG entries
         * match any tag), rank (INVALID_INT means "any"), datatype size. */
        if(msg_ref->count == count &&
           (NULL != msg_ref->comm && msg_ref->comm->c_contextid == comm_id) &&
           (msg_ref->tag == MPI_ANY_TAG || msg_ref->tag == tag) &&
           (peer == INVALID_INT || msg_ref->rank == peer) &&
           msg_ref->ddt_size == ddt_size) {

            OPAL_OUTPUT_VERBOSE((30, mca_crcp_bkmrk_component.super.output_handle,
                                 "crcp:bkmrk: traffic_message_find: Found Message -- Comm list (%d, %d)\n",
                                 tag, peer));

            *found_msg_ref = msg_ref;
            return OMPI_SUCCESS;
        }
    }

    return OMPI_SUCCESS;
}
3760
3761
3762 /************************************************
3763 * Drain Message Utility Functions
3764 ************************************************/
/*
 * Find-or-create a drained-message entry matching the given signature on
 * the peer's drained list.  On success '*msg_ref' points at the (possibly
 * freshly created) entry; when the entry already existed, the caller is
 * responsible for updating its msg_contents list and counters.
 */
static int drain_message_append(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                ompi_crcp_bkmrk_pml_message_type_t msg_type,
                                size_t count, size_t ddt_size,
                                int tag,int dest,
                                struct ompi_communicator_t* comm,
                                ompi_crcp_bkmrk_pml_drain_message_ref_t **msg_ref)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    /*
     * Determine if message is currently in the list
     * - If it is then increment the count.
     * - ow add it to the list
     */
    if( OMPI_SUCCESS != (ret = drain_message_find(&(peer_ref->drained_list),
                                                  count, tag, dest,
                                                  comm->c_contextid,
                                                  ddt_size,
                                                  msg_ref,
                                                  &content_ref) ) ) {
        /* Bug fix: this error previously named 'traffic_message_append',
         * a copy-paste from the sibling function, which misattributed the
         * failure in the logs. */
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: drain_message_append: Unable to find the proper message reference.\n");
        return OMPI_ERROR;
    }

    if( NULL == *msg_ref ) {
        /* New signature: create a drain entry with zeroed counters */
        CREATE_NEW_DRAIN_MSG((*msg_ref), msg_type,
                             count, NULL, tag, dest, comm,
                             peer_ref->proc_name.jobid,
                             peer_ref->proc_name.vpid);

        (*msg_ref)->done           = 0;
        (*msg_ref)->active         = 0;
        (*msg_ref)->already_posted = 0;

        opal_list_append(&(peer_ref->drained_list), &((*msg_ref)->super));
    }
    /* If message does exist then the calling function needs to handle the msg_contents and counts */

    return exit_status;
}
3807
/*
 * Detach and recycle one content entry from a drained message; when that
 * was the last content, retire the message signature itself from the
 * peer's drained list.
 */
static int drain_message_remove(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                ompi_crcp_bkmrk_pml_drain_message_ref_t *msg_ref,
                                ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
{
    /* Drop this content entry from the message and recycle it */
    opal_list_remove_item(&(msg_ref->msg_contents), &(content_ref->super));
    HOKE_CONTENT_REF_RETURN(content_ref);

    /*
     * With no contents left, the signature is removed from the peer's
     * drained list and recycled; otherwise it stays for the remaining
     * drained contents.
     */
    if( opal_list_get_size(&(msg_ref->msg_contents)) <= 0 ) {
        TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "D*remove", true));
        opal_list_remove_item(&(peer_ref->drained_list), &(msg_ref->super));
        HOKE_DRAIN_MSG_REF_RETURN(msg_ref);
    } else {
        TRAFFIC_MSG_DUMP_DRAIN_MSG_INDV(10, (msg_ref, "Dremove", true));
    }

    return OMPI_SUCCESS;
}
3832
/*
 * Check whether an incoming receive can be satisfied from the drained
 * message store instead of posting a real receive.
 *
 * On a match, the receive is completed immediately from the drained copy
 * and *found_drain is set to true; otherwise *found_drain stays false and
 * the caller must post the receive normally.
 */
static int drain_message_check_recv(void **buf, size_t count,
                                    ompi_datatype_t *datatype,
                                    int *src, int *tag,
                                    struct ompi_communicator_t* comm,
                                    struct ompi_request_t **request,
                                    ompi_status_public_t** status,
                                    bool *found_drain)
{
    int ret;
    size_t ddt_size = 0;
    ompi_crcp_bkmrk_pml_peer_ref_t            *peer          = NULL;
    ompi_crcp_bkmrk_pml_drain_message_ref_t   *match_msg     = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *match_content = NULL;

    *found_drain = false;

    ompi_datatype_type_size(datatype, &ddt_size);

    /* Look for a matching signature among all peers' drained messages */
    ret = drain_message_find_any(count, *tag, *src, comm, ddt_size,
                                 &match_msg, &match_content, &peer);
    if( OMPI_SUCCESS != ret ) {
        ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: drain_check_recv(): Failed trying to find a drained message.");
        return ret;
    }

    /* No drained message matches: the caller does a real receive */
    if( NULL == match_msg ) {
        return OMPI_SUCCESS;
    }

    OPAL_OUTPUT_VERBOSE((12, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: drain_check_recv(): Matched a drained message..."));

    /* Complete the receive right now from the drained copy.
     * Counters are not incremented here; that already happened when the
     * message was originally drained. */
    ret = drain_message_copy_remove(match_msg, match_content,
                                    src, tag, request, status,
                                    datatype, count, buf, peer);
    if( OMPI_SUCCESS != ret ) {
        opal_output( mca_crcp_bkmrk_component.super.output_handle,
                     "crcp:bkmrk: drain_check_recv(): Datatype copy failed (%d)",
                     ret);
        return ret;
    }

    peer->total_drained_msgs -= 1;
    *found_drain = true;

    return OMPI_SUCCESS;
}
3894
/*
 * Search every known peer's drained-message list for a message matching
 * (count, tag, peer, communicator, datatype size).
 *
 * On a match: *found_msg_ref / *content_ref point at the message and a
 * usable content, and *peer_ref (if non-NULL) is set to the owning peer.
 * On no match, *found_msg_ref is left NULL. Always returns OMPI_SUCCESS.
 */
static int drain_message_find_any(size_t count, int tag, int peer,
                                  struct ompi_communicator_t* comm, size_t ddt_size,
                                  ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
                                  ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref,
                                  ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
{
    ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
    opal_list_item_t* item = NULL;

    *found_msg_ref = NULL;

    for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
        item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
        item = opal_list_get_next(item) ) {
        cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

        /*
         * If we are not MPI_ANY_SOURCE, then extract the process name from the
         * communicator, and search only the peer that matches.
         */
        if( MPI_ANY_SOURCE != peer && peer >= 0) {
            /* Check to see if peer could possibly be in this communicator */
            if( comm->c_local_group->grp_proc_count <= peer ) {
                continue;
            }

            /* Skip peers whose RTE name does not match the requested rank */
            if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                           &(cur_peer_ref->proc_name),
                                                           OMPI_CAST_RTE_NAME(&comm->c_local_group->grp_proc_pointers[peer]->super.proc_name))) {
                continue;
            }
        }

        /* drain_message_find() always returns OMPI_SUCCESS; a miss is
         * reported by leaving *found_msg_ref NULL, so the return value
         * is intentionally not checked here. */
        drain_message_find(&(cur_peer_ref->drained_list),
                           count, tag, peer,
                           comm->c_contextid, ddt_size,
                           found_msg_ref,
                           content_ref);
        if( NULL != *found_msg_ref) {
            if( NULL != peer_ref ) {
                *peer_ref = cur_peer_ref;
            }
            return OMPI_SUCCESS;
        }
    }

    return OMPI_SUCCESS;
}
3943
/*
 * Scan one drained-message list for the first message whose signature
 * matches (count, tag, peer, comm_id, ddt_size), honoring the wildcard
 * values MPI_ANY_TAG, MPI_ANY_SOURCE/INVALID_INT, and
 * PROBE_ANY_SIZE/PROBE_ANY_COUNT.
 *
 * A signature only matches if it still holds a fully-drained content
 * (non-NULL buffer); otherwise *found_msg_ref is reset to NULL.
 * Always returns OMPI_SUCCESS; a miss leaves both out-params NULL.
 */
static int drain_message_find(opal_list_t * search_list,
                              size_t count, int tag, int peer,
                              uint32_t comm_id, size_t ddt_size,
                              ompi_crcp_bkmrk_pml_drain_message_ref_t ** found_msg_ref,
                              ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
{
    opal_list_item_t *item = NULL;

    *found_msg_ref = NULL;
    *content_ref   = NULL;

    for(item  = opal_list_get_first(search_list);
        item != opal_list_get_end(search_list);
        item  = opal_list_get_next(item) ) {
        ompi_crcp_bkmrk_pml_drain_message_ref_t *cand =
            (ompi_crcp_bkmrk_pml_drain_message_ref_t*)item;

        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: find_drain_msg(): Compare [%d, %d, %d, %d] to [%d, %d, %d, %d]",
                             (int)ddt_size, (int)count, tag, peer,
                             (int)cand->ddt_size, (int)cand->count, (int)cand->tag, (int)cand->rank));

        /* Communicator must match when the candidate has one attached */
        if( NULL != cand->comm && cand->comm->c_contextid != comm_id ) {
            continue;
        }

        /* Tag must match unless the caller asked for MPI_ANY_TAG */
        if( MPI_ANY_TAG != tag && cand->tag != tag ) {
            continue;
        }

        /* Rank must match unless wildcarded (INVALID_INT or MPI_ANY_SOURCE) */
        if( INVALID_INT != peer &&
            MPI_ANY_SOURCE != peer &&
            cand->rank != peer ) {
            continue;
        }

        /* Datatype size and count must match unless probing for any size */
        if( PROBE_ANY_SIZE != ddt_size &&
            PROBE_ANY_COUNT != count &&
            (cand->count != count || cand->ddt_size != ddt_size) ) {
            continue;
        }

        /* All criteria satisfied */
        *found_msg_ref = cand;
        break;
    }

    /*
     * A matching signature only counts if one of its contents has finished
     * draining; otherwise report "not found".
     */
    if( NULL != *found_msg_ref ) {
        drain_message_grab_content((*found_msg_ref), content_ref );

        if( NULL == *content_ref ) {
            *found_msg_ref = NULL;
        }
    }

    return OMPI_SUCCESS;
}
4023
/*
 * Return (via *content_ref) the first content of a drained message whose
 * buffer is valid. A NULL buffer marks a content that has not finished
 * draining yet; if none qualify, *content_ref is left NULL.
 */
static int drain_message_grab_content(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
                                      ompi_crcp_bkmrk_pml_message_content_ref_t ** content_ref)
{
    opal_list_item_t *item = NULL;

    *content_ref = NULL;

    for(item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
        item != opal_list_get_end(&(drain_msg_ref->msg_contents));
        item  = opal_list_get_next(item) ) {
        ompi_crcp_bkmrk_pml_message_content_ref_t *cur =
            (ompi_crcp_bkmrk_pml_message_content_ref_t*)item;

        /* Skip contents still in flight (no completed buffer yet) */
        if( NULL != cur->buffer ) {
            *content_ref = cur;
            break;
        }
    }

    return OMPI_SUCCESS;
}
4047
/*
 * Complete a persistent (inactive) receive request from a drained copy:
 * locate the matching inactive content on the traffic message, copy the
 * drained status and data into it, then release the drained content.
 *
 * Returns OMPI_SUCCESS, or the ompi_datatype copy error code on failure
 * (the drained content is removed in either case).
 */
static int drain_message_copy_remove_persistent(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
                                                ompi_crcp_bkmrk_pml_message_content_ref_t *drain_content_ref,
                                                ompi_crcp_bkmrk_pml_traffic_message_ref_t *traffic_msg_ref,
                                                ompi_request_t *request,
                                                ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;

    /*
     * Find the request in the list that has been posted, but not started
     */
    traffic_message_find_mark_persistent(traffic_msg_ref, &request,
                                         false, /* Find currently not active */
                                         false, /* Keep inactive */
                                         &content_ref);

    /* NOTE(review): content_ref is dereferenced without a NULL check; this
     * assumes the find above always succeeds for a persistent request.
     * TODO confirm that invariant holds at every call site. */
    /* These two requests should be exactly the same, so this is redundant, but here for completeness */
    content_ref->request = request;

    memcpy(&(content_ref->status), &drain_content_ref->status, sizeof(ompi_status_public_t));

    /* Copy the drained payload into the persistent request's user buffer */
    if( 0 != (ret = ompi_datatype_copy_content_same_ddt(drain_msg_ref->datatype,
                                                        drain_msg_ref->count,
                                                        content_ref->buffer,
                                                        drain_content_ref->buffer) ) ) {
        opal_output( mca_crcp_bkmrk_component.super.output_handle,
                     "crcp:bkmrk: drain_message_copy_remove_p(): Datatype copy failed (%d)",
                     ret);
        exit_status = ret;
    }

    /* Remove the message from the list */
    drain_content_ref->request = NULL;
    drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);

    return exit_status;
}
4086
/*
 * Complete a (non-persistent) receive from a drained copy: hand back the
 * source/tag/request/status captured at drain time, copy the payload into
 * the caller's buffer, then release the drained content.
 *
 * src/tag/request/status may each be NULL when the caller does not want
 * that value. Returns OMPI_SUCCESS, or the ompi_datatype copy error code
 * (the drained content is removed in either case).
 */
static int drain_message_copy_remove(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
                                     ompi_crcp_bkmrk_pml_message_content_ref_t * drain_content_ref,
                                     int *src, int *tag,
                                     struct ompi_request_t **request,
                                     ompi_status_public_t **status,
                                     ompi_datatype_t *datatype, int count, void **buf,
                                     ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref)
{
    int ret, exit_status = OMPI_SUCCESS;

    if( NULL != src ) {
        *src = drain_msg_ref->rank;
    }

    if( NULL != tag ) {
        *tag = drain_msg_ref->tag;
    }

    /* Caller receives a reference on the drained request */
    if( NULL != request ) {
        *request = drain_content_ref->request;
        OBJ_RETAIN(*request);
    }

    if( NULL != status && MPI_STATUS_IGNORE != *status ) {
        memcpy(*status, &drain_content_ref->status, sizeof(ompi_status_public_t));
    }

    /* The buffer could be NULL - More likely when doing a count=0 type of message (e.g., Barrier) */
    if( OPAL_LIKELY(NULL != buf) ) {
        /* NOTE(review): buf is void** but the copy destination is (void*)buf,
         * i.e. the address of the caller's pointer rather than *buf. This
         * only works if callers pass the user buffer itself through this
         * parameter — TODO confirm against the call sites before changing. */
        if( 0 != (ret = ompi_datatype_copy_content_same_ddt(datatype, count,
                                                            (void*)buf, drain_content_ref->buffer) ) ) {
            opal_output( mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: drain_message_copy_remove(): Datatype copy failed (%d)",
                         ret);
            exit_status = ret;
        }
    }
    else {
        OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: drain_message_copy_remove(): Skip copy - NULL buffer"));
    }

    /* Remove the message from the list */
    drain_content_ref->request = NULL;
    drain_message_remove(peer_ref, drain_msg_ref, drain_content_ref);

    return exit_status;
}
4135
4136
4137 /************************************************
4138 * Peer List Utility Functions
4139 ************************************************/
find_peer(ompi_process_name_t proc)4140 static ompi_crcp_bkmrk_pml_peer_ref_t * find_peer(ompi_process_name_t proc)
4141 {
4142 opal_list_item_t* item = NULL;
4143 ompi_rte_cmp_bitmask_t mask;
4144
4145 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4146 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4147 item = opal_list_get_next(item) ) {
4148 ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref;
4149 cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4150
4151 mask = OMPI_RTE_CMP_JOBID | OMPI_RTE_CMP_VPID;
4152
4153 if( OPAL_EQUAL == ompi_rte_compare_name_fields(mask,
4154 &(cur_peer_ref->proc_name),
4155 &proc) ) {
4156 return cur_peer_ref;
4157 }
4158 }
4159
4160 return NULL;
4161 }
4162
/*
 * Resolve the peer-reference entry for rank proc_idx in a communicator's
 * remote group. Returns OMPI_ERROR (and logs) if no entry exists.
 */
static int find_peer_in_comm(struct ompi_communicator_t* comm, int proc_idx,
                             ompi_crcp_bkmrk_pml_peer_ref_t **peer_ref)
{
    ompi_process_name_t *peer_name =
        (ompi_process_name_t *)&comm->c_remote_group->grp_proc_pointers[proc_idx]->super.proc_name;

    *peer_ref = find_peer(*peer_name);

    if( NULL == *peer_ref ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: find_peer_in_comm(): Failed to find peer_ref - peer_ref is NULL\n");
        return OMPI_ERROR;
    }

    return OMPI_SUCCESS;
}
4176
4177
4178 /************************************************
4179 * FT Event Utility Functions
4180 ************************************************/
/*
 * Drive the checkpoint coordination protocol with all peers:
 * exchange bookmarks, reconcile them, post drains for in-flight traffic,
 * and wait for the channels to quiesce.
 *
 * This is a resumable state machine: if a blocking send is in progress the
 * function records its position in the static 'step_to_return_to', returns
 * successfully, and resumes at STEP_1 on the next invocation.
 * NOTE(review): the static state makes this non-reentrant — presumably only
 * ever called from the single FT-event path; confirm before reuse.
 */
static int ft_event_coordinate_peers(void)
{
    static int step_to_return_to = 0;
    int exit_status = OMPI_SUCCESS;
    int ret;

    /* Resume after a previous stall */
    if( step_to_return_to == 1 ) {
        goto STEP_1;
    }

    /*
     * Exchange Bookmarks with peers
     */
    START_TIMER(CRCP_TIMER_CKPT_EX_B);
    if( OMPI_SUCCESS != (ret = ft_event_exchange_bookmarks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Exchange Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_EX_B);

    /*
     * Check exchanged bookmarks
     */
    START_TIMER(CRCP_TIMER_CKPT_CHECK_B);
    if( OMPI_SUCCESS != (ret = ft_event_check_bookmarks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Check Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_CHECK_B);

    /*
     * Post Drain Acks and Msgs
     */
    START_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
    if( OMPI_SUCCESS != (ret = ft_event_post_drain_acks() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain ACKS Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }

    if( OMPI_SUCCESS != (ret = ft_event_post_drained() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Bookmark Post Drain Msgs Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    END_TIMER(CRCP_TIMER_CKPT_POST_DRAIN);
    DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_POST_DRAIN, -1, 0);

    /*
     * Check if we need to stall for completion of tasks
     */
    /*
     * If we know that we are in the middle of a blocking send then we
     * need to stall the coordination algorithm while we wait for this to
     * complete.
     */
    if( 0 < current_msg_id &&
        current_msg_type == COORD_MSG_TYPE_B_SEND) {
        stall_for_completion = true;
    }
    START_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);
    if( stall_for_completion ) {
        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s **** STALLING %s in PID %d ***",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             (current_msg_type == COORD_MSG_TYPE_B_SEND ? "Send" : "Recv"),
                             getpid() ));
        /* Remember where to resume, then return success so the blocking
         * operation can finish before we quiesce. */
        step_to_return_to = 1;
        exit_status = OMPI_SUCCESS;
        goto DONE;
    }

 STEP_1:
    step_to_return_to = 0;

    /*
     * Wait for any messages that needed resolved.
     * - Outstanding Receives (to drain wire) -- Receiver
     * - Outstanding Irecvs (for drain ack) -- Sender
     */
    if( OMPI_SUCCESS != (ret = ft_event_wait_quiesce() ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: ft_event_coordinate_peers: Wait Quiesce Failed %d",
                    ret);
        exit_status = ret;
        goto DONE;
    }
    /* Timer was started before the stall check; on the resume path this
     * closes the interval opened in the previous invocation. */
    END_TIMER(CRCP_TIMER_CKPT_WAIT_QUI);

    OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s Coordination Finished...\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));

    /*
     * Now that all our peer channels are marked as drained
     * continue with the checkpoint.
     * Note: This does not guarentee that all of the peers
     * are at this same position, but that our
     * checkpoint will be consistent with all of the
     * peers once they finish the protocol.
     */

 DONE:
    return exit_status;
}
4296
ft_event_finalize_exchange(void)4297 static int ft_event_finalize_exchange(void)
4298 {
4299 int exit_status = OMPI_SUCCESS;
4300 opal_list_item_t* item = NULL, *rm_item = NULL;
4301 ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
4302 ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
4303 opal_list_item_t* cont_item = NULL;
4304
4305 /*
4306 * Clear bookmark totals
4307 */
4308 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4309 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4310 item = opal_list_get_next(item) ) {
4311 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4312 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4313
4314 if( OPAL_EQUAL != ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4315 (OMPI_PROC_MY_NAME),
4316 &(peer_ref->proc_name)) ) {
4317 TRAFFIC_MSG_DUMP_PEER(10, (peer_ref, "finalize_exchange", false));
4318 }
4319
4320 peer_ref->total_msgs_sent = 0;
4321 peer_ref->total_msgs_recvd = 0;
4322
4323 peer_ref->matched_msgs_sent = 0;
4324 peer_ref->matched_msgs_recvd = 0;
4325
4326 peer_ref->ack_required = false;
4327
4328 /* Clear send_list */
4329 for(rm_item = opal_list_get_last(&peer_ref->send_list);
4330 rm_item != opal_list_get_begin(&peer_ref->send_list);
4331 rm_item = opal_list_get_prev(rm_item) ) {
4332 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4333 msg_ref->matched = 0;
4334 msg_ref->done = 0;
4335 msg_ref->active_drain += msg_ref->active;
4336 msg_ref->active = 0;
4337
4338 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4339 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4340 cont_item = opal_list_get_next(cont_item) ) {
4341 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4342 if( content_ref->active ) {
4343 content_ref->already_drained = true;
4344 }
4345 }
4346 }
4347
4348 /* Clear isend_list */
4349 for(rm_item = opal_list_get_last(&peer_ref->isend_list);
4350 rm_item != opal_list_get_begin(&peer_ref->isend_list);
4351 rm_item = opal_list_get_prev(rm_item) ) {
4352 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4353 msg_ref->matched = 0;
4354 msg_ref->done = 0;
4355 msg_ref->active_drain += msg_ref->active;
4356 msg_ref->active = 0;
4357
4358 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4359 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4360 cont_item = opal_list_get_next(cont_item) ) {
4361 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4362 if( content_ref->active ) {
4363 content_ref->already_drained = true;
4364 }
4365 }
4366 }
4367
4368 /* Clear send_init_list */
4369 for(rm_item = opal_list_get_last(&peer_ref->send_list);
4370 rm_item != opal_list_get_begin(&peer_ref->send_list);
4371 rm_item = opal_list_get_prev(rm_item) ) {
4372 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4373 msg_ref->matched = 0;
4374 msg_ref->done = 0;
4375 msg_ref->active_drain += msg_ref->active;
4376 msg_ref->active = 0;
4377
4378 for(cont_item = opal_list_get_first(&(msg_ref->msg_contents));
4379 cont_item != opal_list_get_end( &(msg_ref->msg_contents));
4380 cont_item = opal_list_get_next(cont_item) ) {
4381 content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;
4382 if( content_ref->active ) {
4383 content_ref->already_drained = true;
4384 }
4385 }
4386 }
4387
4388 /* Clear recv_list */
4389 for(rm_item = opal_list_get_last(&peer_ref->recv_list);
4390 rm_item != opal_list_get_begin(&peer_ref->recv_list);
4391 rm_item = opal_list_get_prev(rm_item) ) {
4392 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4393 msg_ref->matched = 0;
4394 msg_ref->done = 0;
4395 }
4396
4397 /* Clear irecv_list */
4398 for(rm_item = opal_list_get_last(&peer_ref->irecv_list);
4399 rm_item != opal_list_get_begin(&peer_ref->irecv_list);
4400 rm_item = opal_list_get_prev(rm_item) ) {
4401 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4402 msg_ref->matched = 0;
4403 msg_ref->done = 0;
4404 }
4405
4406 /* Clear recv_init_list */
4407 for(rm_item = opal_list_get_last(&peer_ref->recv_list);
4408 rm_item != opal_list_get_begin(&peer_ref->recv_list);
4409 rm_item = opal_list_get_prev(rm_item) ) {
4410 msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)rm_item;
4411 msg_ref->matched = 0;
4412 msg_ref->done = 0;
4413 }
4414 }
4415
4416 return exit_status;
4417 }
4418
ft_event_exchange_bookmarks(void)4419 static int ft_event_exchange_bookmarks(void)
4420 {
4421 int peer_idx = 0;
4422 int my_idx = OMPI_PROC_MY_NAME->vpid;
4423 int iter = 0;
4424 int num_peers = 0;
4425
4426 num_peers = opal_list_get_size(&ompi_crcp_bkmrk_pml_peer_refs);
4427
4428 for( peer_idx = (num_peers - my_idx - 1), iter = 0;
4429 iter < num_peers;
4430 peer_idx = (peer_idx + 1) % num_peers, ++iter)
4431 {
4432 if(my_idx > peer_idx) {
4433 /* Send our bookmark status */
4434 send_bookmarks(peer_idx);
4435 /* Recv peer bookmark status */
4436 recv_bookmarks(peer_idx);
4437 }
4438 else if(my_idx < peer_idx) {
4439 /* Recv peer bookmark status */
4440 recv_bookmarks(peer_idx);
4441 /* Send our bookmark status */
4442 send_bookmarks(peer_idx);
4443 }
4444 }
4445
4446 /* Wait for all bookmarks to arrive */
4447 START_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4448 while( total_recv_bookmarks > 0 ) {
4449 opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
4450 }
4451 total_recv_bookmarks = 0;
4452 END_TIMER(CRCP_TIMER_CKPT_EX_WAIT);
4453
4454 return OMPI_SUCCESS;
4455 }
4456
ft_event_check_bookmarks(void)4457 static int ft_event_check_bookmarks(void)
4458 {
4459 opal_list_item_t* item = NULL;
4460 int ret;
4461 int p_n_to_p_m = 0;
4462 int p_n_from_p_m = 0;
4463
4464 if( 10 <= mca_crcp_bkmrk_component.super.verbose ) {
4465 sleep(OMPI_PROC_MY_NAME->vpid);
4466 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4467 "---------------------------------------------"));
4468 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4469 "Process %s Match Table",
4470 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME)));
4471 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4472 "%s %5s | %7s | %7s | %7s | %7s |",
4473 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4474 "Vpid", "T_Send", "M_Recv", "M_Send", "T_Recv"));
4475
4476 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4477 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4478 item = opal_list_get_next(item) ) {
4479 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4480 int t_send, m_send;
4481 int t_recv, m_recv;
4482 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4483
4484 t_send = peer_ref->total_msgs_sent;
4485 m_send = peer_ref->matched_msgs_sent;
4486 t_recv = peer_ref->total_msgs_recvd;
4487 m_recv = peer_ref->matched_msgs_recvd;
4488
4489 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4490 "%s %5d | %7d | %7d | %7d | %7d |",
4491 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4492 peer_ref->proc_name.vpid,
4493 t_send, m_recv, m_send, t_recv));
4494 }
4495 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4496 "---------------------------------------------"));
4497 }
4498
4499 /*
4500 * For each peer:
4501 * - Check bookmarks
4502 * - if mis-matched then post outstanding recvs.
4503 */
4504 for(item = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
4505 item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
4506 item = opal_list_get_next(item) ) {
4507 ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
4508 peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
4509
4510 if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
4511 (OMPI_PROC_MY_NAME),
4512 &(peer_ref->proc_name)) ) {
4513 continue;
4514 }
4515
4516 TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "-- Bookmark Details --", false));
4517
4518 /* Lowest Rank sends first */
4519 if( OMPI_PROC_MY_NAME->vpid < peer_ref->proc_name.vpid ) {
4520 /********************
4521 * Check P_n --> P_m
4522 * Has the peer received all the messages that I have put on the wire?
4523 ********************/
4524 p_n_to_p_m = peer_ref->total_msgs_sent;
4525 p_n_from_p_m = peer_ref->matched_msgs_recvd;
4526
4527 /* T_Send >= M_Recv */
4528 if( p_n_to_p_m < p_n_from_p_m ) {
4529 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4530 "crcp:bkmrk: %s --> %s "
4531 "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4532 " WARNING: Peer received more than was sent. :(\n",
4533 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4534 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4535 p_n_to_p_m,
4536 p_n_from_p_m,
4537 (p_n_to_p_m - p_n_from_p_m)
4538 );
4539 }
4540
4541 /* I've send more than my peer has received,
4542 * so need to coordinate with peer. */
4543 if( p_n_to_p_m > p_n_from_p_m) {
4544 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4545 "crcp:bkmrk: %s --> %s "
4546 "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4547 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4548 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4549 p_n_to_p_m,
4550 p_n_from_p_m,
4551 (p_n_to_p_m - p_n_from_p_m)
4552 ));
4553 /*
4554 * Tell the peer what the outstanding messages looked like.
4555 * Since we can't tell which ones they are, we need to send the
4556 * information for all of the messages since the last checkpoint
4557 */
4558 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4559 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4560 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4561 OMPI_NAME_PRINT(&peer_ref->proc_name),
4562 ret);
4563 return ret;
4564 }
4565 }
4566
4567 /********************
4568 * Check P_n <-- P_m
4569 * Have I received all the messages that my peer has put on the wire?
4570 ********************/
4571 p_n_to_p_m = peer_ref->matched_msgs_sent;
4572 p_n_from_p_m = peer_ref->total_msgs_recvd;
4573
4574 /* M_Send >= T_Recv */
4575 if( p_n_to_p_m < p_n_from_p_m ) {
4576 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4577 "crcp:bkmrk: %s --> %s "
4578 "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4579 " WARNING: I received more than the peer sent. :(\n",
4580 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4581 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4582 p_n_to_p_m,
4583 p_n_from_p_m,
4584 (p_n_to_p_m - p_n_from_p_m)
4585 );
4586 }
4587
4588 /* I've recv'ed less than my peer has sent,
4589 * so need to coordinate with peer. */
4590 if( p_n_to_p_m > p_n_from_p_m) {
4591 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4592 "crcp:bkmrk: %s <-- %s "
4593 "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4594 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4595 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4596 p_n_to_p_m,
4597 p_n_from_p_m,
4598 (p_n_to_p_m - p_n_from_p_m)
4599 ));
4600 /*
4601 * Receive from peer the datatypes of the outstanding sends
4602 * As we figure out what they are post Irecv's for them into a drained buffer list.
4603 */
4604 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4605 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4606 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4607 OMPI_NAME_PRINT(&peer_ref->proc_name),
4608 ret);
4609 return ret;
4610 }
4611 }
4612 }
4613 /* Highest rank recvs first */
4614 else {
4615 /********************
4616 * Check P_n <-- P_m
4617 * Have I received all the messages that my peer has put on the wire?
4618 ********************/
4619 p_n_to_p_m = peer_ref->matched_msgs_sent;
4620 p_n_from_p_m = peer_ref->total_msgs_recvd;
4621
4622 /* M_Send >= T_Recv */
4623 if( p_n_to_p_m < p_n_from_p_m ) {
4624 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4625 "crcp:bkmrk: %s --> %s "
4626 "Matched Sent (%4d) = Total Recv. (%4d) => Diff (%4d). "
4627 " WARNING: I received more than the peer sent. :(\n",
4628 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4629 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4630 p_n_to_p_m,
4631 p_n_from_p_m,
4632 (p_n_to_p_m - p_n_from_p_m)
4633 );
4634 }
4635
4636 /* I've recv'ed less than my peer has sent,
4637 * so need to coordinate with peer. */
4638 if( p_n_to_p_m > p_n_from_p_m) {
4639 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4640 "crcp:bkmrk: %s <-- %s "
4641 "Matched Sent (%4d) = Total Recv. (%4d). I need %4d.\n",
4642 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4643 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4644 p_n_to_p_m,
4645 p_n_from_p_m,
4646 (p_n_to_p_m - p_n_from_p_m)
4647 ));
4648 /*
4649 * Receive from peer the datatypes of the outstanding sends
4650 * As we figure out what they are post Irecv's for them into a drained buffer list.
4651 */
4652 if( OMPI_SUCCESS != (ret = recv_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4653 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4654 "crcp:bkmrk: check_bookmarks: Unable to recv message details from peer %s: Return %d\n",
4655 OMPI_NAME_PRINT(&peer_ref->proc_name),
4656 ret);
4657 return ret;
4658 }
4659 }
4660
4661 /********************
4662 * Check P_n --> P_m
4663 * Has the peer received all the messages that I have put on the wire?
4664 ********************/
4665 p_n_to_p_m = peer_ref->total_msgs_sent;
4666 p_n_from_p_m = peer_ref->matched_msgs_recvd;
4667
4668 /* T_Send >= M_Recv */
4669 if( p_n_to_p_m < p_n_from_p_m ) {
4670 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4671 "crcp:bkmrk: %s --> %s "
4672 "Total Sent (%4d) = Matched Recv. (%4d) => Diff (%4d). "
4673 " WARNING: Peer received more than was sent. :(\n",
4674 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4675 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4676 p_n_to_p_m,
4677 p_n_from_p_m,
4678 (p_n_to_p_m - p_n_from_p_m)
4679 );
4680 }
4681
4682 /* I've send more than my peer has received,
4683 * so need to coordinate with peer. */
4684 if( p_n_to_p_m > p_n_from_p_m) {
4685 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4686 "crcp:bkmrk: %s --> %s "
4687 "Total Sent (%4d) = Matched Recv. (%4d). Peer needs %4d.\n",
4688 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4689 OMPI_NAME_PRINT(&(peer_ref->proc_name)),
4690 p_n_to_p_m,
4691 p_n_from_p_m,
4692 (p_n_to_p_m - p_n_from_p_m)
4693 ));
4694 /*
4695 * Tell the peer what the outstanding messages looked like.
4696 * Since we can't tell which ones they are, we need to send the
4697 * information for all of the messages since the last checkpoint
4698 */
4699 if( OMPI_SUCCESS != (ret = send_msg_details(peer_ref, p_n_to_p_m, p_n_from_p_m) ) ) {
4700 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4701 "crcp:bkmrk: check_bookmarks: Unable to send message details to peer %s: Return %d\n",
4702 OMPI_NAME_PRINT(&peer_ref->proc_name),
4703 ret);
4704 return ret;
4705 }
4706 }
4707 }
4708 }
4709
4710 return OMPI_SUCCESS;
4711 }
4712
ft_event_post_drain_acks(void)4713 static int ft_event_post_drain_acks(void)
4714 {
4715 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack = NULL;
4716 opal_list_item_t* item = NULL;
4717 size_t req_size;
4718
4719 req_size = opal_list_get_size(&drained_msg_ack_list);
4720 if(req_size <= 0) {
4721 return OMPI_SUCCESS;
4722 }
4723
4724 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
4725 "crcp:bkmrk: %s Wait on %d Drain ACK Messages.\n",
4726 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
4727 (int)req_size));
4728
4729 /*
4730 * We have loaded our peer with the message information
4731 * Now wait for the ack from them
4732 */
4733 for(item = opal_list_get_first(&drained_msg_ack_list);
4734 item != opal_list_get_end(&drained_msg_ack_list);
4735 item = opal_list_get_next(item) ) {
4736 drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
4737
4738 /* Post the receive */
4739 ompi_rte_recv_buffer_nb(&drain_msg_ack->peer, OMPI_CRCP_COORD_BOOKMARK_TAG,
4740 0, drain_message_ack_cbfunc, NULL);
4741 }
4742
4743 return OMPI_SUCCESS;
4744 }
4745
/*
 * RML callback invoked when a peer ACKs that it has finished draining the
 * in-flight messages we described to it (posted by ft_event_post_drain_acks()).
 *
 * Marks the matching, not-yet-complete entry on drained_msg_ack_list as
 * complete so wait_quiesce_drain_ack() can retire it.  If no matching entry
 * is found, an error is logged and the callback simply returns.
 *
 * NOTE: 'ret', 'exit_status' and the 'cleanup' label are required by the
 * UNPACK_BUFFER error-handling macro, which jumps to 'cleanup' on failure.
 */
static void drain_message_ack_cbfunc(int status,
                                     ompi_process_name_t* sender,
                                     opal_buffer_t *buffer,
                                     ompi_rml_tag_t tag,
                                     void* cbdata)
{
    int ret, exit_status = OMPI_SUCCESS;
    opal_list_item_t* item = NULL;
    size_t ckpt_status;

    /*
     * Unpack the buffer
     */
    UNPACK_BUFFER(buffer, ckpt_status, 1, OPAL_SIZE, "");

    /*
     * Update the outstanding message queue: find the first incomplete
     * entry whose peer name matches the sender of this ACK.
     */
    for(item  = opal_list_get_first(&drained_msg_ack_list);
        item != opal_list_get_end(&drained_msg_ack_list);
        item  = opal_list_get_next(item) ) {
        ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
        drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;

        /* If this ACK has not completed yet */
        if(!drain_msg_ack->complete) {
            /* If it is the correct peer */
            if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                           &(drain_msg_ack->peer),
                                                           sender) ) {
                /* We found it! */
                drain_msg_ack->complete = true;
                OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
                                     "crcp:bkmrk: %s --> %s Received ACK of FLUSH from peer\n",
                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                     OMPI_NAME_PRINT(sender) ));
                return;
            }
        }
    }

    /* Fell off the list without a match: unexpected ACK */
    opal_output(mca_crcp_bkmrk_component.super.output_handle,
                "crcp:bkmrk: %s --> %s ERROR: Unable to match ACK to peer (%d)\n",
                OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                OMPI_NAME_PRINT(sender), exit_status);

 cleanup:
    return;
}
4795
/*
 * Post receives for every not-yet-done drained message content across all
 * peers, and collect the resulting requests/statuses into the file-scope
 * quiesce_requests/quiesce_statuses arrays for wait_quiesce_drained().
 *
 * Two passes over ompi_crcp_bkmrk_pml_peer_refs:
 *   1) count how many contents still need draining (sizes the arrays),
 *   2) post each drain receive via ft_event_post_drain_message() and
 *      record its request/status slot.
 * Self (OMPI_PROC_MY_NAME) is skipped in both passes.
 *
 * Side effects: (re)allocates quiesce_requests/quiesce_statuses, resets
 * quiesce_request_count, sets peer ack_required flags, and may set the
 * global stall_for_completion flag.
 *
 * Returns OMPI_SUCCESS, OMPI_ERROR on allocation failure, or the error
 * from ft_event_post_drain_message().
 */
static int ft_event_post_drained(void)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
    ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    opal_list_item_t* item = NULL, *d_item = NULL, *c_item = NULL;
    int i, total_number_to_drain = 0, peer_total = 0;

    /* First Pass just to get a count */
    for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
        item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
        item  = opal_list_get_next(item) ) {
        cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

        /* Never drain messages from ourselves */
        if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                       (OMPI_PROC_MY_NAME),
                                                       &(cur_peer_ref->proc_name)) ) {
            continue;
        }

        for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
            d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
            d_item  = opal_list_get_next(d_item) ) {
            drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;

            for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
                c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
                c_item  = opal_list_get_next(c_item) ) {
                content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;

                /* Only count contents not drained in a previous checkpoint */
                if( !content_ref->done ) {
                    ++total_number_to_drain;
                }
            }
        }
    }

    /*
     * Check to make sure there is something to post
     */
    if( 0 >= total_number_to_drain ) {
        return OMPI_SUCCESS;
    }

    /* Allocate memory (free stale arrays from a previous checkpoint first) */
    if( NULL != quiesce_requests ) {
        free(quiesce_requests);
        quiesce_requests = NULL;
    }
    quiesce_requests = (ompi_request_t **)malloc( (total_number_to_drain) * sizeof(ompi_request_t *));
    if( NULL == quiesce_requests){
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    if( NULL != quiesce_statuses ) {
        free(quiesce_statuses);
        quiesce_statuses = NULL;
    }
    quiesce_statuses = (ompi_status_public_t **)malloc( (total_number_to_drain) * sizeof(ompi_status_public_t *));
    if( NULL == quiesce_statuses){
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /* Initalize to invalid values */
    for(i = 0; i < (total_number_to_drain); ++i) {
        quiesce_requests[i] = &(ompi_request_null.request);
        quiesce_statuses[i] = &ompi_status_empty;
    }
    quiesce_request_count = 0;

    /* Second pass to post */
    for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
        item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
        item  = opal_list_get_next(item) ) {
        cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;
        peer_total = 0;

        /* Never drain messages from ourselves */
        if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                       (OMPI_PROC_MY_NAME),
                                                       &(cur_peer_ref->proc_name)) ) {
            continue;
        }

        for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
            d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
            d_item  = opal_list_get_next(d_item) ) {
            drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;

            for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
                c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
                c_item  = opal_list_get_next(c_item) ) {
                content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;

                if( content_ref->done ) {
                    continue;
                }

                /* Post the drain receive (no-op for already_posted entries) */
                if( OMPI_SUCCESS != (ret = ft_event_post_drain_message(drain_msg_ref, content_ref) ) ) {
                    exit_status = ret;
                    goto cleanup;
                }

                /* This peer must ACK us once everything is drained */
                cur_peer_ref->ack_required = true;

                /* Wait on all drain requests, newly posted or not */
                if( NULL != content_ref->request) {
                    quiesce_requests[quiesce_request_count] = content_ref->request;
                    quiesce_statuses[quiesce_request_count] = &content_ref->status;
                    quiesce_request_count++;
                    peer_total++;
                }
                /* If a NULL request, and already_posted then this is an indicator that we need to stall */
                else if( content_ref->already_posted ) {
                    stall_for_completion = true;
                }
                else {
                    ERROR_SHOULD_NEVER_HAPPEN("crcp:bkmrk: ft_event_post_drained(): Found a drain message with a NULL request.");
                }
            }
        }

        if( peer_total > 0 || stall_for_completion ) {
            OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                                 "crcp:bkmrk: %s <-- %s Will be draining %4d messages from this peer. Total %4d %s\n",
                                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                 OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)),
                                 peer_total,
                                 quiesce_request_count,
                                 (stall_for_completion ? "(And Stalling)" : "") ));
        }
    }

 cleanup:
    return exit_status;
}
4934
/*
 * Post a single PML irecv to drain one message content into its local
 * buffer.  Skips contents that are already done (drained by a previous
 * checkpoint) or already posted (a matching user receive exists, so the
 * message will land in the user's buffer on its own).
 *
 * On success the receive request is stored in content_ref->request and
 * the active counters are bumped; these counters must stay in sync with
 * traffic_message_create_drain_message() (see comment below).
 *
 * Returns OMPI_SUCCESS, or the error code from pml_irecv on failure.
 */
static int ft_event_post_drain_message(ompi_crcp_bkmrk_pml_drain_message_ref_t *drain_msg_ref,
                                       ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref)
{
    int ret;

    /*
     * This message has already been posted and drained in a previous
     * checkpoint, do not post it again.
     */
    if( content_ref->done ) {
        return OMPI_SUCCESS;
    }

    /* Do not repost those that are already posted, and
     * we have requests for
     */
    if( content_ref->already_posted ) {
        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s <-- %s Found a message that we do not need to post.\n",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) ));
        return OMPI_SUCCESS;
    }

    /* Match counts in traffic_message_create_drain_message() */
    content_ref->active = true;
    drain_msg_ref->active++;

    /*
     * Post a receive to drain this message
     */
    OPAL_OUTPUT_VERBOSE((20, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s Posting a message to be drained from rank %d.\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)),
                         drain_msg_ref->rank));
    /* NOTE(review): count is passed as (count * ddt_size) bytes-style
     * value with the original datatype — presumably matched by how the
     * drain buffer was sized elsewhere; confirm against the buffer
     * allocation in traffic_message_create_drain_message(). */
    if( OMPI_SUCCESS != (ret = wrapped_pml_module->pml_irecv(content_ref->buffer,
                                                             (drain_msg_ref->count * drain_msg_ref->ddt_size),
                                                             drain_msg_ref->datatype,
                                                             drain_msg_ref->rank,
                                                             drain_msg_ref->tag,
                                                             drain_msg_ref->comm,
                                                             &(content_ref->request) ) ) ) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: %s <-- %s Failed to post the Draining PML iRecv\n",
                    OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                    OMPI_NAME_PRINT(&(drain_msg_ref->proc_name)) );
        return ret;
    }

    return OMPI_SUCCESS;
}
4987
ft_event_wait_quiesce(void)4988 static int ft_event_wait_quiesce(void)
4989 {
4990 int exit_status = OMPI_SUCCESS;
4991 int ret;
4992
4993 /*********************************************
4994 * Wait for all draining receives to complete
4995 **********************************************/
4996 if( OMPI_SUCCESS != (ret = wait_quiesce_drained() ) ) {
4997 opal_output(mca_crcp_bkmrk_component.super.output_handle,
4998 "crcp:bkmrk: wait_quiesce: %s Failed to quiesce drained messages\n",
4999 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5000 exit_status = ret;
5001 goto cleanup;
5002 }
5003
5004 /*******************************************************************
5005 * If we are waiting for All Clear messages from peers wait on them.
5006 *******************************************************************/
5007 if( OMPI_SUCCESS != (ret = wait_quiesce_drain_ack() ) ) {
5008 opal_output(mca_crcp_bkmrk_component.super.output_handle,
5009 "crcp:bkmrk: wait_quiesce: %s Failed to recv all drain ACKs\n",
5010 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME) );
5011 exit_status = ret;
5012 goto cleanup;
5013 }
5014
5015 cleanup:
5016 return exit_status;
5017 }
5018
/*
 * Wait for all drain receives collected in quiesce_requests to complete,
 * then send an All Clear ACK to every peer that required one and retire
 * the drained-message bookkeeping.
 *
 * Already-posted entries are removed from the drained list (the matching
 * user receive consumed the data); entries we drained ourselves are kept
 * and flagged done so incoming user receives can still be matched against
 * the locally buffered data.
 *
 * Side effects: frees and NULLs quiesce_requests/quiesce_statuses and
 * zeroes quiesce_request_count on exit; toggles opal_cr_stall_check
 * around the wait; clears each peer's ack_required flag.
 *
 * NOTE: 'ret', 'exit_status' and the 'cleanup' label are required by the
 * PACK_BUFFER error-handling macro.
 */
static int wait_quiesce_drained(void)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_peer_ref_t *cur_peer_ref = NULL;
    ompi_crcp_bkmrk_pml_drain_message_ref_t * drain_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    opal_list_item_t* item = NULL, *d_item = NULL, *d_next = NULL, *c_item = NULL, *c_next = NULL;
    bool prev_stall = false;

    /* Can we shortcut this? */

    OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s Waiting on %d messages to drain\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         (int)quiesce_request_count));

    /*
     * Wait on all of the message to complete in any order
     * Created in ft_event_post_drained()
     */
    prev_stall = opal_cr_stall_check;
    opal_cr_stall_check = true;
    if( OMPI_SUCCESS != (ret = coord_request_wait_all(quiesce_request_count,
                                                      quiesce_requests,
                                                      quiesce_statuses) ) ) {
        exit_status = ret;
        goto cleanup;
    }
    opal_cr_stall_check = prev_stall;

    /*
     * Send ACKs to all peers
     *
     * Remove only the already posted members of the drained list.
     * All other elements need to be left in the list since we need
     * to match them as new receives come in.
     */
    for(item  = opal_list_get_first(&ompi_crcp_bkmrk_pml_peer_refs);
        item != opal_list_get_end(&ompi_crcp_bkmrk_pml_peer_refs);
        item  = opal_list_get_next(item) ) {
        cur_peer_ref = (ompi_crcp_bkmrk_pml_peer_ref_t*)item;

        /* Skip ourselves */
        if( OPAL_EQUAL == ompi_rte_compare_name_fields(OMPI_RTE_CMP_ALL,
                                                       (OMPI_PROC_MY_NAME),
                                                       &(cur_peer_ref->proc_name)) ) {
            continue;
        }

        /*
         * Send ACK to peer if wanted
         */
        if( cur_peer_ref->ack_required ) {
            opal_buffer_t *buffer = NULL;
            size_t response = 1;

            OPAL_OUTPUT_VERBOSE((5, mca_crcp_bkmrk_component.super.output_handle,
                                 "crcp:bkmrk: %s --> %s Send ACKs to Peer\n",
                                 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                 OMPI_NAME_PRINT(&(cur_peer_ref->proc_name)) ));

            /* Send All Clear to Peer */
            if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
                exit_status = OMPI_ERROR;
                goto cleanup;
            }

            PACK_BUFFER(buffer, response, 1, OPAL_SIZE, "");

            /* JJH - Performance Optimization? - Why not post all isends, then wait? */
            if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&(cur_peer_ref->proc_name),
                                                               buffer, OMPI_CRCP_COORD_BOOKMARK_TAG,
                                                               orte_rml_send_callback, NULL))) {
                exit_status = ret;
                goto cleanup;
            }
            if( NULL != buffer) {
                OBJ_RELEASE(buffer);
                buffer = NULL;
            }
        }

        cur_peer_ref->ack_required = false;

        /*
         * Remove already_posted drained items
         * (safe-iteration: next pointers cached before removal)
         */
        for(d_item  = opal_list_get_first(&(cur_peer_ref->drained_list));
            d_item != opal_list_get_end(&(cur_peer_ref->drained_list));
            d_item  = d_next ) {
            drain_msg_ref = (ompi_crcp_bkmrk_pml_drain_message_ref_t*)d_item;
            d_next = opal_list_get_next(d_item);

            for(c_item  = opal_list_get_first(&(drain_msg_ref->msg_contents));
                c_item != opal_list_get_end(&(drain_msg_ref->msg_contents));
                c_item  = c_next ) {
                content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)c_item;
                c_next = opal_list_get_next(c_item);

                /*
                 * This message has already been posted and drained in a previous
                 * checkpoint, do not do anything to it.
                 */
                if( content_ref->done ) {
                    continue;
                }

                if( content_ref->already_posted ) {
                    drain_message_remove(cur_peer_ref, drain_msg_ref, content_ref);

                    /* Match counts in traffic_message_create_drain_message() */
                    drain_msg_ref->active--;
                    drain_msg_ref->already_posted--;
                } else {
                    content_ref->done   = true;
                    content_ref->active = false;

                    /* Match counts in traffic_message_create_drain_message() */
                    drain_msg_ref->done++;
                    drain_msg_ref->active--;
                }
            }
        }
    }

 cleanup:
    if( NULL != quiesce_requests ) {
        free(quiesce_requests);
        quiesce_requests = NULL;
    }

    if( NULL != quiesce_statuses ) {
        free(quiesce_statuses);
        quiesce_statuses = NULL;
    }

    quiesce_request_count = 0;

    return exit_status;
}
5158
/*
 * Complete every request in 'requests', copying completion information
 * into the matching slot of 'statuses'.  Requests are simply waited on
 * in array order; completion order does not matter for draining.
 *
 * Returns OMPI_SUCCESS always.
 */
static int coord_request_wait_all( size_t count,
                                   ompi_request_t ** requests,
                                   ompi_status_public_t ** statuses )
{
    size_t idx;

    /*
     * Just wait on each request in order
     */
    for( idx = 0; idx < count; ++idx ) {
        coord_request_wait(requests[idx], statuses[idx]);

        OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s Request Wait: Done with idx %d of %d\n",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             (int)idx, (int)count));
    }

    return OMPI_SUCCESS;
}
5185
/*
 * Block until 'req' completes, then copy the public status fields out of
 * the request into 'status' (unless the caller passed MPI_STATUS_IGNORE).
 *
 * Returns OMPI_SUCCESS always.
 */
static int coord_request_wait( ompi_request_t * req,
                               ompi_status_public_t * status)
{
    /* Spin/condition-wait until the PML marks the request complete */
    ompi_request_wait_completion(req);

    if( MPI_STATUS_IGNORE != status ) {
        status->MPI_SOURCE = req->req_status.MPI_SOURCE;
        status->MPI_TAG    = req->req_status.MPI_TAG;
        status->_cancelled = req->req_status._cancelled;
        status->_ucount    = req->req_status._ucount;
    }

    return OMPI_SUCCESS;
}
5200
wait_quiesce_drain_ack(void)5201 static int wait_quiesce_drain_ack(void)
5202 {
5203 opal_list_item_t* item = NULL;
5204 opal_list_item_t* next = NULL;
5205 ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * drain_msg_ack;
5206 int num_outstanding;
5207
5208 /* YYY JJH YYY Should we wait explicitly on the send requests pending first? */
5209
5210 num_outstanding = opal_list_get_size(&drained_msg_ack_list);
5211 if(num_outstanding <= 0) {
5212 /* Nothing to do */
5213 return OMPI_SUCCESS;
5214 }
5215
5216 OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
5217 "crcp:bkmrk: %s Waiting on %d Drain ACK messages\n",
5218 OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
5219 num_outstanding));
5220
5221 while(0 < num_outstanding) {
5222 for(item = opal_list_get_first(&drained_msg_ack_list);
5223 item != opal_list_get_end(&drained_msg_ack_list);
5224 item = next) {
5225 drain_msg_ack = (ompi_crcp_bkmrk_pml_drain_message_ack_ref_t*)item;
5226 next = opal_list_get_next(item);
5227
5228 if(drain_msg_ack->complete) {
5229 num_outstanding--;
5230 opal_list_remove_item(&drained_msg_ack_list, &(drain_msg_ack->super) );
5231 HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5232 break;
5233 }
5234 }
5235
5236 opal_event_loop(opal_sync_event_base, OPAL_EVLOOP_NONBLOCK);
5237 }
5238
5239 /* Clear the ack queue if it isn't already clear (it should already be) */
5240 while (NULL != (item = opal_list_remove_first(&drained_msg_ack_list) ) ) {
5241 HOKE_DRAIN_ACK_MSG_REF_RETURN(item);
5242 }
5243
5244 return OMPI_SUCCESS;
5245 }
5246
5247 /* Paired with recv_bookmarks */
/* Paired with recv_bookmarks */
/*
 * Send our send/recv bookmark counters for one peer (identified by vpid
 * within our own jobid) over RML so the peer can compare them against its
 * own counters during checkpoint coordination.
 *
 * NOTE: 'ret', 'exit_status' and the 'cleanup' label are required by the
 * PACK_BUFFER error-handling macro.
 *
 * Returns OMPI_SUCCESS, OMPI_ERROR if the peer or buffer cannot be
 * obtained, or the RML error code if the send fails.
 */
static int send_bookmarks(int peer_idx)
{
    ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
    ompi_process_name_t peer_name;
    opal_buffer_t *buffer = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    START_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
    /*
     * Find the peer structure for this peer
     */
    peer_name.jobid  = OMPI_PROC_MY_NAME->jobid;
    peer_name.vpid   = peer_idx;

    if( NULL == (peer_ref = find_peer(peer_name))) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: send_bookmarks: Error: Could not find peer indexed %d\n",
                    peer_idx);
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s --> %s Sending bookmark  (S[%6d] R[%6d])\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&peer_name),
                         peer_ref->total_msgs_sent,
                         peer_ref->total_msgs_recvd));

    /*
     * Send the bookmarks to peer
     */
    if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    PACK_BUFFER(buffer, (peer_ref->total_msgs_sent), 1, OPAL_UINT32,
                "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_sent");
    PACK_BUFFER(buffer, (peer_ref->total_msgs_recvd), 1, OPAL_UINT32,
                "crcp:bkmrk: send_bookmarks: Unable to pack total_msgs_recvd");

    if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_name, buffer,
                                                       OMPI_CRCP_COORD_BOOKMARK_TAG,
                                                       orte_rml_send_callback, NULL))) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: send_bookmarks: Failed to send bookmark to peer %s: Return %d\n",
                    OMPI_NAME_PRINT(&peer_name),
                    ret);
        exit_status = ret;
        goto cleanup;
    }

 cleanup:
    if(NULL != buffer) {
        OBJ_RELEASE(buffer);
    }

    END_TIMER(CRCP_TIMER_CKPT_EX_PEER_S);
    DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_EX_PEER_S, peer_idx, 1);

    return exit_status;
}
5312
5313 /* Paired with send_bookmarks */
recv_bookmarks(int peer_idx)5314 static int recv_bookmarks(int peer_idx)
5315 {
5316 ompi_process_name_t peer_name;
5317
5318 START_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5319
5320 peer_name.jobid = OMPI_PROC_MY_NAME->jobid;
5321 peer_name.vpid = peer_idx;
5322
5323 ompi_rte_recv_buffer_nb(&peer_name, OMPI_CRCP_COORD_BOOKMARK_TAG,
5324 0, recv_bookmarks_cbfunc, NULL);
5325
5326 ++total_recv_bookmarks;
5327
5328 END_TIMER(CRCP_TIMER_CKPT_EX_PEER_R);
5329 /* JJH Doesn't make much sense to print this. The real bottleneck is always the send_bookmarks() */
5330 /*DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_EX_PEER_R, peer_idx, 1);*/
5331
5332 return OMPI_SUCCESS;
5333 }
5334
/*
 * RML callback for a peer's bookmark message (posted by recv_bookmarks()).
 * Unpacks the peer's total-sent/total-received counters into the matching
 * peer_ref's matched_msgs_sent / matched_msgs_recvd fields.
 *
 * Always decrements total_recv_bookmarks before returning, even on error,
 * so the coordination loop waiting on that counter cannot hang.
 *
 * NOTE: 'ret', 'exit_status' and the 'cleanup' label are required by the
 * UNPACK_BUFFER error-handling macro.
 */
static void recv_bookmarks_cbfunc(int status,
                                  ompi_process_name_t* sender,
                                  opal_buffer_t *buffer,
                                  ompi_rml_tag_t tag,
                                  void* cbdata)
{
    ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref;
    int exit_status = OMPI_SUCCESS;
    int ret, tmp_int;
    ompi_vpid_t peer_idx;

    peer_idx = sender->vpid;

    /*
     * Find the peer structure for this peer
     */
    if( NULL == (peer_ref = find_peer(*sender))) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: recv_bookmarks: Could not find peer indexed %d\n",
                    peer_idx);
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /* Peer's total sent count becomes our matched-sent bookmark */
    UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
                  "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_sent");
    peer_ref->matched_msgs_sent = tmp_int;

    /* Peer's total received count becomes our matched-received bookmark */
    UNPACK_BUFFER(buffer, tmp_int, 1, OPAL_UINT32,
                  "crcp:bkmrk: recv_bookmarks: Unable to unpack total_msgs_recvd");
    peer_ref->matched_msgs_recvd = tmp_int;

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s Received bookmark  (S[%6d] R[%6d]) vs. (S[%6d] R[%6d]) (%d)\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(sender),
                         peer_ref->matched_msgs_sent,
                         peer_ref->matched_msgs_recvd,
                         peer_ref->total_msgs_sent,
                         peer_ref->total_msgs_recvd,
                         exit_status));

 cleanup:
    --total_recv_bookmarks;

    return;
}
5382
/*
 * Send the details of potentially in-flight messages to a peer until the
 * peer reports that all (total_sent - total_matched) unmatched messages
 * have been accounted for.  Searches the peer's send_list, then
 * isend_list, then send_init_list (pass 1..3), newest entry first, calling
 * do_send_msg_detail() for each message that has seen traffic.
 *
 * On completion, enqueues a drain-ACK placeholder on drained_msg_ack_list
 * so that the peer's eventual All Clear can be matched.
 *
 * @param peer_ref      the peer to coordinate with
 * @param total_sent    messages we have sent to this peer
 * @param total_matched messages the peer has matched so far
 * @return exit_status (OMPI_SUCCESS; per-message send errors are logged
 *         but do not abort the search)
 */
static int send_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                            int total_sent, int total_matched)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_drain_message_ack_ref_t * d_msg_ack = NULL;
    opal_list_t *search_list = NULL;
    opal_list_item_t* msg_item  = NULL;
    bool finished;
    int pass_num = 1;
    int need, found;
    int total_details_sent = 0;
    int num_matches = 0;
    int p_total_found = 0;

    need = total_sent - total_matched;
    found = 0;
    finished = false;
    /* Caller (check_bookmarks) only invokes us when something is unmatched */
    assert(need > 0);

    START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);

    /*
     * Check the 'send_list' for this peer
     */
    search_list = &(peer_ref->send_list);
    pass_num = 1;

 SEARCH_AGAIN:
    /* Walk newest-to-oldest: recent messages are most likely in flight */
    for(msg_item  = opal_list_get_last(search_list);
        msg_item != opal_list_get_begin(search_list);
        msg_item  = opal_list_get_prev(msg_item) ) {
        ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref;
        msg_ref = (ompi_crcp_bkmrk_pml_traffic_message_ref_t*)msg_item;

        num_matches = 0;

        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: send_msg_details: Stage 1: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s)",
                             msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
                             (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
                              (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend"))
                             ));

        /* If this message has not seen any traffic, then skip it */
        if( 0 >= (msg_ref->active + msg_ref->done) ) {
            continue;
        }
        /* YYY JJH YYY Keep this as a sanity check? if( msg_ref->matched >= (msg_ref->active + msg_ref->done) ) { continue; } */

        if(OMPI_SUCCESS != (ret = do_send_msg_detail(peer_ref, msg_ref, &num_matches, &p_total_found, &finished)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: send_msg_details: %s --> %s Failed to send message details to peer. Return %d\n",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                        ret);
        }

        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: send_msg_details: Stage 2: [M/A/D/AD] [%3d/%3d/%3d/%3d] (%s) [%3d, %3d, %s] [%3d, %3d]",
                             msg_ref->matched, msg_ref->active, msg_ref->done, msg_ref->active_drain,
                             (msg_ref->msg_type == COORD_MSG_TYPE_B_SEND ? " Send" :
                              (msg_ref->msg_type == COORD_MSG_TYPE_I_SEND ? "iSend" : "pSend")),
                             num_matches, p_total_found, (finished ? "T" : "F"),
                             total_details_sent, found
                             ));

        total_details_sent += num_matches;
        if(0 < num_matches ) {
            found += num_matches;
        }
        if(finished) {
            goto ALL_SENT;
        }
    }

    /*
     * We tried the 'send_list' and need more,
     * so match off the 'isend_list'
     */
    if( 1 == pass_num ) {
        search_list = &(peer_ref->isend_list);
        pass_num = 2;
        goto SEARCH_AGAIN;
    }

    /*
     * We tried the 'send_list' and 'isend_list' and need more,
     * so match off the 'send_init_list'
     */
    if( 2 == pass_num ) {
        search_list = &(peer_ref->send_init_list);
        pass_num = 3;
        goto SEARCH_AGAIN;
    }

 ALL_SENT:
    if( need > found ) {
        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: send_msg_details: ERROR: ****** Need (%d) vs Found (%d)",
                             need, found));
    }
    assert(need <= found);

    /* Prepare to post a Recv for the ACK All Clear signal from the peer
     * which is sent when they have finished receiving all of the
     * inflight messages into a local buffer
     */
    HOKE_DRAIN_ACK_MSG_REF_ALLOC(d_msg_ack);
    d_msg_ack->peer.jobid  = peer_ref->proc_name.jobid;
    d_msg_ack->peer.vpid   = peer_ref->proc_name.vpid;

    d_msg_ack->complete    = false;
    opal_list_append(&drained_msg_ack_list, &(d_msg_ack->super));
    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-> %s Message Inflight! Will wait on ACK from this peer.\n",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name))));

    END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S);
    DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_S, peer_ref->proc_name.vpid, total_details_sent);

    return exit_status;
}
5506
/*
 * Send the signature of one traffic message (communicator id, our rank,
 * tag, count, datatype size, done+active total) to the peer, then block
 * for the peer's response telling us how many of those sends it resolved.
 *
 * Fixes relative to the original:
 *  - removed dead code that tested/released 'buffer' immediately after it
 *    was initialized to NULL;
 *  - 'rb' is now released on the 'cleanup' path, so a failing
 *    UNPACK_BUFFER (which jumps to 'cleanup') no longer leaks the recv
 *    callback object;
 *  - stray double semicolon removed.
 *
 * NOTE: 'ret', 'exit_status' and the 'cleanup' label are required by the
 * PACK_BUFFER/UNPACK_BUFFER error-handling macros.
 *
 * @param peer_ref    peer to coordinate with
 * @param msg_ref     message whose details are sent; matched count is
 *                    incremented by the number the peer resolved
 * @param num_matches [out] sends the peer resolved for this message
 * @param total_found [out] peer's running total of resolved sends
 * @param finished    [out] true when the peer reports RECV_MATCH_RESP_DONE
 * @return OMPI_SUCCESS, or OMPI_ERROR / unpack error code on failure
 */
static int do_send_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                              ompi_crcp_bkmrk_pml_traffic_message_ref_t*msg_ref,
                              int *num_matches,
                              int *total_found,
                              bool *finished)
{
    int ret, exit_status = OMPI_SUCCESS;
    opal_buffer_t *buffer = NULL;
    orte_rml_recv_cb_t *rb = NULL;
    int32_t recv_response = RECV_MATCH_RESP_ERROR;
    int32_t num_resolv = -1;
    int32_t p_total_found = -1;
    int comm_my_rank  = -1;
    int total_sent;

    *num_matches = 0;
    *total_found = 0;
    *finished = false;

    if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /*
     * Send:
     * - Communicator Context ID
     * - My Rank in Communicator
     */
    comm_my_rank  = ompi_comm_rank(msg_ref->comm);

    PACK_BUFFER(buffer, msg_ref->comm->c_contextid, 1, OPAL_UINT32,
                "crcp:bkmrk: send_msg_details: Unable to pack communicator ID");
    PACK_BUFFER(buffer, comm_my_rank, 1, OPAL_INT,
                "crcp:bkmrk: send_msg_details: Unable to pack comm rank ID");

    /*
     * Send:
     * - Message tag
     * - Message count
     * - Message Datatype size
     */
    PACK_BUFFER(buffer, msg_ref->tag, 1, OPAL_INT,
                "crcp:bkmrk: send_msg_details: Unable to pack tag");
    PACK_BUFFER(buffer, msg_ref->count, 1, OPAL_SIZE,
                "crcp:bkmrk: send_msg_details: Unable to pack count");
    PACK_BUFFER(buffer, msg_ref->ddt_size, 1, OPAL_SIZE,
                "crcp:bkmrk: send_msg_details: Unable to pack datatype size");

    /*
     * Send:
     * - Message done
     * - Message active
     */
    total_sent = msg_ref->done + msg_ref->active;
    PACK_BUFFER(buffer, total_sent, 1, OPAL_INT,
                "crcp:bkmrk: send_msg_details: Unable to pack done+active count");

    /*
     * Do the send...
     */
    if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
                                                       OMPI_CRCP_COORD_BOOKMARK_TAG,
                                                       orte_rml_send_callback, NULL))) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: do_send_msg_detail: Unable to send message details to peer %s: Return %d\n",
                    OMPI_NAME_PRINT(&peer_ref->proc_name),
                    ret);

        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /*
     * Recv the ACK msg
     */
    rb = OBJ_NEW(orte_rml_recv_cb_t);
    rb->active = true;
    ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0,
                            orte_rml_recv_callback, rb);
    ORTE_WAIT_FOR_COMPLETION(rb->active);

    UNPACK_BUFFER(&rb->data, recv_response, 1, OPAL_UINT32,
                  "crcp:bkmrk: send_msg_details: Failed to unpack the ACK from peer buffer.");
    UNPACK_BUFFER(&rb->data, num_resolv, 1, OPAL_UINT32,
                  "crcp:bkmrk: send_msg_details: Failed to unpack the num_resolv from peer buffer.");
    UNPACK_BUFFER(&rb->data, p_total_found, 1, OPAL_UINT32,
                  "crcp:bkmrk: send_msg_details: Failed to unpack the total_found from peer buffer.");

    /* Mark message as matched */
    msg_ref->matched += num_resolv;
    *num_matches  = num_resolv;
    *total_found  = p_total_found;

    /*
     * Interpret the peer's response (RECV_MATCH_RESP_ERROR leaves
     * *finished as false)
     */
    if( RECV_MATCH_RESP_DONE == recv_response ) {
        *finished = true;
    }
    else if( RECV_MATCH_RESP_MORE == recv_response ) {
        *finished = false;
    }

    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "**************************\n"));
    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "send_msg_details: %d, %d = %s [%d / %d]\n",
                         *num_matches, *total_found,
                         (*finished ? "Done" : "Continue..."),
                         msg_ref->done, msg_ref->active));
    TRAFFIC_MSG_DUMP_MSG_INDV(15, (msg_ref, "", false));
    OPAL_OUTPUT_VERBOSE((15, mca_crcp_bkmrk_component.super.output_handle,
                         "**************************\n"));

 cleanup:
    /* Release the recv callback object on all paths (plugs a leak when an
     * UNPACK_BUFFER failure jumped here before the old release point) */
    if( NULL != rb ) {
        OBJ_RELEASE(rb);
        rb = NULL;
    }
    return exit_status;
}
5631
5632 /*
5633 * Recv message details from peer
5634 * - matched with send_msg_details()
5635 */
/*
 * Receive message details from a peer until all of the peer's unmatched
 * sends have been accounted for.  Counterpart of send_msg_details().
 *
 * For each detail bundle received we check whether it can be matched or
 * drained locally, then tell the peer whether we need more details
 * (RECV_MATCH_RESP_MORE) or are done (RECV_MATCH_RESP_DONE).
 *
 * @param peer_ref      Peer we are negotiating with.
 * @param total_recv    Total messages the peer reports having sent to us.
 * @param total_matched Messages we have already matched locally.
 * @return OMPI_SUCCESS, or the first error from the recv/check/respond steps.
 */
static int recv_msg_details(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                            int total_recv, int total_matched)
{
    int exit_status = OMPI_SUCCESS;
    int num_needed  = total_recv - total_matched;
    int num_found   = 0;
    int total_details_recv = 0;
    int response;
    int ret;

    /* Caller only invokes us when there is something outstanding */
    assert( num_needed > 0);

    START_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);

    /*
     * Keep pulling message details while some sends remain unaccounted for.
     */
    while( num_found < num_needed ) {
        uint32_t p_comm_id;
        size_t p_count;
        size_t p_datatype_size;
        int p_rank;
        int p_tag;
        int p_num_sent;
        int num_resolved = 0;

        /*
         * Receive one message-signature bundle from the peer.
         */
        if( OMPI_SUCCESS != (ret = do_recv_msg_detail(peer_ref,
                                                      &p_rank, &p_comm_id,
                                                      &p_tag, &p_count,
                                                      &p_datatype_size,
                                                      &p_num_sent)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: recv_msg_details: %s <-- %s "
                        "Failed to receive message detail from peer. Return %d\n",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                        ret);
            exit_status = ret;
            goto cleanup;
        }

        /*
         * Decide whether this signature is already matched locally, and take
         * the appropriate action (drain / stall / post) if it is not.
         */
        if( OMPI_SUCCESS != (ret = do_recv_msg_detail_check_drain(peer_ref,
                                                                  p_rank, p_comm_id,
                                                                  p_tag, p_count,
                                                                  p_datatype_size,
                                                                  p_num_sent,
                                                                  &num_resolved)) ) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: recv_msg_details: %s <-- %s "
                        "Failed to check message detail from peer. Return %d\n",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                        ret);
            exit_status = ret;
            goto cleanup;
        }

        num_found          += num_resolved;
        total_details_recv += num_resolved;

        OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                             "crcp:bkmrk: %s <-- %s Recv Detail: Stage --: [%3d / %3d] [%3d, %3d, %s]",
                             OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                             OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                             num_needed, num_found,
                             num_resolved, total_details_recv,
                             ( num_needed <= num_found ? "T" : "F") ));

        /* Tell the peer whether we are satisfied or still need more details */
        response = ( num_needed <= num_found ? RECV_MATCH_RESP_DONE : RECV_MATCH_RESP_MORE );

        if(OMPI_SUCCESS != (ret = do_recv_msg_detail_resp(peer_ref, response, num_resolved, num_found))) {
            opal_output(mca_crcp_bkmrk_component.super.output_handle,
                        "crcp:bkmrk: recv_msg_details: %s <-- %s Failed to respond to peer. Return %d\n",
                        OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                        OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                        ret);
            exit_status = ret;
            goto cleanup;
        }
    }

 cleanup:

    END_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R);
    DISPLAY_INDV_TIMER(CRCP_TIMER_CKPT_CHECK_PEER_R, peer_ref->proc_name.vpid, total_details_recv);

    return exit_status;
}
5741
/*
 * Receive one message-detail bundle from the peer over the RML channel on
 * OMPI_CRCP_COORD_BOOKMARK_TAG, and unpack it into the caller's out
 * parameters.  Matched with the pack order used by the peer's
 * send_msg_details() path: comm_id, rank, tag, count, datatype_size,
 * num_sent.
 *
 * @param peer_ref       Peer to receive from.
 * @param rank           [out] Peer rank within the communicator.
 * @param comm_id        [out] Communicator context ID.
 * @param tag            [out] Message tag.
 * @param count          [out] Element count of the message.
 * @param datatype_size  [out] Size of one datatype element.
 * @param p_num_sent     [out] Number of messages the peer sent with this signature.
 * @return OMPI_SUCCESS, or the unpack error code.
 *
 * NOTE(review): 'ret' appears unused here, but UNPACK_BUFFER is presumed to
 * expand to code that assigns 'ret', sets 'exit_status', and jumps to the
 * 'cleanup' label on failure — confirm against the macro definition.
 */
static int do_recv_msg_detail(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                              int *rank, uint32_t *comm_id, int *tag,
                              size_t *count, size_t *datatype_size,
                              int *p_num_sent)
{
    orte_rml_recv_cb_t *rb = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    /*
     * Recv the msg (blocking: post a non-blocking recv, then spin until the
     * callback flips rb->active)
     */
    rb = OBJ_NEW(orte_rml_recv_cb_t);
    rb->active = true;
    ompi_rte_recv_buffer_nb(&peer_ref->proc_name, OMPI_CRCP_COORD_BOOKMARK_TAG, 0, orte_rml_recv_callback, rb);
    ORTE_WAIT_FOR_COMPLETION(rb->active);

    /* Pull out the communicator ID */
    UNPACK_BUFFER(&rb->data, (*comm_id), 1, OPAL_UINT32,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator ID");
    UNPACK_BUFFER(&rb->data, (*rank), 1, OPAL_INT,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the communicator rank ID");

    /* Pull out the message details */
    UNPACK_BUFFER(&rb->data, (*tag), 1, OPAL_INT,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the tag");
    UNPACK_BUFFER(&rb->data, (*count), 1, OPAL_SIZE,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the count");
    UNPACK_BUFFER(&rb->data, (*datatype_size), 1, OPAL_SIZE,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the datatype size");

    /* Pull out the counts */
    UNPACK_BUFFER(&rb->data, (*p_num_sent), 1, OPAL_INT,
                  "crcp:bkmrk: recv_msg_details: Failed to unpack the sent count");

 cleanup:
    /* Release the recv buffer on both the success and error paths */
    OBJ_RELEASE(rb);
    return exit_status;
}
5781
/*
 * Resolve one message signature reported by the peer against our local
 * receive bookkeeping, deciding for each of the peer's 'p_num_sent' sends
 * whether it was already received ('done'), is in flight ('active' — wait
 * for it, possibly stalling the checkpoint), or was never posted locally
 * (post a drain receive for it).
 *
 * Processing stages (mirrored by the Stage N verbose output):
 *   0. Look up the signature in the five receive lists.
 *   1. Credit everything already 'done' against p_num_sent.
 *   2. Credit 'active' receives; these must complete before checkpoint.
 *   3. Create (non-posted) drain records for the active receives; stall if
 *      the current blocking recv matches this signature.
 *   4. Post real drain receives for anything still unaccounted for.
 *
 * @param peer_ref       Peer being reconciled.
 * @param rank,comm_id,tag,count,datatype_size  Message signature from peer.
 * @param p_num_sent     Number of sends the peer issued with this signature.
 * @param num_resolved   [out] Number of sends resolved by draining/waiting.
 * @return OMPI_SUCCESS, or an error if lookup fails or accounting goes negative.
 *
 * Side effects: may set the file-global 'stall_for_completion'; updates
 * peer_ref->total_msgs_recvd and the matched counters on the message refs.
 */
static int do_recv_msg_detail_check_drain(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                          int rank, uint32_t comm_id, int tag,
                                          size_t count, size_t datatype_size,
                                          int p_num_sent,
                                          int *num_resolved)
{
    int ret, exit_status = OMPI_SUCCESS;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_tmp_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_recv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_irecv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_precv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_recv_msg_ref = NULL;
    ompi_crcp_bkmrk_pml_traffic_message_ref_t *posted_unknown_precv_msg_ref = NULL;
    /* Number of messages left not-matched */
    int num_left_unresolved = 0;
    /* Number of active messages need to be drained */
    int num_still_active = 0;
    /* Number of drain messages posted */
    int num_posted = 0;

    *num_resolved = 0;
    num_left_unresolved = p_num_sent;

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 0: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved));
    TRAFFIC_MSG_DUMP_PEER(15, (peer_ref, "Recv Check...", true));

    /*
     * Find all references to this message signature.
     */
    ret = traffic_message_find_recv(peer_ref, /* Peer to resolve with */
                                    rank, comm_id, tag, count, datatype_size, /* Message signature */
                                    &posted_recv_msg_ref, /* One of 5 lists where this signature could match */
                                    &posted_irecv_msg_ref,
                                    &posted_precv_msg_ref,
                                    &posted_unknown_recv_msg_ref,
                                    &posted_unknown_precv_msg_ref);
    if( OMPI_SUCCESS != ret) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: recv_msg_detail_check: %s -- %s "
                    "Failed to determine if we have received this message. Return %d\n",
                    OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                    OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                    ret);
        exit_status = ret;
        goto cleanup;
    }

    /*
     * Peer sent 'p_num_sent'.
     * For each msg_ref from recv lists:
     *   Mark all as 'matched'
     *   Subtract recv->{'active' + 'done'} from 'p_num_sent'
     *   If recv->active
     *     need to make sure to drain these and possibly stall
     * If 'p_num_sent' > 0
     *   Post outstanding messages in drain queue
     */

    /*
     * First pass: Count all 'done'
     * (completed receives fully account for that many of the peer's sends)
     */
    if( NULL != posted_recv_msg_ref ) {
        posted_recv_msg_ref->matched += posted_recv_msg_ref->done;
        num_left_unresolved          -= posted_recv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_recv_msg_ref, "Ck. Recv", true));
    }
    if( NULL != posted_irecv_msg_ref ) {
        posted_irecv_msg_ref->matched += posted_irecv_msg_ref->done;
        num_left_unresolved           -= posted_irecv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_irecv_msg_ref, "Ck. iRecv", true));
    }
    if( NULL != posted_precv_msg_ref ) {
        posted_precv_msg_ref->matched += posted_precv_msg_ref->done;
        num_left_unresolved           -= posted_precv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_precv_msg_ref, "Ck. pRecv", true));
    }
    if( NULL != posted_unknown_recv_msg_ref ) {
        posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->done;
        num_left_unresolved                  -= posted_unknown_recv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_recv_msg_ref, "Ck. uRecv", true));
    }
    if( NULL != posted_unknown_precv_msg_ref ) {
        posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->done;
        num_left_unresolved                   -= posted_unknown_precv_msg_ref->done;
        TRAFFIC_MSG_DUMP_MSG_INDV(11, (posted_unknown_precv_msg_ref, "Ck. upRecv", true));
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 1: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved));

    /* Short cut if we have completed everything necessary
     * This should never happen since we are here because there is a message
     * that was sent that has not been started.
     */
    if( num_left_unresolved <= 0 ) {
        goto cleanup;
    }

    /*
     * Next pass: Count all 'active'
     *  - if active > unresolved then match all unresolved, and jump to end
     *  - if active < unresolved then match all active, and continue looking
     * The same credit logic is applied to each of the five lists in turn,
     * stopping early once nothing remains unresolved.
     */
    if( NULL != posted_recv_msg_ref ) {
        if( posted_recv_msg_ref->active > num_left_unresolved ) {
            posted_recv_msg_ref->matched += num_left_unresolved;
            num_still_active             += num_left_unresolved;
            num_left_unresolved           = 0;
        } else {
            posted_recv_msg_ref->matched += posted_recv_msg_ref->active;
            num_still_active             += posted_recv_msg_ref->active;
            num_left_unresolved          -= posted_recv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_irecv_msg_ref ) {
        if( posted_irecv_msg_ref->active > num_left_unresolved ) {
            posted_irecv_msg_ref->matched += num_left_unresolved;
            num_still_active              += num_left_unresolved;
            num_left_unresolved            = 0;
        } else {
            posted_irecv_msg_ref->matched += posted_irecv_msg_ref->active;
            num_still_active              += posted_irecv_msg_ref->active;
            num_left_unresolved           -= posted_irecv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_precv_msg_ref ) {
        if( posted_precv_msg_ref->active > num_left_unresolved ) {
            posted_precv_msg_ref->matched += num_left_unresolved;
            num_still_active              += num_left_unresolved;
            num_left_unresolved            = 0;
        } else {
            posted_precv_msg_ref->matched += posted_precv_msg_ref->active;
            num_still_active              += posted_precv_msg_ref->active;
            num_left_unresolved           -= posted_precv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_unknown_recv_msg_ref ) {
        if( posted_unknown_recv_msg_ref->active > num_left_unresolved ) {
            posted_unknown_recv_msg_ref->matched += num_left_unresolved;
            num_still_active                     += num_left_unresolved;
            num_left_unresolved                   = 0;
        } else {
            posted_unknown_recv_msg_ref->matched += posted_unknown_recv_msg_ref->active;
            num_still_active                     += posted_unknown_recv_msg_ref->active;
            num_left_unresolved                  -= posted_unknown_recv_msg_ref->active;
        }
    }
    if( num_left_unresolved > 0 && NULL != posted_unknown_precv_msg_ref ) {
        if( posted_unknown_precv_msg_ref->active > num_left_unresolved ) {
            posted_unknown_precv_msg_ref->matched += num_left_unresolved;
            num_still_active                      += num_left_unresolved;
            num_left_unresolved                    = 0;
        } else {
            posted_unknown_precv_msg_ref->matched += posted_unknown_precv_msg_ref->active;
            num_still_active                      += posted_unknown_precv_msg_ref->active;
            num_left_unresolved                   -= posted_unknown_precv_msg_ref->active;
        }
    }

    /*
     * If we happen to have more active Recvs than the peer has posted sends, then
     * we need to reset the number still active to reflect that only a subset
     * of the active sends should be drained.
     */
    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 2: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

    /*
     * Check the math at this point, and make sure we did not mess up above.
     * A negative value means we over-credited some list.
     */
    if(num_left_unresolved < 0 ) {
        ERROR_SHOULD_NEVER_HAPPEN_ARG("crcp:bkmrk: Ck.Drain: Unresolved (%3d) < 0", num_left_unresolved);
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /*
     * Fast Track: If there are no outstanding messages to post, and nothing 'active'
     * If all the matched messages were found 'done' (none were 'active')
     * -> Nothing to do.
     */
    if( num_left_unresolved <= 0 &&
        num_still_active    <= 0) {
        goto cleanup;
    }

    /*
     * Stage 3: Resolve 'active' messages by posting a drain message for each
     *  -> then we need to make sure to wait for them to complete before the checkpoint
     *  -> Create a drain message
     *  -> Point the 'request' at it
     *  -> Make sure not to post this message to be drained, but just wait on the request.
     */
    if( num_still_active > 0 ) {
        /*
         * If this is the current blocking recv, then we need to stall for it to
         * complete properly.
         * - Only applies to Blocking Recv.
         */
        if( NULL != posted_recv_msg_ref ) {
            /* Is this the signature of the current blocking recv? */
            if (current_msg_id == posted_recv_msg_ref->msg_id &&
                COORD_MSG_TYPE_B_RECV == posted_recv_msg_ref->msg_type) {
                OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                                     "crcp:bkmrk: %s <-- %s "
                                     "Recv Check: Found a message that is 'active'! Prepare to STALL.\n",
                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)) ));
                stall_for_completion = true;
            }
            else {
                OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                                     "crcp:bkmrk: %s <-- %s "
                                     "Recv Check: Found a message that is 'active', but is not the current recv! "
                                     "No stall required [%3d, %3d, %3d, %3d].\n",
                                     OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                                     OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                                     (int)current_msg_id,
                                     (int)current_msg_type,
                                     (int)posted_recv_msg_ref->msg_id,
                                     (int)posted_recv_msg_ref->msg_type));
            }
        }

        /*
         * Construct a message for draining for each active message.
         * This message will *not* be posted for draining since it is already
         * posted in the system. We will simply wait for it to complete.
         * - Only applies to messages that are not Blocking Recv
         * Each call consumes up to 'num_still_active' actives from one list
         * and reports how many it handled in 'num_posted'.
         */
        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_recv_msg_ref,
                                             &num_posted);
        num_still_active            -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_irecv_msg_ref,
                                             &num_posted);
        num_still_active            -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_precv_msg_ref,
                                             &num_posted);
        num_still_active            -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_unknown_recv_msg_ref,
                                             &num_posted);
        num_still_active            -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;

        traffic_message_create_drain_message(false, num_still_active,
                                             peer_ref,
                                             &posted_unknown_precv_msg_ref,
                                             &num_posted);
        num_still_active            -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 3: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

    /*
     * Post all unresolved messages to the drain queue
     *  - Create a new message to drain
     *  - Notify peer of resolution of N messages
     */
    if( num_left_unresolved > 0 ) {
        /* Create a stamp for the drained message */
        CREATE_NEW_MSG(posted_tmp_msg_ref, COORD_MSG_TYPE_I_RECV,
                       count, datatype_size, tag, rank,
                       ompi_comm_lookup(comm_id),
                       peer_ref->proc_name.jobid,
                       peer_ref->proc_name.vpid);

        traffic_message_create_drain_message(true, num_left_unresolved,
                                             peer_ref,
                                             &posted_tmp_msg_ref,
                                             &num_posted);
        num_left_unresolved         -= num_posted;
        *num_resolved               += num_posted;
        peer_ref->total_msgs_recvd  += num_posted;

        /* Return the temporary stamp message to the free list */
        HOKE_TRAFFIC_MSG_REF_RETURN(posted_tmp_msg_ref);
    }

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "crcp:bkmrk: %s <-- %s "
                         "Stage 4: Ck.Drain: [TR %3d/MS %3d] sent %4d, unres %4d, res %4d, active %4d",
                         OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                         OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                         peer_ref->total_msgs_recvd,
                         peer_ref->matched_msgs_sent,
                         p_num_sent,
                         num_left_unresolved,
                         *num_resolved,
                         num_still_active
                         ));

    /* YYY JJH YYY Should we check for no-action? */
 cleanup:
    return exit_status;
}
6134
/*
 * Send our response to the peer's last message-detail bundle: whether we
 * are done (RECV_MATCH_RESP_DONE) or need more (RECV_MATCH_RESP_MORE),
 * plus how many messages were resolved this round and in total.
 *
 * @param resp        One of the RECV_MATCH_RESP_* codes.
 * @param num_resolv  Messages resolved by the most recent bundle.
 * @param total_found Messages resolved so far for this peer.
 * @return OMPI_SUCCESS or OMPI_ERROR.
 *
 * NOTE(review): 'ret' is assigned inside PACK_BUFFER's expansion as well,
 * which presumably jumps to 'cleanup' on pack failure.
 * NOTE(review): the buffer is released here unconditionally; confirm that
 * this does not conflict with any release performed by
 * orte_rml_send_callback on the send-completion path.
 */
static int do_recv_msg_detail_resp(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref,
                                   int resp, int num_resolv, int total_found)
{
    opal_buffer_t * buffer = NULL;
    int exit_status = OMPI_SUCCESS;
    int ret;

    if (NULL == (buffer = OBJ_NEW(opal_buffer_t))) {
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

    /* Pack order must match the peer's unpack order: resp, num_resolv, total_found */
    PACK_BUFFER(buffer, resp, 1, OPAL_UINT32,
                "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
    PACK_BUFFER(buffer, num_resolv, 1, OPAL_UINT32,
                "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");
    PACK_BUFFER(buffer, total_found, 1, OPAL_UINT32,
                "crcp:bkmrk: recv_msg_details: Unable to ask peer for more messages");

    if (ORTE_SUCCESS != (ret = ompi_rte_send_buffer_nb(&peer_ref->proc_name, buffer,
                                                       OMPI_CRCP_COORD_BOOKMARK_TAG,
                                                       orte_rml_send_callback, NULL))) {
        opal_output(mca_crcp_bkmrk_component.super.output_handle,
                    "crcp:bkmrk: recv_msg_detail_resp: Unable to send message detail response to peer %s: Return %d\n",
                    OMPI_NAME_PRINT(&peer_ref->proc_name),
                    ret);
        exit_status = OMPI_ERROR;
        goto cleanup;
    }

 cleanup:
    if( NULL != buffer) {
        OBJ_RELEASE(buffer);
        buffer = NULL;
    }

    return exit_status;
}
6173
6174
6175 /************************************************
6176 * Timer Utility Functions
6177 ************************************************/
start_time(int idx)6178 static void start_time(int idx) {
6179 if(idx < CRCP_TIMER_MAX ) {
6180 timer_start[idx] = get_time();
6181 }
6182 }
6183
end_time(int idx)6184 static void end_time(int idx) {
6185 if(idx < CRCP_TIMER_MAX ) {
6186 timer_end[idx] = get_time();
6187 }
6188 }
6189
get_time()6190 static double get_time() {
6191 double wtime;
6192
6193 #if OPAL_TIMER_USEC_NATIVE
6194 wtime = (double)opal_timer_base_get_usec() / 1000000.0;
6195 #else
6196 struct timeval tv;
6197 gettimeofday(&tv, NULL);
6198 wtime = tv.tv_sec;
6199 wtime += (double)tv.tv_usec / 1000000.0;
6200 #endif
6201
6202 return wtime;
6203 }
6204
clear_timers(void)6205 static void clear_timers(void) {
6206 int i;
6207 for(i = 0; i < CRCP_TIMER_MAX; ++i) {
6208 timer_start[i] = 0.0;
6209 timer_end[i] = 0.0;
6210 }
6211 }
6212
display_all_timers(int state)6213 static void display_all_timers(int state) {
6214 bool report_ready = false;
6215 double barrier_start, barrier_stop;
6216 int i, ret;
6217
6218 if( 0 != OMPI_PROC_MY_NAME->vpid ) {
6219 if( 2 > timing_enabled ) {
6220 return;
6221 }
6222 else if( 2 == timing_enabled ) {
6223 if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6224 OPAL_ERROR_LOG(ret);
6225 }
6226 return;
6227 }
6228 }
6229
6230 for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6231 if(timer_end[i] > 0.001) {
6232 report_ready = true;
6233 }
6234 }
6235 if( !report_ready ) {
6236 return;
6237 }
6238
6239 opal_output(0, "crcp:bkmrk: timing(%20s): ******************** Begin: [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6240 for( i = 0; i < CRCP_TIMER_MAX; ++i) {
6241 display_indv_timer_core(i, 0, 0, false);
6242 }
6243
6244 if( timing_enabled >= 2) {
6245 barrier_start = get_time();
6246 if( OPAL_SUCCESS != (ret = opal_pmix.fence(NULL, 0))) {
6247 OPAL_ERROR_LOG(ret);
6248 }
6249 barrier_stop = get_time();
6250 opal_output(0,
6251 "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6252 "",
6253 "Group Barrier",
6254 (barrier_stop - barrier_start));
6255 }
6256
6257 opal_output(0, "crcp:bkmrk: timing(%20s): ******************** End: [State = %12s]\n", "Summary", opal_crs_base_state_str(state));
6258
6259 }
6260
/* Public-facing variant: display one timer directly (direct = true), so the
 * per-peer timers are shown even though they are skipped in the summary. */
static void display_indv_timer(int idx, int proc, int msgs) {
    display_indv_timer_core(idx, proc, msgs, true);
}
6264
display_indv_timer_core(int idx,int proc,int msgs,bool direct)6265 static void display_indv_timer_core(int idx, int proc, int msgs, bool direct) {
6266 double diff = timer_end[idx] - timer_start[idx];
6267 char * str = NULL;
6268
6269 if( 0 != OMPI_PROC_MY_NAME->vpid && timing_enabled < 3 ) {
6270 return;
6271 }
6272
6273 /* Only display the timer if an end value was set */
6274 if(timer_end[idx] <= 0.001) {
6275 return;
6276 }
6277
6278 switch(idx) {
6279 case CRCP_TIMER_CKPT_EX_PEER_S:
6280 case CRCP_TIMER_CKPT_EX_PEER_R:
6281 case CRCP_TIMER_CKPT_CHECK_PEER_S:
6282 case CRCP_TIMER_CKPT_CHECK_PEER_R:
6283 /* These timers do not mean anything in the aggregate, so only display
6284 * them when directly asked for */
6285 if( direct && timing_enabled >= 2) {
6286 asprintf(&str, "Proc %2d, Msg %5d", proc, msgs);
6287 } else {
6288 return;
6289 }
6290 break;
6291 default:
6292 str = strdup("");
6293 break;
6294 }
6295
6296 opal_output(0,
6297 "crcp:bkmrk: timing(%20s): %20s = %10.2f s\n",
6298 str,
6299 timer_label[idx],
6300 diff);
6301 free(str);
6302 str = NULL;
6303 }
6304
6305 /**************** Message Dump functionality ********************/
6306 #if OPAL_ENABLE_DEBUG
/* Debug dump of a single message-content entry: id plus its
 * Active / Done / already_Posted / already_Drained flags. */
static void traffic_message_dump_msg_content_indv(ompi_crcp_bkmrk_pml_message_content_ref_t * content_ref)
{
    const char *active_flag  = (content_ref->active          ? "T" : "F");
    const char *done_flag    = (content_ref->done            ? "T" : "F");
    const char *posted_flag  = (content_ref->already_posted  ? "T" : "F");
    const char *drained_flag = (content_ref->already_drained ? "T" : "F");

    OPAL_OUTPUT_VERBOSE((10, mca_crcp_bkmrk_component.super.output_handle,
                         "\t\t(%3d) Content: [A/D/P/Dr] [%s / %s / %s /%s]",
                         (int)content_ref->msg_id,
                         active_flag,
                         done_flag,
                         posted_flag,
                         drained_flag));
}
6317
/*
 * Debug dump of one traffic message reference and all of its content
 * entries.  'vshort' suppresses the tag/rank fields.
 *
 * Improvement: the message-type label was strdup()'d from string literals
 * and freed on every call — pointless heap traffic, plus an unchecked-NULL
 * hazard if strdup failed.  Point at the literals directly instead; the
 * printed output is identical.
 */
static void traffic_message_dump_msg_indv(ompi_crcp_bkmrk_pml_traffic_message_ref_t * msg_ref, char * msg, bool vshort)
{
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    opal_list_item_t* cont_item = NULL;
    const char * type_name = NULL;

    /* Human-readable label for the message type */
    switch(msg_ref->msg_type) {
    case COORD_MSG_TYPE_B_SEND:
        type_name = " Send";
        break;
    case COORD_MSG_TYPE_I_SEND:
        type_name = "iSend";
        break;
    case COORD_MSG_TYPE_P_SEND:
        type_name = "pSend";
        break;
    case COORD_MSG_TYPE_B_RECV:
        type_name = " Recv";
        break;
    case COORD_MSG_TYPE_I_RECV:
        type_name = "iRecv";
        break;
    case COORD_MSG_TYPE_P_RECV:
        type_name = "pRecv";
        break;
    default:
        type_name = "Unknown";
        break;
    }

    if( !vshort ) {
        opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
                    type_name,
                    msg,
                    (int)msg_ref->msg_id,
                    msg_ref->matched,
                    msg_ref->done,
                    msg_ref->active,
                    msg_ref->active_drain,
                    msg_ref->posted,
                    (int)opal_list_get_size(&msg_ref->msg_contents),
                    (int)msg_ref->count,
                    msg_ref->tag,
                    msg_ref->rank);
    } else {
        opal_output(0, "\t%s %10s (%3d): [m %3d/d %3d/a %3d/ad %3d/p %3d] Contents %2d ... count %6d",
                    type_name,
                    msg,
                    (int)msg_ref->msg_id,
                    msg_ref->matched,
                    msg_ref->done,
                    msg_ref->active,
                    msg_ref->active_drain,
                    msg_ref->posted,
                    (int)opal_list_get_size(&msg_ref->msg_contents),
                    (int)msg_ref->count);
    }

    /* Dump every content entry attached to this message */
    for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
        cont_item != opal_list_get_end( &(msg_ref->msg_contents));
        cont_item  = opal_list_get_next(cont_item) ) {
        content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;

        traffic_message_dump_msg_content_indv(content_ref);
    }
}
6386
/*
 * Debug dump of one drain message reference and all of its content entries.
 * 'vshort' suppresses the tag/rank fields.
 *
 * Improvement: as in traffic_message_dump_msg_indv(), the type label was
 * strdup()'d from string literals and freed — replaced with direct literal
 * pointers (no allocation, no unchecked-NULL hazard); output is identical.
 */
static void traffic_message_dump_drain_msg_indv(ompi_crcp_bkmrk_pml_drain_message_ref_t * msg_ref, char * msg, bool vshort)
{
    ompi_crcp_bkmrk_pml_message_content_ref_t *content_ref = NULL;
    opal_list_item_t* cont_item = NULL;
    const char * type_name = NULL;

    /* Human-readable label for the message type */
    switch(msg_ref->msg_type) {
    case COORD_MSG_TYPE_B_SEND:
        type_name = " Send";
        break;
    case COORD_MSG_TYPE_I_SEND:
        type_name = "iSend";
        break;
    case COORD_MSG_TYPE_P_SEND:
        type_name = "pSend";
        break;
    case COORD_MSG_TYPE_B_RECV:
        type_name = " Recv";
        break;
    case COORD_MSG_TYPE_I_RECV:
        type_name = "iRecv";
        break;
    case COORD_MSG_TYPE_P_RECV:
        type_name = "pRecv";
        break;
    default:
        type_name = "Unknown";
        break;
    }

    if( !vshort ) {
        opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d, tag %6d, rank %3d",
                    type_name,
                    msg,
                    (int)msg_ref->msg_id,
                    msg_ref->done,
                    msg_ref->active,
                    (int)opal_list_get_size(&msg_ref->msg_contents),
                    (int)msg_ref->count,
                    msg_ref->tag,
                    msg_ref->rank);
    } else {
        opal_output(0, "\t%s %10s (%3d): [d %3d/a %3d] Contents %2d ... count %6d",
                    type_name,
                    msg,
                    (int)msg_ref->msg_id,
                    msg_ref->done,
                    msg_ref->active,
                    (int)opal_list_get_size(&msg_ref->msg_contents),
                    (int)msg_ref->count);
    }

    /* Dump every content entry attached to this message */
    for(cont_item  = opal_list_get_first(&(msg_ref->msg_contents));
        cont_item != opal_list_get_end( &(msg_ref->msg_contents));
        cont_item  = opal_list_get_next(cont_item) ) {
        content_ref = (ompi_crcp_bkmrk_pml_message_content_ref_t*)cont_item;

        traffic_message_dump_msg_content_indv(content_ref);
    }
}
6449
/* Dump every entry in a peer message list, walking from the newest (last)
 * entry backwards.  'is_drain' selects the drain-message record layout. */
static void traffic_message_dump_msg_list(opal_list_t *msg_list, bool is_drain)
{
    opal_list_item_t *cur = NULL;

    for(cur  = opal_list_get_last(msg_list);
        cur != opal_list_get_begin(msg_list);
        cur  = opal_list_get_prev(cur) ) {
        if( is_drain ) {
            traffic_message_dump_drain_msg_indv((ompi_crcp_bkmrk_pml_drain_message_ref_t*)cur,
                                                "Drain", false);
        } else {
            traffic_message_dump_msg_indv((ompi_crcp_bkmrk_pml_traffic_message_ref_t*)cur,
                                          "", false);
        }
    }
}
6468
/*
 * Dump the full bookkeeping state for one peer: totals, then every send,
 * recv, and drained message list.  With 'root_only' set, only vpid 0 prints.
 * The sleep/usleep calls stagger output from different ranks so their dumps
 * do not interleave.
 */
static void traffic_message_dump_peer(ompi_crcp_bkmrk_pml_peer_ref_t *peer_ref, char * msg, bool root_only)
{
    if( root_only && 0 != ompi_process_info.my_name.vpid ) {
        return;
    }
    /* Stagger each rank's dump by its vpid */
    sleep(ompi_process_info.my_name.vpid * 2);

    opal_output(0, "------------- %s ---------------------------------", msg);
    opal_output(0, "%s <-> %s Totals Sent [ %3d / %3d ] Recv [ %3d / %3d ]",
                OMPI_NAME_PRINT(OMPI_PROC_MY_NAME),
                OMPI_NAME_PRINT(&(peer_ref->proc_name)),
                peer_ref->total_msgs_sent,
                peer_ref->matched_msgs_sent,
                peer_ref->total_msgs_recvd,
                peer_ref->matched_msgs_recvd);
    opal_output(0, "\n");

    /* Send-side lists */
    traffic_message_dump_msg_list(&(peer_ref->send_list),      false);
    traffic_message_dump_msg_list(&(peer_ref->isend_list),     false);
    traffic_message_dump_msg_list(&(peer_ref->send_init_list), false);

    /* Recv-side lists */
    traffic_message_dump_msg_list(&(peer_ref->recv_list),      false);
    traffic_message_dump_msg_list(&(peer_ref->irecv_list),     false);
    traffic_message_dump_msg_list(&(peer_ref->recv_init_list), false);

    /* Drained messages */
    traffic_message_dump_msg_list(&(peer_ref->drained_list),   true);

    opal_output(0, "--------------------------------------------------");
    usleep(250000);
}
6500 #endif
6501