1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpidimpl.h"
7 #include "mpidch4r.h"
8 #include "ch4r_callbacks.h"
9
10 static int handle_unexp_cmpl(MPIR_Request * rreq);
11 static int recv_target_cmpl_cb(MPIR_Request * rreq);
12
/* Send a header-only MPIDIG_SEND_CTS reply on behalf of receive request `rreq`.
 *
 * The header carries the peer request pointer recorded in `rreq`
 * (req->rreq.peer_req_ptr) plus `rreq` itself. In builds without
 * MPIDI_CH4_DIRECT_NETMOD the reply uses the SHM path when the request is
 * flagged is_local; in every other case it uses the netmod path.
 *
 * Returns MPI_SUCCESS, or the error produced by the send-header call. */
int MPIDIG_do_cts(MPIR_Request * rreq)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDIG_send_cts_msg_t cts_hdr;

    cts_hdr.sreq_ptr = (MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr));
    cts_hdr.rreq_ptr = rreq;
    /* A CTS without a peer request to address makes no sense. */
    MPIR_Assert((void *) cts_hdr.sreq_ptr != NULL);

    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "do cts req %p handle=0x%x", rreq, rreq->handle));

#ifndef MPIDI_CH4_DIRECT_NETMOD
    if (MPIDI_REQUEST(rreq, is_local)) {
        mpi_errno = MPIDI_SHM_am_send_hdr_reply(MPIDIG_REQUEST(rreq, context_id),
                                                MPIDIG_REQUEST(rreq, rank),
                                                MPIDIG_SEND_CTS, &cts_hdr, sizeof(cts_hdr));
    } else
#endif
    {
        mpi_errno = MPIDI_NM_am_send_hdr_reply(MPIDIG_REQUEST(rreq, context_id),
                                               MPIDIG_REQUEST(rreq, rank),
                                               MPIDIG_SEND_CTS, &cts_hdr, sizeof(cts_hdr));
    }
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
47
48 /* Checks to make sure that the specified request is the next one expected to finish. If it isn't
49 * supposed to finish next, it is appended to a list of requests to be retrieved later. */
MPIDIG_check_cmpl_order(MPIR_Request * req)50 int MPIDIG_check_cmpl_order(MPIR_Request * req)
51 {
52 int ret = 0;
53
54 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
55 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
56
57 if (MPIDIG_REQUEST(req, req->seq_no) == MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)) {
58 MPL_atomic_fetch_add_uint64(&MPIDI_global.exp_seq_no, 1);
59 ret = 1;
60 goto fn_exit;
61 }
62
63 MPIDIG_REQUEST(req, req->request) = req;
64 /* MPIDI_CS_ENTER(); */
65 DL_APPEND(MPIDI_global.cmpl_list, req->dev.ch4.am.req);
66 /* MPIDI_CS_EXIT(); */
67
68 fn_exit:
69 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_CHECK_CMPL_ORDER);
70 return ret;
71 }
72
MPIDIG_progress_compl_list(void)73 void MPIDIG_progress_compl_list(void)
74 {
75 MPIR_Request *req;
76 MPIDIG_req_ext_t *curr, *tmp;
77
78 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
79 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
80
81 /* MPIDI_CS_ENTER(); */
82 do_check_again:
83 DL_FOREACH_SAFE(MPIDI_global.cmpl_list, curr, tmp) {
84 if (curr->seq_no == MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)) {
85 DL_DELETE(MPIDI_global.cmpl_list, curr);
86 req = (MPIR_Request *) curr->request;
87 MPIDIG_REQUEST(req, req->target_cmpl_cb) (req);
88 goto do_check_again;
89 }
90 }
91 /* MPIDI_CS_EXIT(); */
92 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_PROGRESS_COMPL_LIST);
93 }
94
/* Complete an unexpected receive request `rreq`.
 *
 * Three outcomes are possible:
 *  - The request was already dequeued by mprobe: either hand it to
 *    MPIDIG_handle_unexp_mrecv (mrecv already called) or clear its busy flag.
 *  - The request has not been matched: clear the busy flag and return.
 *  - The request was matched: copy/unpack the buffered bytes into the matched
 *    request's buffer, fill in the matched request's status, reply to a
 *    synchronous sender if needed, release resources and complete both
 *    requests.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static int handle_unexp_cmpl(MPIR_Request * rreq)
{
    int mpi_errno = MPI_SUCCESS, still_in_use;
    MPIR_Request *matched = NULL;
    size_t xfer_bytes;
    int dt_contig;
    MPI_Aint dt_true_lb;
    MPIR_Datatype *dt_ptr;
    size_t dt_sz;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_HANDLE_UNEXP_CMPL);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_HANDLE_UNEXP_CMPL);

    /* Has mprobe already taken this message off the unexpected queue? */
    /* MPIDI_CS_ENTER(); */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXP_DQUED) {
        if (!(MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXP_CLAIMED)) {
            /* mrecv has not been called yet -- drop the busy flag so that a
             * future mrecv knows this request is ready */
            MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_BUSY;
        } else {
            /* mrecv has already been called */
            MPIDIG_handle_unexp_mrecv(rreq);
        }
        /* MPIDI_CS_EXIT(); */
        goto fn_exit;
    }
    /* MPIDI_CS_EXIT(); */

    /* The request may have been matched earlier without being handled. */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_MATCHED) {
        matched = (MPIR_Request *) MPIDIG_REQUEST(rreq, req->rreq.match_req);

#ifndef MPIDI_CH4_DIRECT_NETMOD
        int partner_cancelled;
        mpi_errno = MPIDI_anysrc_try_cancel_partner(matched, &partner_cancelled);
        MPIR_ERR_CHECK(mpi_errno);
        /* The cancel is assumed to always succeed.
         * In typical config, anysrc partners won't occur if a matching
         * unexpected message already exists.
         * In workq setup, since we will always progress shm first, when an
         * unexpected message matches, the NM partner wouldn't have progressed
         * yet, so the cancel should always succeed.
         */
        MPIR_Assert(partner_cancelled);
#endif /* MPIDI_CH4_DIRECT_NETMOD */
    }

    /* Nothing matched: clear the busy bit and skip the data movement. */
    if (matched == NULL) {
        MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_BUSY;
        goto fn_exit;
    }

    matched->status.MPI_SOURCE = MPIDIG_REQUEST(rreq, rank);
    matched->status.MPI_TAG = MPIDIG_REQUEST(rreq, tag);

    /* Query the receive datatype; dt_sz ends up holding the size reported by
     * MPIR_Datatype_get_size_macro for the matched request's datatype. */
    MPIDI_Datatype_get_info(MPIDIG_REQUEST(matched, count),
                            MPIDIG_REQUEST(matched, datatype),
                            dt_contig, dt_sz, dt_ptr, dt_true_lb);
    MPIR_Datatype_get_size_macro(MPIDIG_REQUEST(matched, datatype), dt_sz);

    /* Clamp the transfer to what the receive side can hold.
     * NOTE(review): the truncation/success code is stored on `rreq` (the
     * unexpected request), not on the matched request -- confirm this is
     * intended. */
    if (MPIDIG_REQUEST(rreq, count) > dt_sz * MPIDIG_REQUEST(matched, count)) {
        rreq->status.MPI_ERROR = MPI_ERR_TRUNCATE;
        xfer_bytes = dt_sz * MPIDIG_REQUEST(matched, count);
    } else {
        rreq->status.MPI_ERROR = MPI_SUCCESS;
        /* incoming message is always count of bytes. */
        xfer_bytes = MPIDIG_REQUEST(rreq, count);
    }

    MPIR_STATUS_SET_COUNT(matched->status, xfer_bytes);
    if (dt_sz > 0) {
        MPIDIG_REQUEST(rreq, count) = xfer_bytes / dt_sz;
    } else {
        MPIDIG_REQUEST(rreq, count) = 0;
    }

    /* Move the data: plain copy for contiguous types, datatype engine otherwise. */
    if (dt_contig) {
        MPIR_Typerep_copy((char *) MPIDIG_REQUEST(matched, buffer) + dt_true_lb,
                          MPIDIG_REQUEST(rreq, buffer), xfer_bytes);
    } else {
        MPI_Aint unpacked;
        mpi_errno = MPIR_Typerep_unpack(MPIDIG_REQUEST(rreq, buffer), xfer_bytes,
                                        MPIDIG_REQUEST(matched, buffer),
                                        MPIDIG_REQUEST(matched, count),
                                        MPIDIG_REQUEST(matched, datatype), 0, &unpacked);
        MPIR_ERR_CHECK(mpi_errno);

        if (unpacked != (MPI_Aint) xfer_bytes) {
            /* Fewer/more bytes unpacked than planned: record a type mismatch
             * in the matched request's status. */
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                             __FUNCTION__, __LINE__,
                                             MPI_ERR_TYPE, "**dtypemismatch", 0);
            matched->status.MPI_ERROR = mpi_errno;
        }
    }

    /* The unexpected message is done; clear the status bit. */
    MPIDIG_REQUEST(rreq, req->status) &= ~MPIDIG_REQ_UNEXPECTED;

    /* A synchronous sender is waiting for a reply to be unlocked. */
    if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_PEER_SSEND) {
        mpi_errno = MPIDIG_reply_ssend(rreq);
        MPIR_ERR_CHECK(mpi_errno);
    }
#ifndef MPIDI_CH4_DIRECT_NETMOD
    MPIDI_anysrc_free_partner(matched);
#endif

    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(matched, datatype));
    if (MPIDIG_REQUEST(rreq, buffer) != NULL) {
        /* unexp pack buf is MPI_BYTE type, count == data size */
        MPIDU_genq_private_pool_free_cell(MPIDI_global.unexp_pack_buf_pool,
                                          MPIDIG_REQUEST(rreq, buffer));
    }
    MPIR_Object_release_ref(rreq, &still_in_use);
    MPID_Request_complete(rreq);
    MPID_Request_complete(matched);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_HANDLE_UNEXP_CMPL);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
219
220 /* This function is called when a receive has completed on the receiver side. The input is the
221 * request that has been completed. */
recv_target_cmpl_cb(MPIR_Request * rreq)222 static int recv_target_cmpl_cb(MPIR_Request * rreq)
223 {
224 int mpi_errno = MPI_SUCCESS;
225
226 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_RECV_TARGET_CMPL_CB);
227 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_RECV_TARGET_CMPL_CB);
228
229 MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
230 (MPL_DBG_FDEST, "req %p handle=0x%x", rreq, rreq->handle));
231
232 /* Check if this request is supposed to complete next or if it should be delayed. */
233 if (!MPIDIG_check_cmpl_order(rreq))
234 return mpi_errno;
235
236 MPIDIG_recv_finish(rreq);
237
238 if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_UNEXPECTED) {
239 mpi_errno = handle_unexp_cmpl(rreq);
240 MPIR_ERR_CHECK(mpi_errno);
241 goto fn_exit;
242 }
243
244 rreq->status.MPI_SOURCE = MPIDIG_REQUEST(rreq, rank);
245 rreq->status.MPI_TAG = MPIDIG_REQUEST(rreq, tag);
246
247 if (MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_PEER_SSEND) {
248 mpi_errno = MPIDIG_reply_ssend(rreq);
249 MPIR_ERR_CHECK(mpi_errno);
250 }
251 #ifndef MPIDI_CH4_DIRECT_NETMOD
252 MPIDI_anysrc_free_partner(rreq);
253 #endif
254
255 MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(rreq, datatype));
256 if ((MPIDIG_REQUEST(rreq, req->status) & MPIDIG_REQ_RTS) &&
257 MPIDIG_REQUEST(rreq, req->rreq.match_req) != NULL) {
258 /* This block is executed only when the receive is enqueued (handoff) &&
259 * receive was matched with an unexpected long RTS message.
260 * `rreq` is the unexpected message received and `sigreq` is the message
261 * that came from CH4 (e.g. MPIDI_recv_safe) */
262 MPIR_Request *sigreq = MPIDIG_REQUEST(rreq, req->rreq.match_req);
263 sigreq->status = rreq->status;
264 MPIR_Request_add_ref(sigreq);
265 MPID_Request_complete(sigreq);
266 /* Free the unexpected request on behalf of the user */
267 MPIR_Request_free_unsafe(rreq);
268 }
269 MPID_Request_complete(rreq);
270 fn_exit:
271 MPIDIG_progress_compl_list();
272 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_RECV_TARGET_CMPL_CB);
273 return mpi_errno;
274 fn_fail:
275 goto fn_exit;
276 }
277
/* Origin-side callback for a send: completes `sreq`.
 * Always returns MPI_SUCCESS. */
int MPIDIG_send_origin_cb(MPIR_Request * sreq)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);
    MPID_Request_complete(sreq);
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_ORIGIN_CB);

    return MPI_SUCCESS;
}
287
/* Origin-side callback for the data phase of a send: releases the datatype
 * stored in the send request (unless it is a builtin) and completes `sreq`.
 * Always returns MPI_SUCCESS. */
int MPIDIG_send_data_origin_cb(MPIR_Request * sreq)
{
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);

    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(sreq, req->sreq).datatype);
    MPID_Request_complete(sreq);

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_DATA_ORIGIN_CB);

    return MPI_SUCCESS;
}
298
/* Target-side handler for an incoming send message.
 *
 * Parameters (as used by this function):
 *   handler_id  - not referenced here.
 *   am_hdr      - message header, interpreted as MPIDIG_hdr_t.
 *   data        - payload pointer, passed to MPIDIG_recv_copy on the
 *                 non-async, non-RTS path.
 *   in_data_sz  - when non-zero and the message is unexpected, a buffer is
 *                 taken from the unexpected pack-buffer pool.
 *   is_local    - forwarded to posted-queue matching and recorded in the
 *                 request (builds without MPIDI_CH4_DIRECT_NETMOD).
 *   is_async    - when set, the request is handed back via *req (NULL for RTS
 *                 messages) instead of copying and completing inline.
 *   req         - output request pointer, written only when is_async is set.
 *
 * The message is matched against the communicator's posted list. With no
 * match, an unexpected request is created and enqueued; with a match, the
 * posted request is filled in and, for RTS messages, a CTS is sent.
 *
 * Returns MPI_SUCCESS or an MPI error code. The return value of the target
 * completion callback is intentionally not propagated, matching the other
 * handlers in this file. */
int MPIDIG_send_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                              int is_local, int is_async, MPIR_Request ** req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request *rreq = NULL;
    MPIR_Comm *root_comm;
    MPIDIG_hdr_t *hdr = (MPIDIG_hdr_t *) am_hdr;
    void *pack_buf = NULL;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "HDR: data_sz=%ld, flags=0x%X", hdr->data_sz, hdr->flags));
    root_comm = MPIDIG_context_id_to_comm(hdr->context_id);
    if (root_comm) {
      root_comm_retry:
        /* MPIDI_CS_ENTER(); */
        while (TRUE) {
            rreq = MPIDIG_dequeue_posted(hdr->src_rank, hdr->tag, hdr->context_id,
                                         is_local, &MPIDIG_COMM(root_comm, posted_list));
#ifndef MPIDI_CH4_DIRECT_NETMOD
            if (rreq) {
                int is_cancelled;
                mpi_errno = MPIDI_anysrc_try_cancel_partner(rreq, &is_cancelled);
                MPIR_ERR_CHECK(mpi_errno);
                if (!is_cancelled) {
                    /* The partner could not be cancelled: drop this candidate
                     * and look for the next posted request. */
                    MPIR_Comm_release(root_comm);       /* -1 for posted_list */
                    MPIR_Datatype_release_if_not_builtin(MPIDIG_REQUEST(rreq, datatype));
                    continue;
                }
            }
#endif /* MPIDI_CH4_DIRECT_NETMOD */
            break;
        }
        /* MPIDI_CS_EXIT(); */
    }

    if (rreq == NULL) {
        rreq = MPIDIG_request_create(MPIR_REQUEST_KIND__RECV, 2);
        MPIR_ERR_CHKANDSTMT(rreq == NULL, mpi_errno, MPIX_ERR_NOREQ, goto fn_fail, "**nomemreq");
        /* for unexpected message, always recv as MPI_BYTE into unexpected buffer. They will be
         * set to the recv side datatype and count when it is matched */
        MPIDIG_REQUEST(rreq, datatype) = MPI_BYTE;
        MPIDIG_REQUEST(rreq, count) = hdr->data_sz;
        if (in_data_sz) {
            MPIR_Assert(in_data_sz <= MPIR_CVAR_CH4_AM_PACK_BUFFER_SIZE);
            mpi_errno =
                MPIDU_genq_private_pool_alloc_cell(MPIDI_global.unexp_pack_buf_pool, &pack_buf);
            if (mpi_errno != MPI_SUCCESS) {
                /* Do not continue with an unusable buffer. Discard the freshly
                 * created request the same way the communicator-retry path
                 * below does, then report the error. */
                MPIR_Request_free_unsafe(rreq);
                MPID_Request_complete(rreq);
                rreq = NULL;
            }
            MPIR_ERR_CHECK(mpi_errno);
            MPIR_Assert(pack_buf);
            MPIDIG_REQUEST(rreq, buffer) = pack_buf;
        } else {
            MPIDIG_REQUEST(rreq, buffer) = NULL;
        }
        MPIDIG_REQUEST(rreq, rank) = hdr->src_rank;
        MPIDIG_REQUEST(rreq, tag) = hdr->tag;
        MPIDIG_REQUEST(rreq, context_id) = hdr->context_id;

        MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_UNEXPECTED;
        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            /* this is unexpected RNDV */
            MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_RTS;
            MPIDIG_REQUEST(rreq, req->rreq.match_req) = NULL;
        } else {
            /* this is unexpected EAGER */
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_BUSY;
        }
#ifndef MPIDI_CH4_DIRECT_NETMOD
        MPIDI_REQUEST(rreq, is_local) = is_local;
#endif
        MPID_THREAD_CS_ENTER(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
        if (root_comm) {
            MPIR_Comm_add_ref(root_comm);
            MPIDIG_enqueue_unexp(rreq, &MPIDIG_COMM(root_comm, unexp_list));
        } else {
            MPIR_Comm *root_comm_again;
            /* This branch means that last time we checked, there was no communicator
             * associated with the arriving message.
             * In a multi-threaded environment, it is possible that the communicator
             * has been created since we checked root_comm last time.
             * If that is the case, the new message must be put into a queue in
             * the new communicator. Otherwise that message will be lost forever.
             * Here the strategy is to query root_comm again, and if found,
             * simply re-execute the per-communicator enqueue logic above. */
            root_comm_again = MPIDIG_context_id_to_comm(hdr->context_id);
            if (unlikely(root_comm_again != NULL)) {
                MPID_THREAD_CS_EXIT(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
                /* The buffer is NULL when the message carried no data; only
                 * return a cell to the pool if one was actually taken. */
                if (MPIDIG_REQUEST(rreq, buffer)) {
                    MPIDU_genq_private_pool_free_cell(MPIDI_global.unexp_pack_buf_pool,
                                                      MPIDIG_REQUEST(rreq, buffer));
                }
                MPIR_Request_free_unsafe(rreq);
                MPID_Request_complete(rreq);
                rreq = NULL;
                root_comm = root_comm_again;
                goto root_comm_retry;
            }
            MPIDIG_enqueue_unexp(rreq, MPIDIG_context_id_to_uelist(hdr->context_id));
        }
        MPID_THREAD_CS_EXIT(VCI, MPIDIU_THREAD_MPIDIG_GLOBAL_MUTEX);
    } else {
        MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                        (MPL_DBG_FDEST, "posted req %p handle=0x%x", rreq, rreq->handle));

        /* rreq != NULL <=> root_comm != NULL */
        MPIR_Assert(root_comm);
        /* Decrement the refcnt when popping a request out from posted_list */
        MPIR_Comm_release(root_comm);
        MPIDIG_REQUEST(rreq, rank) = hdr->src_rank;
        MPIDIG_REQUEST(rreq, tag) = hdr->tag;
        MPIDIG_REQUEST(rreq, context_id) = hdr->context_id;

        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            /* this is expected RNDV, init a special recv into unexp buffer */
            MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
            MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_RTS;
            MPIDIG_REQUEST(rreq, req->rreq.match_req) = NULL;
            /* If the CTS cannot be sent the sender never starts the data
             * transfer, so surface the failure instead of dropping it. */
            mpi_errno = MPIDIG_do_cts(rreq);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    if (hdr->flags & MPIDIG_AM_SEND_FLAGS_SYNC) {
        MPIDIG_REQUEST(rreq, req->rreq.peer_req_ptr) = hdr->sreq_ptr;
        MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_PEER_SSEND;
    }

    rreq->status.MPI_ERROR = hdr->error_bits;
    MPIDIG_REQUEST(rreq, req->status) |= MPIDIG_REQ_IN_PROGRESS;

    MPIDIG_REQUEST(rreq, req->target_cmpl_cb) = recv_target_cmpl_cb;
    if (!(hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS)) {
        /* Non-RTS messages take a sequence number now; RTS messages do not. */
        MPIDIG_REQUEST(rreq, req->seq_no) =
            MPL_atomic_fetch_add_uint64(&MPIDI_global.nxt_seq_no, 1);
        MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                        (MPL_DBG_FDEST, "seq_no: me=%" PRIu64 " exp=%" PRIu64,
                         MPIDIG_REQUEST(rreq, req->seq_no),
                         MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)));
        MPIDIG_recv_type_init(hdr->data_sz, rreq);
    }

    if (is_async) {
        if (hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS) {
            *req = NULL;
        } else {
            *req = rreq;
        }
    } else {
        if (!(hdr->flags & MPIDIG_AM_SEND_FLAGS_RTS)) {
            MPIDIG_recv_copy(data, rreq);
            MPIDIG_REQUEST(rreq, req->target_cmpl_cb) (rreq);
        }
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_TARGET_MSG_CB);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
456
/* Target-side handler for a MPIDIG_send_data_msg_t message.
 *
 * The receive request is taken from the header's rreq_ptr field. It is given
 * the next sequence number and initialised for `in_data_sz` bytes. When
 * `is_async` is set the request is returned through *req; otherwise `data` is
 * copied into it and its target completion callback is invoked right away.
 * handler_id and is_local are not referenced. Always returns MPI_SUCCESS. */
int MPIDIG_send_data_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                   int is_local, int is_async, MPIR_Request ** req)
{
    MPIDIG_send_data_msg_t *data_hdr = (MPIDIG_send_data_msg_t *) am_hdr;
    MPIR_Request *rreq;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);

    rreq = (MPIR_Request *) data_hdr->rreq_ptr;
    MPIR_Assert(rreq);

    MPIDIG_REQUEST(rreq, req->seq_no) = MPL_atomic_fetch_add_uint64(&MPIDI_global.nxt_seq_no, 1);
    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "seq_no: me=%" PRIu64 " exp=%" PRIu64,
                     MPIDIG_REQUEST(rreq, req->seq_no),
                     MPL_atomic_load_uint64(&MPIDI_global.exp_seq_no)));
    MPIDIG_recv_type_init(in_data_sz, rreq);

    if (!is_async) {
        /* Data is available now: copy it and run the completion callback. */
        MPIDIG_recv_copy(data, rreq);
        MPIDIG_REQUEST(rreq, req->target_cmpl_cb) (rreq);
    } else {
        *req = rreq;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_DATA_TARGET_MSG_CB);
    return MPI_SUCCESS;
}
487
/* Target-side handler for a MPIDIG_ssend_ack_msg_t message.
 *
 * Completes the send request named by the header's sreq_ptr field. When
 * `is_async` is set, *req is set to NULL. The remaining parameters are not
 * referenced. Always returns MPI_SUCCESS. */
int MPIDIG_ssend_ack_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                   int is_local, int is_async, MPIR_Request ** req)
{
    MPIDIG_ssend_ack_msg_t *ack_hdr = (MPIDIG_ssend_ack_msg_t *) am_hdr;
    MPIR_Request *sreq;

    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);

    sreq = (MPIR_Request *) ack_hdr->sreq_ptr;
    MPID_Request_complete(sreq);

    if (is_async) {
        *req = NULL;
    }

    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SSEND_ACK_TARGET_MSG_CB);
    return MPI_SUCCESS;
}
506
/* Target-side handler for a MPIDIG_send_cts_msg_t message.
 *
 * Looks up the send request named by the header's sreq_ptr field and starts
 * the main data transfer: a MPIDIG_SEND_DATA reply carrying the header's
 * rreq_ptr and the source buffer, count and datatype stored in the send
 * request. In builds without MPIDI_CH4_DIRECT_NETMOD the SHM path is used when
 * the send request is flagged is_local; otherwise the netmod path is used.
 * The data, in_data_sz, is_local, is_async and req parameters are not
 * referenced.
 *
 * Returns MPI_SUCCESS or the error reported by the reply call. */
int MPIDIG_send_cts_target_msg_cb(int handler_id, void *am_hdr, void *data, MPI_Aint in_data_sz,
                                  int is_local, int is_async, MPIR_Request ** req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request *sreq;
    MPIDIG_send_cts_msg_t *msg_hdr = (MPIDIG_send_cts_msg_t *) am_hdr;
    MPIDIG_send_data_msg_t send_hdr;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);
    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);

    sreq = (MPIR_Request *) msg_hdr->sreq_ptr;
    MPIR_Assert(sreq != NULL);

    MPL_DBG_MSG_FMT(MPIDI_CH4_DBG_GENERAL, VERBOSE,
                    (MPL_DBG_FDEST, "got cts req handle=0x%x", sreq->handle));

    /* Start the main data transfer */
    send_hdr.rreq_ptr = msg_hdr->rreq_ptr;
#ifndef MPIDI_CH4_DIRECT_NETMOD
    if (MPIDI_REQUEST(sreq, is_local))
        mpi_errno =
            MPIDI_SHM_am_isend_reply(MPIDIG_REQUEST(sreq, req->sreq).context_id,
                                     MPIDIG_REQUEST(sreq, rank), MPIDIG_SEND_DATA,
                                     &send_hdr, sizeof(send_hdr),
                                     MPIDIG_REQUEST(sreq, req->sreq).src_buf,
                                     MPIDIG_REQUEST(sreq, req->sreq).count,
                                     MPIDIG_REQUEST(sreq, req->sreq).datatype, sreq);
    else
#endif
    {
        mpi_errno =
            MPIDI_NM_am_isend_reply(MPIDIG_REQUEST(sreq, req->sreq).context_id,
                                    MPIDIG_REQUEST(sreq, rank), MPIDIG_SEND_DATA,
                                    &send_hdr, sizeof(send_hdr),
                                    MPIDIG_REQUEST(sreq, req->sreq).src_buf,
                                    MPIDIG_REQUEST(sreq, req->sreq).count,
                                    MPIDIG_REQUEST(sreq, req->sreq).datatype, sreq);
    }

    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    /* Emit the exit trace on both the success and the failure path. */
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDIG_SEND_CTS_TARGET_MSG_CB);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
555