1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "mpidimpl.h"
7 #include "mpidch4r.h"
8 #include "ch4r_callbacks.h"
9 
10 static int handle_unexp_cmpl(MPIR_Request * rreq);
11 static int recv_target_cmpl_cb(MPIR_Request * rreq);
12 
/* Send a clear-to-send (MPIDIG_SEND_CTS) header-only reply for `rreq`.
 *
 * The header carries the sender-side request pointer previously stored in
 * rreq's rreq.peer_req_ptr field together with `rreq` itself. The reply is
 * addressed using rreq's context_id and rank fields. In builds that are not
 * MPIDI_CH4_DIRECT_NETMOD, a request flagged is_local goes through the SHM
 * transport; everything else goes through the netmod.
 *
 * Returns MPI_SUCCESS or the error code produced by the transport call. */
int MPIDIG_do_cts(MPIR_Request * rreq)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDIG_send_cts_msg_t cts_hdr;

    /* Both request handles travel in the CTS header; a missing sender handle
     * is treated as a programming error. */
    cts_hdr.sreq_ptr = (MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr));
    cts_hdr.rreq_ptr = rreq;
    MPIR_Assert((void *) cts_hdr.sreq_ptr != NULL);

    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "do cts req %p handle=0x%x", rreq, rreq->handle));

#ifndef MPIDI_CH4_DIRECT_NETMOD
    if (MPIDI_REQUEST(rreq, is_local)) {
        mpi_errno = MPIDI_SHM_am_send_hdr_reply(MPIDIG_REQUEST(rreq, context_id),
                                                MPIDIG_REQUEST(rreq, rank),
                                                MPIDIG_SEND_CTS, &cts_hdr, sizeof(cts_hdr));
    } else
#endif
    {
        mpi_errno = MPIDI_NM_am_send_hdr_reply(MPIDIG_REQUEST(rreq, context_id),
                                               MPIDIG_REQUEST(rreq, rank),
                                               MPIDIG_SEND_CTS, &cts_hdr, sizeof(cts_hdr));
    }
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
47 
48 /* Checks to make sure that the specified request is the next one expected to finish. If it isn't
49  * supposed to finish next, it is appended to a list of requests to be retrieved later. */
MPIDIG_check_cmpl_order(MPIR_Request * req)50 int MPIDIG_check_cmpl_order(MPIR_Request * req)
51 {
52     int ret = 0;
53 
54     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
55     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
56 
57     if (MPIDIG_REQUEST(req, req->seq_no) == MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)) {
58         MPL_atomic_fetch_add_uint64(&MPIDI_global.exp_seq_no, 1);
59         ret = 1;
60         goto fn_exit;
61     }
62 
63     MPIDIG_REQUEST(req, req->request) = req;
64     /* MPIDI_CS_ENTER(); */
65     DL_APPEND(MPIDI_global.cmpl_list, req->dev.ch4.am.req);
66     /* MPIDI_CS_EXIT(); */
67 
68   fn_exit:
69     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
70     return ret;
71 }
72 
MPIDIG_progress_compl_list(void)73 void MPIDIG_progress_compl_list(void)
74 {
75     MPIR_Request *req;
76     MPIDIG_req_ext_t *curr, *tmp;
77 
78     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
79     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
80 
81     /* MPIDI_CS_ENTER(); */
82   do_check_again:
83     DL_FOREACH_SAFE(MPIDI_global.cmpl_list, curr, tmp) {
84         if (curr->seq_no == MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)) {
85             DL_DELETE(MPIDI_global.cmpl_list, curr);
86             req = (MPIR_Request *) curr->request;
87             MPIDIG_REQUEST(req, req->target_cmpl_cb) (req);
88             goto do_check_again;
89         }
90     }
91     /* MPIDI_CS_EXIT(); */
92     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
93 }
94 
/* Finish an unexpected receive (`rreq`) whose data has arrived.
 *
 * Three outcomes are possible:
 *   - rreq was dequeued by mprobe (MPIDIG_REQ_UNEXP_DQUED): either hand it to
 *     MPIDIG_handle_unexp_mrecv (already claimed) or just clear the BUSY bit.
 *   - rreq has no matching posted receive: clear the BUSY bit and return.
 *   - rreq has a matching receive (`match_req`): copy/unpack the buffered
 *     bytes into match_req's user buffer, fill in match_req's status, reply to
 *     a synchronous sender if needed, release the unexpected buffer and
 *     complete both requests.
 *
 * Returns MPI_SUCCESS or an error code from one of the called routines. */
static int handle_unexp_cmpl(MPIR_Request * rreq)
{
    int mpi_errno = MPI_SUCCESS;
    int still_in_use;
    int is_contig;
    size_t copy_bytes;
    size_t elem_sz;
    MPI_Aint true_lb;
    MPIR_Datatype *dtp;
    MPIR_Request *match_req = NULL;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_HANDLE_UNEXP_CMPL);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_HANDLE_UNEXP_CMPL);

    /* A request already dequeued by mprobe is not matched here. */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXP_DQUED) {
        if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXP_CLAIMED) {
            /* mrecv was already issued for it */
            MPIDIG_handle_unexp_mrecv(rreq);
        } else {
            /* mrecv has not been issued yet: dropping the busy flag tells a
             * later mrecv that the data is ready */
            MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_BUSY;
        }
        goto fn_exit;
    }

    /* The request may have been matched earlier without being handled. */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_MATCHED) {
        match_req = (MPIR_Request *) MPIDIG_REQUEST(rreq, req->rreq.match_req);

#ifndef MPIDI_CH4_DIRECT_NETMOD
        int is_cancelled;
        mpi_errno = MPIDI_anysrc_try_cancel_partner(match_req, &is_cancelled);
        MPIR_ERR_CHECK(mpi_errno);
        /* `is_cancelled` is assumed to be always true.
         * In typical config, anysrc partners won't occur if matching unexpected
         * message already exist.
         * In workq setup, since we will always progress shm first, when unexpected
         * message match, the NM partner wouldn't have progressed yet, so the cancel
         * should always succeed.
         */
        MPIR_Assert(is_cancelled);
#endif /* MPIDI_CH4_DIRECT_NETMOD */
    }

    /* No matching receive: only clear the busy bit; no data is moved. */
    if (match_req == NULL) {
        MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_BUSY;
        goto fn_exit;
    }

    match_req->status.MPI_SOURCE = MPIDIG_REQUEST(rreq, rank);
    match_req->status.MPI_TAG = MPIDIG_REQUEST(rreq, tag);

    /* Query the receive datatype layout; elem_sz is then overwritten with the
     * size reported by MPIR_Datatype_get_size_macro for the datatype alone. */
    MPIDI_Datatype_get_info(MPIDIG_REQUEST(match_req, count),
                            MPIDIG_REQUEST(match_req, datatype),
                            is_contig, elem_sz, dtp, true_lb);
    MPIR_Datatype_get_size_macro(MPIDIG_REQUEST(match_req, datatype), elem_sz);

    /* Clamp the number of bytes to what the posted receive can hold.
     * NOTE(review): the truncation/success code is written to rreq's status,
     * not match_req's, and rreq is released below -- confirm this is intended. */
    if (MPIDIG_REQUEST(rreq, count) <= elem_sz * MPIDIG_REQUEST(match_req, count)) {
        rreq->status.MPI_ERROR = MPI_SUCCESS;
        copy_bytes = MPIDIG_REQUEST(rreq, count);       /* incoming message is always count of bytes. */
    } else {
        rreq->status.MPI_ERROR = MPI_ERR_TRUNCATE;
        copy_bytes = elem_sz * MPIDIG_REQUEST(match_req, count);
    }

    MPIR_STATUS_SET_COUNT(match_req->status, copy_bytes);
    MPIDIG_REQUEST(rreq, count) = elem_sz > 0 ? copy_bytes / elem_sz : 0;

    /* Move the buffered bytes into the user's buffer: a plain copy for
     * contiguous layouts, the typerep unpack routine otherwise. */
    if (is_contig) {
        MPIR_Typerep_copy((char *) MPIDIG_REQUEST(match_req, buffer) + true_lb,
                          MPIDIG_REQUEST(rreq, buffer), copy_bytes);
    } else {
        MPI_Aint unpacked_bytes;
        mpi_errno = MPIR_Typerep_unpack(MPIDIG_REQUEST(rreq, buffer), copy_bytes,
                                        MPIDIG_REQUEST(match_req, buffer),
                                        MPIDIG_REQUEST(match_req, count),
                                        MPIDIG_REQUEST(match_req, datatype), 0, &unpacked_bytes);
        MPIR_ERR_CHECK(mpi_errno);

        /* A short unpack is recorded in match_req's status as a datatype mismatch. */
        if (unpacked_bytes != (MPI_Aint) copy_bytes) {
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                             __FUNCTION__, __LINE__,
                                             MPI_ERR_TYPE, "**dtypemismatch", 0);
            match_req->status.MPI_ERROR = mpi_errno;
        }
    }

    /* The unexpected message has been consumed. */
    MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_UNEXPECTED;

    /* A synchronous sender is waiting for an acknowledgement. */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_PEER_SSEND) {
        mpi_errno = MPIDIG_reply_ssend(rreq);
        MPIR_ERR_CHECK(mpi_errno);
    }
#ifndef MPIDI_CH4_DIRECT_NETMOD
    MPIDI_anysrc_free_partner(match_req);
#endif

    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(match_req, datatype));
    if (MPIDIG_REQUEST(rreq, buffer)) {
        /* unexp pack buf is MPI_BYTE type, count == data size */
        MPIDU_genq_private_pool_free_cell(MPIDI_global.unexp_pack_buf_pool,
                                          MPIDIG_REQUEST(rreq, buffer));
    }
    MPIR_Object_release_ref(rreq, &still_in_use);
    MPID_Request_complete(rreq);
    MPID_Request_complete(match_req);
  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_HANDLE_UNEXP_CMPL);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
219 
220 /* This function is called when a receive has completed on the receiver side. The input is the
221  * request that has been completed. */
recv_target_cmpl_cb(MPIR_Request * rreq)222 static int recv_target_cmpl_cb(MPIR_Request * rreq)
223 {
224     int mpi_errno = MPI_SUCCESS;
225 
226     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_RECV_TARGET_CMPL_CB);
227     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_RECV_TARGET_CMPL_CB);
228 
229     MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
230                     (MPL_DBG_FDEST, "req %p handle=0x%x", rreq, rreq->handle));
231 
232     /* Check if this request is supposed to complete next or if it should be delayed. */
233     if (!MPIDIG_check_cmpl_order(rreq))
234         return mpi_errno;
235 
236     MPIDIG_recv_finish(rreq);
237 
238     if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXPECTED) {
239         mpi_errno = handle_unexp_cmpl(rreq);
240         MPIR_ERR_CHECK(mpi_errno);
241         goto fn_exit;
242     }
243 
244     rreq->status.MPI_SOURCE = MPIDIG_REQUEST(rreq, rank);
245     rreq->status.MPI_TAG = MPIDIG_REQUEST(rreq, tag);
246 
247     if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_PEER_SSEND) {
248         mpi_errno = MPIDIG_reply_ssend(rreq);
249         MPIR_ERR_CHECK(mpi_errno);
250     }
251 #ifndef MPIDI_CH4_DIRECT_NETMOD
252     MPIDI_anysrc_free_partner(rreq);
253 #endif
254 
255     MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(rreq, datatype));
256     if ((MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_RTS) &&
257         MPIDIG_REQUEST(rreq, req->rreq.match_req) != NULL) {
258         /* This block is executed only when the receive is enqueued (handoff) &&
259          * receive was matched with an unexpected long RTS message.
260          * `rreq` is the unexpected message received and `sigreq` is the message
261          * that came from CH4 (e.g. MPIDI_recv_safe) */
262         MPIR_Request *sigreq = MPIDIG_REQUEST(rreq, req->rreq.match_req);
263         sigreq->status = rreq->status;
264         MPIR_Request_add_ref(sigreq);
265         MPID_Request_complete(sigreq);
266         /* Free the unexpected request on behalf of the user */
267         MPIR_Request_free_unsafe(rreq);
268     }
269     MPID_Request_complete(rreq);
270   fn_exit:
271     MPIDIG_progress_compl_list();
272     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_RECV_TARGET_CMPL_CB);
273     return mpi_errno;
274   fn_fail:
275     goto fn_exit;
276 }
277 
/* Origin-side completion callback for a send: completes `sreq` and always
 * returns MPI_SUCCESS. */
int MPIDIG_send_origin_cb(MPIR_Request * sreq)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);

    MPID_Request_complete(sreq);

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);
    return MPI_SUCCESS;
}
287 
/* Origin-side completion callback for the data phase of a send: releases the
 * datatype stored in sreq's sreq sub-structure (unless it is a builtin),
 * completes `sreq`, and always returns MPI_SUCCESS. */
int MPIDIG_send_data_origin_cb(MPIR_Request * sreq)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);

    /* Release the datatype first, then complete the request. */
    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(sreq, req->sreq).datatype);
    MPID_Request_complete(sreq);

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);
    return MPI_SUCCESS;
}
298 
/* Target-side handler for an incoming send header (eager data or RTS).
 *
 * The header is matched against the posted-receive list of the communicator
 * identified by hdr->context_id. If a posted receive is found it is used;
 * otherwise a new unexpected request is created and enqueued (on the
 * communicator's unexpected list, or on the per-context-id list when the
 * communicator does not exist yet).
 *
 * Parameters:
 *   handler_id  - not used by this function.
 *   am_hdr      - the message header, interpreted as MPIDIG_hdr_t.
 *   data        - payload passed to MPIDIG_recv_copy when !is_async and the
 *                 message is not an RTS.
 *   in_data_sz  - when non-zero, an unexpected pack buffer is allocated for a
 *                 newly created unexpected request.
 *   is_local    - forwarded to the posted-queue lookup and stored in the
 *                 request (non-DIRECT_NETMOD builds).
 *   is_async    - when set, *req is written (NULL for RTS, the receive request
 *                 otherwise) instead of copying data and running the
 *                 completion callback here.
 *   req         - output request pointer, written only when is_async is set.
 *
 * Returns MPI_SUCCESS or an error code. */
int MPIDIG_send_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                              int is_local, int is_async, MPIR_Request ** req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request *rreq = NULL;
    MPIR_Comm *root_comm;
    MPIDIG_hdr_t *hdr = (MPIDIG_hdr_t *) am_hdr;
    void *pack_buf = NULL;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "HDR: data_sz=%ld, flags=0x%X", hdr->data_sz, hdr->flags));
    root_comm = MPIDIG_context_id_to_comm(hdr->context_id);
    if (root_comm) {
      root_comm_retry:
        /* MPIDI_CS_ENTER(); */
        while (TRUE) {
            rreq = MPIDIG_dequeue_posted(hdr->src_rank, hdr->tag, hdr->context_id,
                                         is_local, &MPIDIG_COMM(root_comm, posted_list));
#ifndef MPIDI_CH4_DIRECT_NETMOD
            if (rreq) {
                int is_cancelled;
                mpi_errno = MPIDI_anysrc_try_cancel_partner(rreq, &is_cancelled);
                MPIR_ERR_CHECK(mpi_errno);
                if (!is_cancelled) {
                    /* The partner could not be cancelled: drop this candidate
                     * and look for the next posted receive. */
                    MPIR_Comm_release(root_comm);       /* -1 for posted_list */
                    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(rreq, datatype));
                    continue;
                }
            }
#endif /* MPIDI_CH4_DIRECT_NETMOD */
            break;
        }
        /* MPIDI_CS_EXIT(); */
    }

    if (rreq == NULL) {
        rreq = MPIDIG_request_create(MPIR_REQUEST_KIND__RECV, 2);
        MPIR_ERR_CHKANDSTMT(rreq == NULL, mpi_errno, MPIX_ERR_NOREQ, goto fn_fail, "**nomemreq");
        /* for unexpected message, always recv as MPI_BYTE into unexpected buffer. They will be
         * set to the recv side datatype and count when it is matched */
        MPIDIG_REQUEST(rreq, datatype) = MPI_BYTE;
        MPIDIG_REQUEST(rreq, count) = hdr->data_sz;
        if (in_data_sz) {
            MPIR_Assert(in_data_sz <= MPIR_CVAR_CH4_AM_PACK_BUFFER_SIZE);
            /* NOTE(review): an allocation failure is only caught by the
             * assertion below; mpi_errno is not checked here. */
            mpi_errno =
                MPIDU_genq_private_pool_alloc_cell(MPIDI_global.unexp_pack_buf_pool, &pack_buf);
            MPIR_Assert(pack_buf);
            MPIDIG_REQUEST(rreq, buffer) = pack_buf;
        } else {
            MPIDIG_REQUEST(rreq, buffer) = NULL;
        }
        MPIDIG_REQUEST(rreq, rank) = hdr->src_rank;
        MPIDIG_REQUEST(rreq, tag) = hdr->tag;
        MPIDIG_REQUEST(rreq, context_id) = hdr->context_id;

        MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_UNEXPECTED;
        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            /* this is unexpected RNDV */
            MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_RTS;
            MPIDIG_REQUEST(rreq, req->rreq.match_req) = NULL;
        } else {
            /* this is unexpected EAGER */
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_BUSY;
        }
#ifndef MPIDI_CH4_DIRECT_NETMOD
        MPIDI_REQUEST(rreq, is_local) = is_local;
#endif
        MPID_THREAD_CS_ENTER(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
        if (root_comm) {
            MPIR_Comm_add_ref(root_comm);
            MPIDIG_enqueue_unexp(rreq, &MPIDIG_COMM(root_comm, unexp_list));
        } else {
            MPIR_Comm *root_comm_again;
            /* This branch means that last time we checked, there was no communicator
             * associated with the arriving message.
             * In a multi-threaded environment, it is possible that the communicator
             * has been created since we checked root_comm last time.
             * If that is the case, the new message must be put into a queue in
             * the new communicator. Otherwise that message will be lost forever.
             * Here that strategy is to query root_comm again, and if found,
             * simply re-execute the per-communicator enqueue logic above. */
            root_comm_again = MPIDIG_context_id_to_comm(hdr->context_id);
            if (unlikely(root_comm_again != NULL)) {
                MPID_THREAD_CS_EXIT(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
                /* The pack buffer exists only when in_data_sz was non-zero;
                 * do not hand a NULL cell back to the pool. */
                if (MPIDIG_REQUEST(rreq, buffer)) {
                    MPIDU_genq_private_pool_free_cell(MPIDI_global.unexp_pack_buf_pool,
                                                      MPIDIG_REQUEST(rreq, buffer));
                }
                /* The request was created with two references; drop both. */
                MPIR_Request_free_unsafe(rreq);
                MPID_Request_complete(rreq);
                rreq = NULL;
                root_comm = root_comm_again;
                goto root_comm_retry;
            }
            MPIDIG_enqueue_unexp(rreq, MPIDIG_context_id_to_uelist(hdr->context_id));
        }
        MPID_THREAD_CS_EXIT(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
    } else {
        MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                        (MPL_DBG_FDEST, "posted req %p handle=0x%x", rreq, rreq->handle));

        /* rreq != NULL <=> root_comm != NULL */
        MPIR_Assert(root_comm);
        /* Decrement the refcnt when popping a request out from posted_list */
        MPIR_Comm_release(root_comm);
        MPIDIG_REQUEST(rreq, rank) = hdr->src_rank;
        MPIDIG_REQUEST(rreq, tag) = hdr->tag;
        MPIDIG_REQUEST(rreq, context_id) = hdr->context_id;

        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            /* this is expected RNDV, init a special recv into unexp buffer */
            MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_RTS;
            MPIDIG_REQUEST(rreq, req->rreq.match_req) = NULL;
            /* Record a CTS failure so it is returned to the caller. The
             * bookkeeping below still runs (and *req is still written for
             * async callers); nothing after this point overwrites mpi_errno. */
            mpi_errno = MPIDIG_do_cts(rreq);
        }
    }

    if (hdr->flags & MPIDIG_AM_SEND_FLAGS_SYNC) {
        MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
        MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_PEER_SSEND;
    }

    rreq->status.MPI_ERROR = hdr->error_bits;
    MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_IN_PROGRESS;

    MPIDIG_REQUEST(rreq, req->target_cmpl_cb) = recv_target_cmpl_cb;
    if (!(hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS)) {
        /* Only messages that carry data take a completion sequence number here. */
        MPIDIG_REQUEST(rreq, req->seq_no) =
            MPL_atomic_fetch_add_uint64(&MPIDI_global.nxt_seq_no, 1);
        MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                        (MPL_DBG_FDEST, "seq_no: me=%" PRIu64 " exp=%" PRIu64,
                         MPIDIG_REQUEST(rreq, req->seq_no),
                         MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)));
        MPIDIG_recv_type_init(hdr->data_sz, rreq);
    }

    if (is_async) {
        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            *req = NULL;
        } else {
            *req = rreq;
        }
    } else {
        if (!(hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS)) {
            MPIDIG_recv_copy(data, rreq);
            MPIDIG_REQUEST(rreq, req->target_cmpl_cb) (rreq);
        }
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
456 
/* Target-side handler for the data message of a send.
 *
 * The receive request is taken from the header's rreq_ptr field. It is given
 * the next completion sequence number and its receive is initialised for
 * in_data_sz bytes. When is_async is set the request is returned through *req;
 * otherwise `data` is copied into it and its target_cmpl_cb is invoked here.
 * handler_id and is_local are not used. Always returns MPI_SUCCESS. */
int MPIDIG_send_data_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                   int is_local, int is_async, MPIR_Request ** req)
{
    MPIDIG_send_data_msg_t *data_hdr = (MPIDIG_send_data_msg_t *) am_hdr;
    MPIR_Request *rreq;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);

    rreq = (MPIR_Request *) data_hdr->rreq_ptr;
    MPIR_Assert(rreq);

    /* Take a slot in the completion order before any data is handled. */
    MPIDIG_REQUEST(rreq, req->seq_no) = MPL_atomic_fetch_add_uint64(&MPIDI_global.nxt_seq_no, 1);
    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "seq_no: me=%" PRIu64 " exp=%" PRIu64,
                     MPIDIG_REQUEST(rreq, req->seq_no),
                     MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)));
    MPIDIG_recv_type_init(in_data_sz, rreq);

    if (!is_async) {
        /* Data is available now: copy it and run the completion callback. */
        MPIDIG_recv_copy(data, rreq);
        MPIDIG_REQUEST(rreq, req->target_cmpl_cb) (rreq);
    } else {
        /* Hand the request back through *req instead. */
        *req = rreq;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);
    return MPI_SUCCESS;
}
487 
/* Handler for a synchronous-send acknowledgement.
 *
 * Completes the send request named by the header's sreq_ptr field. When
 * is_async is set, *req is written as NULL. The remaining parameters are not
 * used. Always returns MPI_SUCCESS. */
int MPIDIG_ssend_ack_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                   int is_local, int is_async, MPIR_Request ** req)
{
    MPIDIG_ssend_ack_msg_t *ack_hdr = (MPIDIG_ssend_ack_msg_t *) am_hdr;
    MPIR_Request *sreq;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);

    sreq = (MPIR_Request *) ack_hdr->sreq_ptr;
    MPID_Request_complete(sreq);

    /* An async caller is told explicitly that there is no request to track. */
    if (is_async) {
        *req = NULL;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);
    return MPI_SUCCESS;
}
506 
/* Sender-side handler for a clear-to-send (CTS) message.
 *
 * The send request is taken from the header's sreq_ptr field, and the
 * receiver's request pointer (rreq_ptr) is echoed back in the header of an
 * MPIDIG_SEND_DATA reply that carries the send buffer described by sreq's
 * sreq sub-structure (src_buf, count, datatype). In builds that are not
 * MPIDI_CH4_DIRECT_NETMOD, a request flagged is_local is sent through the SHM
 * transport; everything else goes through the netmod.
 *
 * handler_id, data, in_data_sz, is_local, is_async and req are not used.
 *
 * Returns MPI_SUCCESS or the error code of the isend-reply call. */
int MPIDIG_send_cts_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                  int is_local, int is_async, MPIR_Request ** req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request *sreq;
    MPIDIG_send_cts_msg_t *msg_hdr = (MPIDIG_send_cts_msg_t *) am_hdr;
    MPIDIG_send_data_msg_t send_hdr;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);

    sreq = (MPIR_Request *) msg_hdr->sreq_ptr;
    MPIR_Assert(sreq != NULL);

    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "got cts req handle=0x%x", sreq->handle));

    /* Start the main data transfer */
    send_hdr.rreq_ptr = msg_hdr->rreq_ptr;
#ifndef MPIDI_CH4_DIRECT_NETMOD
    if (MPIDI_REQUEST(sreq, is_local))
        mpi_errno =
            MPIDI_SHM_am_isend_reply(MPIDIG_REQUEST(sreq, req->sreq).context_id,
                                     MPIDIG_REQUEST(sreq, rank), MPIDIG_SEND_DATA,
                                     &send_hdr, sizeof(send_hdr),
                                     MPIDIG_REQUEST(sreq, req->sreq).src_buf,
                                     MPIDIG_REQUEST(sreq, req->sreq).count,
                                     MPIDIG_REQUEST(sreq, req->sreq).datatype, sreq);
    else
#endif
    {
        mpi_errno =
            MPIDI_NM_am_isend_reply(MPIDIG_REQUEST(sreq, req->sreq).context_id,
                                    MPIDIG_REQUEST(sreq, rank), MPIDIG_SEND_DATA,
                                    &send_hdr, sizeof(send_hdr),
                                    MPIDIG_REQUEST(sreq, req->sreq).src_buf,
                                    MPIDIG_REQUEST(sreq, req->sreq).count,
                                    MPIDIG_REQUEST(sreq, req->sreq).datatype, sreq);
    }

    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    /* The EXIT marker lives under fn_exit so the failure path (fn_fail ->
     * fn_exit) balances the ENTER above as well. */
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
555