1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpidimpl.h"
7 #include "mpidrma.h"
8
/* Central dispatch for "data has arrived for this receive request":
 * run the request's registered OnDataAvail action if there is one,
 * otherwise the request is a plain receive and is completed here.
 * On return, *complete tells the caller whether the request needs no
 * further processing. */
int MPIDI_CH3U_Handle_recv_req(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    static int in_routine ATTRIBUTE((unused)) = FALSE;
    int mpi_errno = MPI_SUCCESS;
    int (*reqFn) (MPIDI_VC_t *, MPIR_Request *, int *);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);

    /* Debug-only reentrancy guard: this routine must not be entered
     * recursively. */
    MPIR_Assert(in_routine == FALSE);
    in_routine = TRUE;

    reqFn = rreq->dev.OnDataAvail;
    if (reqFn != NULL) {
        /* Delegate to the per-request completion action; it decides
         * whether the request is finished. */
        mpi_errno = reqFn(vc, rreq, complete);
    }
    else {
        /* No action registered: only an ordinary receive may omit one. */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_RECV);
        mpi_errno = MPID_Request_complete(rreq);
        MPIR_ERR_CHECK(mpi_errno);
        *complete = TRUE;
    }

    in_routine = FALSE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
40
41 /* ----------------------------------------------------------------------- */
42 /* Here are the functions that implement the actions that are taken when
43 * data is available for a receive request (or other completion operations)
44 * These include "receive" requests that are part of the RMA implementation.
45 *
46 * The convention for the names of routines that are called when data is
47 * available is
48 * MPIDI_CH3_ReqHandler_<type>(MPIDI_VC_t *, MPIR_Request *, int *)
49 * as in
50 * MPIDI_CH3_ReqHandler_...
51 *
52 * ToDo:
53 * We need a way for each of these functions to describe what they are,
54 * so that given a pointer to one of these functions, we can retrieve
55 * a description of the routine. We may want to use a static string
56 * and require the user to maintain thread-safety, at least while
57 * accessing the string.
58 */
59 /* ----------------------------------------------------------------------- */
/* Default OnDataAvail action: all data for the receive has arrived, so
 * simply complete the request (decrements its completion counter) and
 * report that no further processing is needed. */
int MPIDI_CH3_ReqHandler_RecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                      MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;

    /* Release the transfer's hold on the request. */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    /* Nothing more to do for this request. */
    *complete = TRUE;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
76
/* Completion action for the receive of PUT data.  By the time this handler
 * runs the incoming data has already been deposited into the target window
 * memory, so the remaining work is to complete the request and run the
 * pkt_flags-driven target-side epilogue (finish_op_on_target). */
int MPIDI_CH3_ReqHandler_PutRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    /* Cached up front: these fields are needed after the request has been
     * completed below.  NOTE(review): presumably completing the request can
     * release it, making later rreq->dev accesses unsafe — confirm. */
    MPI_Win source_win_handle = rreq->dev.source_win_handle;
    int pkt_flags = rreq->dev.pkt_flags;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);

    /* NOTE: It is possible that this request is already completed before
     * entering this handler. This happens when this req handler is called
     * within the same req handler on the same request.
     * Consider this case: req is queued up in SHM queue with ref count of 2:
     * one is for completing the request and another is for dequeueing from
     * the queue. The first called req handler on this request completed
     * this request and decrement ref counter to 1. Request is still in the
     * queue. Within this handler, we call the req handler on the same request
     * for the second time (for example when making progress on SHM queue),
     * and the second called handler also tries to complete this request,
     * which leads to wrong execution.
     * Here we check if req is already completed to prevent processing the
     * same request twice. */
    if (MPIR_Request_is_complete(rreq)) {
        *complete = FALSE;
        goto fn_exit;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    /* NOTE: finish_op_on_target() must be called after we complete this request,
     * because inside finish_op_on_target() we may call this request handler
     * on the same request again (in release_lock()). Marking this request as
     * completed will prevent us from processing the same request twice. */
    mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                    pkt_flags, source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
130
131
/* Completion action for the receive of ACCUMULATE data.  The origin data
 * has been received into a temporary (SRBuf) buffer; this handler applies
 * the accumulate operation into the window memory (under the SHM mutex for
 * shared-memory windows), frees the temporary buffer, completes the
 * request, and runs the target-side epilogue (finish_op_on_target). */
int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    /* Cached up front; used after the request is completed below. */
    MPI_Win source_win_handle = rreq->dev.source_win_handle;
    int pkt_flags = rreq->dev.pkt_flags;
    MPI_Datatype basic_type;
    MPI_Aint predef_count, predef_dtp_size;
    MPI_Aint stream_offset;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);

    /* NOTE: It is possible that this request is already completed before
     * entering this handler. This happens when this req handler is called
     * within the same req handler on the same request.
     * Consider this case: req is queued up in SHM queue with ref count of 2:
     * one is for completing the request and another is for dequeueing from
     * the queue. The first called req handler on this request completed
     * this request and decrement ref counter to 1. Request is still in the
     * queue. Within this handler, we call the req handler on the same request
     * for the second time (for example when making progress on SHM queue),
     * and the second called handler also tries to complete this request,
     * which leads to wrong execution.
     * Here we check if req is already completed to prevent processing the
     * same request twice. */
    if (MPIR_Request_is_complete(rreq)) {
        *complete = FALSE;
        goto fn_exit;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV);

    /* For a derived target datatype, the accumulate operation is applied
     * per underlying basic element. */
    if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
        basic_type = rreq->dev.datatype;
    else {
        basic_type = rreq->dev.datatype_ptr->basic_type;
    }
    MPIR_Assert(basic_type != MPI_DATATYPE_NULL);

    /* Number of basic elements actually received in this (possibly
     * streamed) unit. */
    MPIR_Datatype_get_size_macro(basic_type, predef_dtp_size);
    predef_count = rreq->dev.recv_data_sz / predef_dtp_size;
    MPIR_Assert(predef_count > 0);

    /* For streamed accumulates the extended packet header carries the byte
     * offset of this unit within the full target buffer; otherwise 0. */
    stream_offset = 0;
    MPIDI_CH3_ExtPkt_Accum_get_stream(pkt_flags,
                                      (!MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype)),
                                      rreq->dev.ext_hdr_ptr, &stream_offset);

    /* Serialize accumulate operations on shared-memory windows. */
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }
    /* accumulate data from tmp_buf into user_buf */
    /* do_accumulate_op() takes an int count; assert the cast below does not
     * truncate. */
    MPIR_Assert(predef_count == (int) predef_count);
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
                                 rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                                 stream_offset, rreq->dev.op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }
    MPIR_ERR_CHECK(mpi_errno);

    /* free the temporary buffer */
    MPIDI_CH3U_SRBuf_free(rreq);

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    /* NOTE: finish_op_on_target() must be called after we complete this request,
     * because inside finish_op_on_target() we may call this request handler
     * on the same request again (in release_lock()). Marking this request as
     * completed will prevent us from processing the same request twice. */
    mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                    pkt_flags, source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
222
223
/* Completion action for the receive of GET_ACCUMULATE origin data (one
 * stream unit).  Under the SHM mutex (for shared-memory windows) it copies
 * the current target data into a newly allocated response buffer and then
 * applies the accumulate operation, so that "fetch + op" is atomic; the
 * fetched data is then sent back to the origin via a response request. */
int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
    MPIR_Request *resp_req;
    struct iovec iov[MPL_IOV_LIMIT];
    int iovcnt;
    int is_contig;
    MPI_Datatype basic_type;
    MPI_Aint predef_count, predef_dtp_size;
    MPI_Aint dt_true_lb;
    MPI_Aint stream_offset;
    int is_empty_origin = FALSE;
    MPI_Aint extent, type_size;
    MPI_Aint stream_data_len, total_len;
    MPIR_CHKPMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP sends no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* For a derived target datatype the op applies per basic element. */
    if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
        basic_type = rreq->dev.datatype;
    else {
        basic_type = rreq->dev.datatype_ptr->basic_type;
    }
    MPIR_Assert(basic_type != MPI_DATATYPE_NULL);

    /* Byte offset of this stream unit within the full target buffer
     * (0 when the transfer is not streamed). */
    stream_offset = 0;
    MPIDI_CH3_ExtPkt_Gaccum_get_stream(rreq->dev.pkt_flags,
                                       (!MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype)),
                                       rreq->dev.ext_hdr_ptr, &stream_offset);

    /* Use target data to calculate current stream unit size */
    MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);
    total_len = type_size * rreq->dev.user_count;
    MPIR_Datatype_get_size_macro(basic_type, predef_dtp_size);
    MPIR_Datatype_get_extent_macro(basic_type, extent);
    /* This unit carries at most one SRBuf worth of whole basic elements,
     * and no more than what remains past stream_offset. */
    stream_data_len = MPL_MIN(total_len - (stream_offset / extent) * predef_dtp_size,
                              (MPIDI_CH3U_SRBuf_size / extent) * predef_dtp_size);

    predef_count = stream_data_len / predef_dtp_size;
    MPIR_Assert(predef_count > 0);

    /* Build the response packet header; propagate lock-granted / ACK flags
     * requested by the origin's pkt_flags. */
    MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
    get_accum_resp_pkt->request_handle = rreq->dev.resp_request_handle;
    get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* check if data is contiguous and get true lb */
    MPIR_Datatype_is_contig(rreq->dev.datatype, &is_contig);
    MPIR_Datatype_get_true_lb(rreq->dev.datatype, &dt_true_lb);

    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIR_Object_set_ref(resp_req, 1);
    MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);

    /* Response buffer holding the fetched target data for this unit;
     * reaped via CHKPMEM on the failure path. */
    MPIR_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, stream_data_len,
                        mpi_errno, "GACC resp. buffer", MPL_MEM_BUFFER);

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data from target window to temporary buffer */

    if (is_contig) {
        MPIR_Memcpy(resp_req->dev.user_buf,
                    (void *) ((char *) rreq->dev.real_user_buf + dt_true_lb +
                              stream_offset), stream_data_len);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                          stream_offset, resp_req->dev.user_buf, stream_data_len, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == stream_data_len);
    }

    /* accumulate data from tmp_buf into user_buf */
    /* do_accumulate_op() takes an int count; assert the cast does not
     * truncate. */
    MPIR_Assert(predef_count == (int) predef_count);
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
                                 rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                                 stream_offset, rreq->dev.op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
    resp_req->dev.pkt_flags = rreq->dev.pkt_flags;

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    /* iov[0]: response packet header; iov[1]: fetched data. */
    iov[0].iov_base = (void *) get_accum_resp_pkt;
    iov[0].iov_len = sizeof(*get_accum_resp_pkt);
    iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
    iov[1].iov_len = stream_data_len;
    iovcnt = 2;

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* Mark get portion as handled */
    rreq->dev.resp_request_handle = MPI_REQUEST_NULL;

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);

    if (is_empty_origin == FALSE) {
        /* free the temporary buffer.
         * When origin data is zero, there
         * is no temporary buffer allocated */
        MPIDI_CH3U_SRBuf_free(rreq);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;
  fn_exit:
    MPIR_CHKPMEM_COMMIT();
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
380
381
/* Completion action for the receive of FETCH_AND_OP origin data.  Under
 * the SHM mutex (for shared-memory windows) it fetches the current target
 * value into a response buffer and then applies the op, so that
 * "fetch + op" is atomic; the fetched value is sent back to the origin. */
int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr = NULL;
    MPI_Aint type_size;
    MPIR_Request *resp_req = NULL;
    struct iovec iov[MPL_IOV_LIMIT];
    int iovcnt;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
    int is_contig;
    int is_empty_origin = FALSE;
    MPIR_CHKPMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP sends no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_FOP_RECV);

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* FOP operates on a single element of the (predefined) datatype. */
    MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

    MPIR_Datatype_is_contig(rreq->dev.datatype, &is_contig);

    /* Create response request */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_FOP_RESP);
    MPIR_Object_set_ref(resp_req, 1);
    resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_FOPSendComplete;
    resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;
    resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
    resp_req->dev.pkt_flags = rreq->dev.pkt_flags;

    /* Response buffer holding the fetched value; reaped via CHKPMEM on the
     * failure path. */
    MPIR_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, type_size, mpi_errno, "FOP resp. buffer", MPL_MEM_BUFFER);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data into a temporary buffer in response request */
    if (is_contig) {
        MPIR_Memcpy(resp_req->dev.user_buf, rreq->dev.real_user_buf, type_size);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, resp_req->dev.user_buf,
                          type_size, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == type_size);
    }

    /* Perform accumulate computation */
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, 1, rreq->dev.datatype,
                                 rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, rreq->dev.op,
                                 MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    /* Send back data */
    MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
    fop_resp_pkt->request_handle = rreq->dev.resp_request_handle;
    fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    fop_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* iov[0]: response packet header; iov[1]: fetched value. */
    iov[0].iov_base = (void *) fop_resp_pkt;
    iov[0].iov_len = sizeof(*fop_resp_pkt);
    iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
    iov[1].iov_len = type_size;
    iovcnt = 2;

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    if (is_empty_origin == FALSE) {
        /* free the temporary buffer.
         * When origin data is zero, there
         * is no temporary buffer allocated */
        MPL_free((char *) rreq->dev.user_buf);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_CHKPMEM_COMMIT();
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
503
504
/* The flattened description of the target derived datatype for a PUT has
 * been received.  Rebuild the datatype locally and rearm the request so
 * the actual PUT data can be received with it; the data transfer is then
 * finished by MPIDI_CH3_ReqHandler_PutRecvComplete. */
int MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                                  MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);

    /* get data from extended header */
    new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
    if (!new_dtp) {
        MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIR_Datatype_mem");
    }
    /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
    MPIR_Object_set_ref(new_dtp, 1);
    MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

    /* update request to get the data */
    MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_PUT_RECV);
    rreq->dev.datatype = new_dtp->handle;
    rreq->dev.recv_data_sz = new_dtp->size * rreq->dev.user_count;

    /* Keep a pointer so the datatype reference can be released later. */
    rreq->dev.datatype_ptr = new_dtp;

    rreq->dev.msg_offset = 0;
    rreq->dev.msgsize = rreq->dev.recv_data_sz;

    /* Rebuild the receive iov for the incoming PUT data using the new
     * datatype. */
    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }
    if (!rreq->dev.OnDataAvail)
        rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutRecvComplete;

    /* More data is still expected for this request. */
    *complete = FALSE;
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);
    return mpi_errno;
}
546
/* Metadata for an ACCUMULATE (flattened derived-datatype description
 * and/or stream extended header) has been received.  Rebuild the datatype
 * if necessary, allocate a temporary (SRBuf) buffer sized for one stream
 * unit, and rearm the request so the actual origin data can be received;
 * MPIDI_CH3_ReqHandler_AccumRecvComplete then applies the operation. */
int MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                                   MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPI_Aint basic_type_extent, basic_type_size;
    MPI_Aint total_len, rest_len, stream_elem_count;
    MPI_Aint stream_offset;
    MPI_Aint type_size;
    MPI_Datatype basic_dtp;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);

    stream_offset = 0;

    /* get stream offset from the extended header, if this accumulate is
     * streamed */
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
        MPIDI_CH3_Ext_pkt_stream_t *ext_hdr = NULL;
        MPIR_Assert(rreq->dev.ext_hdr_ptr != NULL);
        ext_hdr = ((MPIDI_CH3_Ext_pkt_stream_t *) rreq->dev.ext_hdr_ptr);
        stream_offset = ext_hdr->stream_offset;
    }

    if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT) {
        /* Derived target datatype: rebuild it from the flattened
         * representation carried in the request. */
        new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
        if (!new_dtp) {
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIR_Datatype_mem");
        }
        /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
        MPIR_Object_set_ref(new_dtp, 1);
        MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

        /* update new request to get the data */
        MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_ACCUM_RECV);

        MPIR_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
        rreq->dev.datatype = new_dtp->handle;
        rreq->dev.datatype_ptr = new_dtp;

        type_size = new_dtp->size;

        basic_dtp = new_dtp->basic_type;
    }
    else {
        /* Predefined target datatype: only the stream header preceded the
         * data. */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV);
        MPIR_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);

        MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

        basic_dtp = rreq->dev.datatype;
    }

    MPIR_Datatype_get_size_macro(basic_dtp, basic_type_size);
    MPIR_Datatype_get_extent_macro(basic_dtp, basic_type_extent);

    MPIR_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
    /* allocate a SRBuf for receiving stream unit */
    MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
    /* --BEGIN ERROR HANDLING-- */
    if (rreq->dev.tmpbuf_sz == 0) {
        MPL_DBG_MSG(MPIDI_CH3_DBG_CHANNEL, TYPICAL, "SRBuf allocation failure");
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                         __func__, __LINE__, MPI_ERR_OTHER, "**nomem",
                                         "**nomem %d", MPIDI_CH3U_SRBuf_size);
        rreq->status.MPI_ERROR = mpi_errno;
        goto fn_fail;
    }
    /* --END ERROR HANDLING-- */

    /* The origin data lands in the temporary buffer, not the window. */
    rreq->dev.user_buf = rreq->dev.tmpbuf;

    /* Receive at most one SRBuf worth of whole basic elements, and no more
     * than the data remaining past stream_offset. */
    total_len = type_size * rreq->dev.user_count;
    rest_len = total_len - stream_offset;
    stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;

    rreq->dev.recv_data_sz = MPL_MIN(rest_len, stream_elem_count * basic_type_size);

    rreq->dev.msg_offset = 0;
    rreq->dev.msgsize = rreq->dev.recv_data_sz;

    /* Build the receive iov describing basic elements within the temporary
     * buffer. */
    MPI_Aint actual_iov_bytes, actual_iov_len;
    MPIR_Typerep_to_iov(rreq->dev.tmpbuf, rreq->dev.recv_data_sz / basic_type_size, basic_dtp,
                        0, rreq->dev.iov, MPL_IOV_LIMIT, rreq->dev.recv_data_sz,
                        &actual_iov_len, &actual_iov_bytes);
    rreq->dev.iov_count = (int) actual_iov_len;
    rreq->dev.iov_offset = 0;

    rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRecvComplete;

    /* More data is still expected for this request. */
    *complete = FALSE;
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);
    return mpi_errno;
}
643
644
/* Metadata for a GET_ACCUMULATE (flattened derived-datatype description
 * and/or stream extended header) has been received.  Rebuild the datatype
 * if necessary.  For MPI_NO_OP there is no origin data, so the final
 * handler runs immediately; otherwise allocate a temporary (SRBuf) buffer
 * for one stream unit and rearm the request so the origin data can be
 * received, finished by MPIDI_CH3_ReqHandler_GaccumRecvComplete.
 *
 * Fix vs. original: cast actual_iov_len explicitly to int when storing it
 * into iov_count, matching MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete
 * and avoiding an implicit MPI_Aint -> int narrowing conversion. */
int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc,
                                                    MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPI_Aint basic_type_extent, basic_type_size;
    MPI_Aint total_len, rest_len, stream_elem_count;
    MPI_Aint stream_offset;
    MPI_Aint type_size;
    MPI_Datatype basic_dtp;
    int is_empty_origin = FALSE;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP sends no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    stream_offset = 0;

    /* get stream offset from the extended header, if streamed */
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
        MPIDI_CH3_Ext_pkt_stream_t *ext_hdr = NULL;
        MPIR_Assert(rreq->dev.ext_hdr_ptr != NULL);
        ext_hdr = ((MPIDI_CH3_Ext_pkt_stream_t *) rreq->dev.ext_hdr_ptr);
        stream_offset = ext_hdr->stream_offset;
    }

    if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT) {
        /* Derived target datatype: rebuild it from the flattened
         * representation carried in the request. */
        new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
        if (!new_dtp) {
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIR_Datatype_mem");
        }
        /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
        MPIR_Object_set_ref(new_dtp, 1);
        MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

        /* update new request to get the data */
        MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);

        MPIR_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
        rreq->dev.datatype = new_dtp->handle;
        rreq->dev.datatype_ptr = new_dtp;

        type_size = new_dtp->size;

        basic_dtp = new_dtp->basic_type;
    }
    else {
        /* Predefined target datatype: only the stream header preceded the
         * data. */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
        MPIR_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);

        MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

        basic_dtp = rreq->dev.datatype;
    }

    if (is_empty_origin == TRUE) {
        rreq->dev.recv_data_sz = 0;

        /* There is no origin data coming, directly call final req handler. */
        mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, rreq, complete);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else {
        MPIR_Datatype_get_size_macro(basic_dtp, basic_type_size);
        MPIR_Datatype_get_extent_macro(basic_dtp, basic_type_extent);

        MPIR_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
        /* allocate a SRBuf for receiving stream unit */
        MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
        /* --BEGIN ERROR HANDLING-- */
        if (rreq->dev.tmpbuf_sz == 0) {
            MPL_DBG_MSG(MPIDI_CH3_DBG_CHANNEL, TYPICAL, "SRBuf allocation failure");
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                             __func__, __LINE__, MPI_ERR_OTHER, "**nomem",
                                             "**nomem %d", MPIDI_CH3U_SRBuf_size);
            rreq->status.MPI_ERROR = mpi_errno;
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */

        /* The origin data lands in the temporary buffer, not the window. */
        rreq->dev.user_buf = rreq->dev.tmpbuf;

        /* Receive at most one SRBuf worth of whole basic elements, and no
         * more than the data remaining past stream_offset. */
        total_len = type_size * rreq->dev.user_count;
        rest_len = total_len - stream_offset;
        stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;

        rreq->dev.recv_data_sz = MPL_MIN(rest_len, stream_elem_count * basic_type_size);

        rreq->dev.msg_offset = 0;
        rreq->dev.msgsize = rreq->dev.recv_data_sz;

        /* Build the receive iov describing basic elements within the
         * temporary buffer. */
        MPI_Aint actual_iov_bytes, actual_iov_len;
        MPIR_Typerep_to_iov(rreq->dev.tmpbuf, rreq->dev.recv_data_sz / basic_type_size, basic_dtp,
                            0, rreq->dev.iov, MPL_IOV_LIMIT, rreq->dev.recv_data_sz,
                            &actual_iov_len, &actual_iov_bytes);
        /* explicit cast: iov_count is int, actual_iov_len is MPI_Aint
         * (consistent with AccumMetadataRecvComplete) */
        rreq->dev.iov_count = (int) actual_iov_len;
        rreq->dev.iov_offset = 0;

        rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumRecvComplete;

        /* More data is still expected for this request. */
        *complete = FALSE;
    }

  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
    return mpi_errno;
}
757
758
759
/* The flattened derived-datatype description for a GET has been received.
 * Rebuild the datatype, create a send request describing the response
 * data, and hand it to the channel to stream the window data back to the
 * origin; the receive request itself is then complete. */
int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(MPIDI_VC_t * vc,
                                                  MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
    MPIR_Request *sreq;
    MPIR_Win *win_ptr;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* A derived-datatype GET can never use the immediate-response path. */
    MPIR_Assert(!(rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP));

    /* get data from extended header */
    new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
    if (!new_dtp) {
        MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIR_Datatype_mem");
    }
    /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
    MPIR_Object_set_ref(new_dtp, 1);
    MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

    /* create request for sending data */
    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(sreq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    sreq->kind = MPIR_REQUEST_KIND__SEND;
    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_RESP);
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.user_buf = rreq->dev.user_buf;
    sreq->dev.user_count = rreq->dev.user_count;
    sreq->dev.datatype = new_dtp->handle;
    sreq->dev.datatype_ptr = new_dtp;
    sreq->dev.target_win_handle = rreq->dev.target_win_handle;
    sreq->dev.pkt_flags = rreq->dev.pkt_flags;

    /* Build the response packet header; propagate lock-granted / ACK flags
     * requested by the origin's pkt_flags. */
    MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
    get_resp_pkt->request_handle = rreq->dev.request_handle;
    get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    get_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    sreq->dev.msg_offset = 0;
    sreq->dev.msgsize = new_dtp->size * sreq->dev.user_count;

    /* Because this is in a packet handler, it is already within a critical section */
    /* MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); */
    mpi_errno = vc->sendNoncontig_fn(vc, sreq, get_resp_pkt, sizeof(*get_resp_pkt),
                                     NULL, 0);
    /* MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); */
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_Request_free(sreq);
        sreq = NULL;
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }
    /* --END ERROR HANDLING-- */

    /* mark receive data transfer as complete and decrement CC in receive
     * request */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
842
843
int MPIDI_CH3_ReqHandler_UnpackUEBufComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                             MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    int recv_pending;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);

    /* Drop one pending-receive reference and check whether any remain. */
    MPIDI_Request_decr_pending(rreq);
    MPIDI_Request_check_pending(rreq, &recv_pending);

    if (recv_pending) {
        /* The receive has not been posted yet; MPID_{Recv/Irecv}() is
         * responsible for unpacking the unexpected-eager buffer. */
    }
    else if (rreq->dev.recv_data_sz > 0) {
        /* Receive is posted: unpack the buffered data into the user buffer
         * and release the temporary buffer. */
        MPIDI_CH3U_Request_unpack_uebuf(rreq);
        MPL_free(rreq->dev.tmpbuf);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
878
int MPIDI_CH3_ReqHandler_UnpackSRBufComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);

    /* Drain the send/receive staging buffer into the user buffer. */
    MPIDI_CH3U_Request_unpack_srbuf(rreq);

    /* RMA receive requests delegate to their operation-specific completion
     * handlers; a plain receive is completed here directly. */
    switch (MPIDI_Request_get_type(rreq)) {
        case MPIDI_REQUEST_TYPE_PUT_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_ACCUM_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_GET_ACCUM_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_FOP_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_FOPRecvComplete(vc, rreq, complete);
            break;
        default:
            /* mark data transfer as complete and decrement CC */
            mpi_errno = MPID_Request_complete(rreq);
            MPIR_ERR_CHECK(mpi_errno);

            *complete = TRUE;
            break;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
914
int MPIDI_CH3_ReqHandler_UnpackSRBufReloadIOV(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                              MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);

    /* Drain the staging (send/receive) buffer into the user buffer, then
     * refill the request's IOV so the channel can receive the next chunk. */
    MPIDI_CH3U_Request_unpack_srbuf(rreq);
    if ((mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq)) != MPI_SUCCESS) {
        MPIR_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }

    /* More data is still expected; the request is not complete yet. */
    *complete = FALSE;
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);
    return mpi_errno;
}
933
int MPIDI_CH3_ReqHandler_ReloadIOV(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                   MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);

    /* Refill the request's IOV so the channel can keep receiving into it. */
    if ((mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq)) != MPI_SUCCESS) {
        MPIR_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }

    /* The transfer continues; this request is not yet complete. */
    *complete = FALSE;
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);
    return mpi_errno;
}
951
952 /* ----------------------------------------------------------------------- */
953 /* ----------------------------------------------------------------------- */
954
/* Apply a PUT that was queued behind a lock request: copy the origin data
 * into the target window, then perform the post-op actions (lock ack /
 * flush ack handling) via finish_op_on_target(). */
static inline int perform_put_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_put_t *put_pkt = &((target_lock_entry->pkt).put);
    void *src_buf;
    int mpi_errno = MPI_SUCCESS;

    /* Piggyback candidate must carry a basic (predefined) target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype));

    /* The operation may only be performed once all of its data has arrived. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    if (put_pkt->type == MPIDI_CH3_PKT_PUT_IMMED) {
        /* The whole payload travelled inside the packet header. */
        src_buf = (void *) &put_pkt->info.data;
    }
    else {
        MPIR_Assert(put_pkt->type == MPIDI_CH3_PKT_PUT);
        /* The payload was staged in the lock-entry's data buffer. */
        src_buf = target_lock_entry->data;
    }

    /* Copy the origin data into the target window. */
    mpi_errno = MPIR_Localcopy(src_buf, put_pkt->count, put_pkt->datatype,
                               put_pkt->addr, put_pkt->count, put_pkt->datatype);
    MPIR_ERR_CHECK(mpi_errno);

    /* do final action */
    mpi_errno =
        finish_op_on_target(win_ptr, target_lock_entry->vc, FALSE /* has no response data */ ,
                            put_pkt->pkt_flags, put_pkt->source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
992
/* Service a GET that was queued behind a lock request: once the lock is
 * granted, read the requested data out of the target window and send it back
 * to the origin in a GET_RESP packet.  Three send paths exist: data inlined
 * in the response header (IMMED), contiguous data sent by iovec, and
 * noncontiguous data sent via the channel's noncontig send function. */
static inline int perform_get_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
    MPIDI_CH3_Pkt_get_t *get_pkt = &((target_lock_entry->pkt).get);
    MPIR_Request *sreq = NULL;
    MPI_Aint type_size;
    size_t len;
    int iovcnt;
    struct iovec iov[MPL_IOV_LIMIT];
    int is_contig;
    int mpi_errno = MPI_SUCCESS;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Request that tracks the response send; GetSendComplete runs when the
     * response has been fully sent. */
    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (sreq == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }
    MPIR_Object_set_ref(sreq, 1);

    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_RESP);
    sreq->kind = MPIR_REQUEST_KIND__SEND;
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;

    sreq->dev.target_win_handle = win_ptr->handle;
    sreq->dev.pkt_flags = get_pkt->pkt_flags;

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP_IMMED);
    }
    else {
        MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
    }
    get_resp_pkt->request_handle = get_pkt->request_handle;
    get_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* Echo lock-granted / ack flags back to the origin as appropriate. */
    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
    get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

    /* length of target data */
    MPIR_Datatype_get_size_macro(get_pkt->datatype, type_size);
    MPIR_Assign_trunc(len, get_pkt->count * type_size, size_t);

    MPIR_Datatype_is_contig(get_pkt->datatype, &is_contig);

    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        /* Target data is small enough to ride in the response header. */
        void *src = (void *) (get_pkt->addr), *dest = (void *) &get_resp_pkt->info.data;
        mpi_errno = immed_copy(src, dest, len);
        MPIR_ERR_CHECK(mpi_errno);

        /* All origin data is in packet header, issue the header. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);
        iovcnt = 1;

        /* NOTE(review): at_completion_counter was already incremented above
         * and is not rolled back on this failure path — verify intended. */
        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
    }
    else if (is_contig) {
        /* Contiguous target data: send header + window data in one iovec. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);
        iov[1].iov_base = (void *) (get_pkt->addr);
        iov[1].iov_len = get_pkt->count * type_size;
        iovcnt = 2;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
    }
    else {
        /* Noncontiguous target data: describe it on the request and let the
         * channel's noncontig send path pack and send it. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);

        sreq->dev.user_buf = get_pkt->addr;
        sreq->dev.user_count = get_pkt->count;
        sreq->dev.datatype = get_pkt->datatype;
        sreq->dev.msg_offset = 0;
        sreq->dev.msgsize = get_pkt->count * type_size;

        mpi_errno = target_lock_entry->vc->sendNoncontig_fn(target_lock_entry->vc, sreq,
                                                            iov[0].iov_base, iov[0].iov_len,
                                                            NULL, 0);
        MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1103
1104
/* Apply an ACCUMULATE that was queued behind a lock request.  The accumulate
 * is performed under the shared-memory window mutex (when the window is
 * SHM-allocated) so it appears atomic to other local processes; the error
 * check is deliberately deferred until after the mutex is released. */
static inline int perform_acc_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_accum_t *acc_pkt = &((target_lock_entry->pkt).accum);
    int mpi_errno = MPI_SUCCESS;

    /* The operation may only be performed once all of its data has arrived. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(acc_pkt->datatype));

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
        /* All data fits in packet header */
        mpi_errno = do_accumulate_op((void *) &acc_pkt->info.data, acc_pkt->count,
                                     acc_pkt->datatype, acc_pkt->addr, acc_pkt->count,
                                     acc_pkt->datatype, 0, acc_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }
    else {
        MPIR_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
        MPI_Aint type_size, type_extent;
        MPI_Aint total_len, recv_count;

        MPIR_Datatype_get_size_macro(acc_pkt->datatype, type_size);
        MPIR_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);

        /* Only the elements that fit into one stream unit (bounded by the
         * SRBuf size) arrived with this packet; accumulate just those. */
        total_len = type_size * acc_pkt->count;
        recv_count = MPL_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
        MPIR_Assert(recv_count > 0);

        /* Note: here stream_offset is 0 because when piggybacking LOCK, we must use
         * the first stream unit. */
        MPIR_Assert(recv_count == (int) recv_count);
        mpi_errno = do_accumulate_op(target_lock_entry->data, (int) recv_count, acc_pkt->datatype,
                                     acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
                                     0, acc_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    /* Check the accumulate result only after the mutex has been released. */
    MPIR_ERR_CHECK(mpi_errno);

    /* do final action (lock ack / flush ack handling) */
    mpi_errno =
        finish_op_on_target(win_ptr, target_lock_entry->vc, FALSE /* has no response data */ ,
                            acc_pkt->pkt_flags, acc_pkt->source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1162
1163
/* Execute a GET_ACCUMULATE that was queued behind a lock request: fetch the
 * current target data (into the response header for IMMED, or a temporary
 * buffer otherwise), apply the accumulate op, and send the fetched (pre-op)
 * data back to the origin.  'fetch + accumulate' is performed under the
 * shared-memory window mutex when the window is SHM-allocated.
 *
 * Fix: the MPL_malloc() of the temporary response buffer was unchecked; a
 * NULL return would later be dereferenced by MPIR_Memcpy/MPIR_Typerep_pack. */
static inline int perform_get_acc_in_lock_queue(MPIR_Win * win_ptr,
                                                MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
    MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &((target_lock_entry->pkt).get_accum);
    MPIR_Request *sreq = NULL;
    MPI_Aint type_size;
    size_t len;
    int iovcnt;
    struct iovec iov[MPL_IOV_LIMIT];
    int is_contig;
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_extent;
    MPI_Aint total_len, recv_count;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Request that tracks the response send; GaccumSendComplete runs when
     * the response has been fully sent. */
    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (sreq == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }
    MPIR_Object_set_ref(sreq, 1);

    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
    sreq->kind = MPIR_REQUEST_KIND__SEND;
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;

    sreq->dev.target_win_handle = win_ptr->handle;
    sreq->dev.pkt_flags = get_accum_pkt->pkt_flags;

    MPIR_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);

    /* length of target data */
    MPIR_Assign_trunc(len, get_accum_pkt->count * type_size, size_t);

    if (get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
        /* Both origin data and fetched data travel inside packet headers. */
        MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED);
        get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
        get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
        /* Echo lock-granted / ack flags back to the origin as appropriate. */
        if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
        get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

        /* NOTE: copy 'data + ACC' needs to be atomic */

        if (win_ptr->shm_allocated == TRUE) {
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
        }

        /* Copy data from target window to response packet header */

        void *src = (void *) (get_accum_pkt->addr), *dest =
            (void *) &(get_accum_resp_pkt->info.data);
        mpi_errno = immed_copy(src, dest, len);
        if (mpi_errno != MPI_SUCCESS) {
            /* Release the mutex before jumping to the error path. */
            if (win_ptr->shm_allocated == TRUE) {
                MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
            }
            MPIR_ERR_POP(mpi_errno);
        }

        /* Perform ACCUMULATE OP */

        /* All data fits in packet header */
        mpi_errno =
            do_accumulate_op((void *) &get_accum_pkt->info.data, get_accum_pkt->count,
                             get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
                             get_accum_pkt->datatype, 0, get_accum_pkt->op,
                             MPIDI_RMA_ACC_SRCBUF_DEFAULT);

        if (win_ptr->shm_allocated == TRUE) {
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
        }

        MPIR_ERR_CHECK(mpi_errno);

        /* here we increment the Active Target counter to guarantee the GET-like
         * operation are completed when counter reaches zero. */
        win_ptr->at_completion_counter++;

        /* All origin data is in packet header, issue the header. */
        iov[0].iov_base = (void *) get_accum_resp_pkt;
        iov[0].iov_len = sizeof(*get_accum_resp_pkt);
        iovcnt = 1;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }

        goto fn_exit;
    }

    MPIR_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);

    MPIR_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);

    /* Only the elements that fit into one stream unit (bounded by the SRBuf
     * size) arrived with this packet; fetch/accumulate just those. */
    total_len = type_size * get_accum_pkt->count;
    recv_count = MPL_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
    MPIR_Assert(recv_count > 0);

    /* Temporary buffer holding the fetched (pre-accumulate) target data;
     * check the allocation before it is written below. */
    sreq->dev.user_buf = (void *) MPL_malloc(recv_count * type_size, MPL_MEM_BUFFER);
    MPIR_ERR_CHKANDJUMP(sreq->dev.user_buf == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");

    MPIR_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data from target window to temporary buffer */

    /* Note: here stream_offset is 0 because when piggybacking LOCK, we must use
     * the first stream unit. */

    if (is_contig) {
        MPIR_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, recv_count * type_size);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
                          0, sreq->dev.user_buf, type_size * recv_count, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == type_size * recv_count);
    }

    /* Perform ACCUMULATE OP */

    MPIR_Assert(recv_count == (int) recv_count);
    mpi_errno = do_accumulate_op(target_lock_entry->data, (int) recv_count, get_accum_pkt->datatype,
                                 get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
                                 0, get_accum_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
    get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
    get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
    get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

    /* Send response header + fetched data. */
    iov[0].iov_base = (void *) get_accum_resp_pkt;
    iov[0].iov_len = sizeof(*get_accum_resp_pkt);
    iov[1].iov_base = (void *) ((char *) sreq->dev.user_buf);
    iov[1].iov_len = recv_count * type_size;
    iovcnt = 2;

    mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_Request_free(sreq);
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1348
1349
/* Execute a FETCH_AND_OP that was queued behind a lock request: fetch the
 * current target value (into the response header for IMMED, or a temporary
 * buffer otherwise), apply the op, and send the old value back to the
 * origin.  'fetch + op' is performed under the shared-memory window mutex
 * when the window is SHM-allocated.
 *
 * Fix: the MPL_malloc() of the temporary response buffer was unchecked; a
 * NULL return would later be dereferenced by MPIR_Memcpy/MPIR_Typerep_pack. */
static inline int perform_fop_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
    MPIDI_CH3_Pkt_fop_t *fop_pkt = &((target_lock_entry->pkt).fop);
    MPIR_Request *resp_req = NULL;
    MPI_Aint type_size;
    struct iovec iov[MPL_IOV_LIMIT];
    int iovcnt;
    int is_contig;
    int mpi_errno = MPI_SUCCESS;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(fop_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* FIXME: this function is same with PktHandler_FOP(), should
     * do code refactoring on both of them. */

    MPIR_Datatype_get_size_macro(fop_pkt->datatype, type_size);

    MPIR_Datatype_is_contig(fop_pkt->datatype, &is_contig);

    if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
        MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP_IMMED);
    }
    else {
        MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
    }

    fop_resp_pkt->request_handle = fop_pkt->request_handle;
    fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    fop_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* Echo lock-granted / ack flags back to the origin as appropriate. */
    if (fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    if (fop_pkt->type == MPIDI_CH3_PKT_FOP) {
        /* Non-IMMED response carries data outside the header, so a send
         * request with a temporary buffer is needed up front. */
        resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
        if (resp_req == NULL) {
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
        }
        MPIR_Object_set_ref(resp_req, 1);

        resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;
        resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_FOPSendComplete;

        resp_req->dev.target_win_handle = win_ptr->handle;
        resp_req->dev.pkt_flags = fop_pkt->pkt_flags;

        /* Temporary buffer holding the fetched (pre-op) value; check the
         * allocation before it is written below. */
        resp_req->dev.user_buf = (void *) MPL_malloc(type_size, MPL_MEM_BUFFER);
        MPIR_ERR_CHKANDJUMP(resp_req->dev.user_buf == NULL, mpi_errno, MPI_ERR_OTHER, "**nomem");

        /* here we increment the Active Target counter to guarantee the GET-like
         * operation are completed when counter reaches zero. */
        win_ptr->at_completion_counter++;
    }

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data from target window to temporary buffer / response packet header */

    if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
        /* copy data to resp pkt header */
        void *src = fop_pkt->addr, *dest = (void *) &fop_resp_pkt->info.data;
        mpi_errno = immed_copy(src, dest, type_size);
        if (mpi_errno != MPI_SUCCESS) {
            /* Release the mutex before jumping to the error path. */
            if (win_ptr->shm_allocated == TRUE) {
                MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
            }
            MPIR_ERR_POP(mpi_errno);
        }
    }
    else if (is_contig) {
        MPIR_Memcpy(resp_req->dev.user_buf, fop_pkt->addr, type_size);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(fop_pkt->addr, 1, fop_pkt->datatype, 0, resp_req->dev.user_buf,
                          type_size, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == type_size);
    }

    /* Apply the op */
    if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
        mpi_errno = do_accumulate_op((void *) &fop_pkt->info.data, 1, fop_pkt->datatype,
                                     fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op,
                                     MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }
    else {
        mpi_errno = do_accumulate_op(target_lock_entry->data, 1, fop_pkt->datatype,
                                     fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op,
                                     MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
        /* send back the original data */
        MPID_THREAD_CS_ENTER(POBJ, target_lock_entry->vc->pobj_mutex);
        mpi_errno =
            MPIDI_CH3_iStartMsg(target_lock_entry->vc, fop_resp_pkt, sizeof(*fop_resp_pkt),
                                &resp_req);
        MPID_THREAD_CS_EXIT(POBJ, target_lock_entry->vc->pobj_mutex);
        MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

        if (resp_req != NULL) {
            if (!MPIR_Request_is_complete(resp_req)) {
                /* sending process is not completed, set proper OnDataAvail
                 * (it is initialized to NULL by lower layer) */
                resp_req->dev.target_win_handle = fop_pkt->target_win_handle;
                resp_req->dev.pkt_flags = fop_pkt->pkt_flags;
                resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;

                /* here we increment the Active Target counter to guarantee the GET-like
                 * operation are completed when counter reaches zero. */
                win_ptr->at_completion_counter++;

                MPIR_Request_free(resp_req);
                goto fn_exit;
            }
            else {
                MPIR_Request_free(resp_req);
            }
        }
    }
    else {
        /* Send response header + fetched value. */
        iov[0].iov_base = (void *) fop_resp_pkt;
        iov[0].iov_len = sizeof(*fop_resp_pkt);
        iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
        iov[1].iov_len = type_size;
        iovcnt = 2;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, resp_req, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(resp_req);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
        goto fn_exit;
    }

    /* do final action */
    mpi_errno = finish_op_on_target(win_ptr, target_lock_entry->vc, TRUE /* has response data */ ,
                                    fop_pkt->pkt_flags, MPI_WIN_NULL);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1514
1515
/* Execute a COMPARE_AND_SWAP that was queued behind a lock request: read the
 * old target value into the response header, swap in the origin value if the
 * compare matches, and send the old value back to the origin. */
static inline int perform_cas_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_cas_resp_t *resp_pkt = &upkt.cas_resp;
    MPIDI_CH3_Pkt_cas_t *cas_pkt = &((target_lock_entry->pkt).cas);
    MPIR_Request *resp_sreq = NULL;
    MPI_Aint data_len;
    int mpi_errno = MPI_SUCCESS;

    /* Only predefined target datatypes may piggyback on a lock request. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(cas_pkt->datatype));

    /* Every byte of the operation must already have arrived. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Build the response header; the old window value travels inline. */
    MPIDI_Pkt_init(resp_pkt, MPIDI_CH3_PKT_CAS_RESP_IMMED);
    resp_pkt->request_handle = cas_pkt->request_handle;
    resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (cas_pkt->pkt_flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED |
                              MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE))
        resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if (cas_pkt->pkt_flags & (MPIDI_CH3_PKT_FLAG_RMA_FLUSH | MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* The old value rides in the packet header, so it must fit there. */
    MPIR_Datatype_get_size_macro(cas_pkt->datatype, data_len);
    MPIR_Assert(data_len <= sizeof(MPIDI_CH3_CAS_Immed_u));

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Read old value, compare, and conditionally replace — kept atomic with
     * respect to other local processes sharing the window. */
    MPIR_Memcpy((void *) &resp_pkt->info.data, cas_pkt->addr, data_len);

    if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
        MPIR_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, data_len);
    }

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    /* Ship the old value back to the origin. */
    MPID_THREAD_CS_ENTER(POBJ, target_lock_entry->vc->pobj_mutex);
    mpi_errno =
        MPIDI_CH3_iStartMsg(target_lock_entry->vc, resp_pkt, sizeof(*resp_pkt), &resp_sreq);
    MPID_THREAD_CS_EXIT(POBJ, target_lock_entry->vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    if (resp_sreq != NULL) {
        if (MPIR_Request_is_complete(resp_sreq)) {
            MPIR_Request_free(resp_sreq);
        }
        else {
            /* The send is still in flight: install the completion handler
             * (the lower layer initializes OnDataAvail to NULL). */
            resp_sreq->dev.target_win_handle = cas_pkt->target_win_handle;
            resp_sreq->dev.pkt_flags = cas_pkt->pkt_flags;
            resp_sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_CASSendComplete;

            /* here we increment the Active Target counter to guarantee the GET-like
             * operation are completed when counter reaches zero. */
            win_ptr->at_completion_counter++;

            MPIR_Request_free(resp_sreq);
            goto fn_exit;
        }
    }

    /* do final action */
    mpi_errno = finish_op_on_target(win_ptr, target_lock_entry->vc, TRUE /* has response data */ ,
                                    cas_pkt->pkt_flags, MPI_WIN_NULL);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1599
1600
/* Dispatch one queued lock-entry: either acknowledge a bare LOCK request or
 * perform the RMA operation that was piggybacked on the lock. */
static inline int perform_op_in_lock_queue(MPIR_Win * win_ptr,
                                           MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    int mpi_errno = MPI_SUCCESS;

    switch (target_lock_entry->pkt.type) {
        case (MPIDI_CH3_PKT_LOCK):{
                /* single LOCK request: send the lock-granted ack */
                MPIDI_CH3_Pkt_lock_t *lock_pkt = &(target_lock_entry->pkt.lock);
                MPIDI_VC_t *my_vc = NULL;

                MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);

                if (target_lock_entry->vc == my_vc) {
                    /* requester is this process itself: handle the ack locally */
                    mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                                MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
                }
                else {
                    mpi_errno = MPIDI_CH3I_Send_lock_ack_pkt(target_lock_entry->vc, win_ptr,
                                                             MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED,
                                                             lock_pkt->source_win_handle,
                                                             lock_pkt->request_handle);
                }
                MPIR_ERR_CHECK(mpi_errno);
                break;
            }
            /* LOCK+OP packets */
        case (MPIDI_CH3_PKT_PUT):
        case (MPIDI_CH3_PKT_PUT_IMMED):
            mpi_errno = perform_put_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        case (MPIDI_CH3_PKT_GET):
            mpi_errno = perform_get_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        case (MPIDI_CH3_PKT_ACCUMULATE):
        case (MPIDI_CH3_PKT_ACCUMULATE_IMMED):
            mpi_errno = perform_acc_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        case (MPIDI_CH3_PKT_GET_ACCUM):
        case (MPIDI_CH3_PKT_GET_ACCUM_IMMED):
            mpi_errno = perform_get_acc_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        case (MPIDI_CH3_PKT_FOP):
        case (MPIDI_CH3_PKT_FOP_IMMED):
            mpi_errno = perform_fop_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        case (MPIDI_CH3_PKT_CAS_IMMED):
            mpi_errno = perform_cas_in_lock_queue(win_ptr, target_lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
            break;
        default:
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
                                 "**invalidpkt", "**invalidpkt %d", target_lock_entry->pkt.type);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1670
1671
/* Reentrancy guards for MPIDI_CH3I_Release_lock(): entered_flag marks that
 * the lock queue is already being drained by an outer invocation, and
 * entered_count records how many nested invocations were deferred while the
 * flag was set (the outer loop re-scans the queue when it changes). */
static int entered_flag = 0;
static int entered_count = 0;
1674
1675 /* Release the current lock on the window and grant the next lock in the
1676 queue if any */
int MPIDI_CH3I_Release_lock(MPIR_Win * win_ptr)
{
    MPIDI_RMA_Target_lock_entry_t *target_lock_entry, *target_lock_entry_next;
    int requested_lock, mpi_errno = MPI_SUCCESS, temp_entered_count;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);

    if (win_ptr->current_lock_type == MPI_LOCK_SHARED) {
        /* decr ref cnt */
        /* FIXME: MT: Must be done atomically */
        win_ptr->shared_lock_ref_cnt--;
    }

    /* If shared lock ref count is 0 (which is also true if the lock is an
     * exclusive lock), release the lock. */
    if (win_ptr->shared_lock_ref_cnt == 0) {

        /* This function needs to be reentrant even in the single-threaded case
         * because when going through the lock queue, pkt_handler() in
         * perform_op_in_lock_queue() may again call release_lock(). To handle
         * this possibility, we use an entered_flag.
         * If the flag is not 0, we simply increment the entered_count and return.
         * The loop through the lock queue is repeated if the entered_count has
         * changed while we are in the loop.
         */
        if (entered_flag != 0) {
            entered_count++;    /* Count how many times we re-enter */
            goto fn_exit;
        }

        entered_flag = 1;       /* Mark that we are now entering release_lock() */
        temp_entered_count = entered_count;

        do {
            /* Catch up one deferred re-entry per pass; the loop condition
             * below re-runs the queue scan until no further re-entries
             * occurred during the scan. */
            if (temp_entered_count != entered_count)
                temp_entered_count++;

            /* FIXME: MT: The setting of the lock type must be done atomically */
            win_ptr->current_lock_type = MPID_LOCK_NONE;

            /* If there is a lock queue, try to satisfy as many lock requests as
             * possible. If the first one is a shared lock, grant it and grant all
             * other shared locks. If the first one is an exclusive lock, grant
             * only that one. */

            /* FIXME: MT: All queue accesses need to be made atomic */
            target_lock_entry = (MPIDI_RMA_Target_lock_entry_t *) win_ptr->target_lock_queue_head;
            while (target_lock_entry) {
                /* Save the successor now: granting an entry dequeues and
                 * frees it, invalidating target_lock_entry->next. */
                target_lock_entry_next = target_lock_entry->next;

                /* Only entries whose payload has fully arrived are eligible
                 * to be granted (see PiggybackLockOpRecvComplete). */
                if (target_lock_entry->all_data_recved) {
                    int pkt_flags;
                    /* NOTE: this macro may set mpi_errno on an unrecognized
                     * packet type; the flags decide shared vs. exclusive. */
                    MPIDI_CH3_PKT_RMA_GET_FLAGS(target_lock_entry->pkt, pkt_flags, mpi_errno);
                    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED)
                        requested_lock = MPI_LOCK_SHARED;
                    else {
                        MPIR_Assert(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
                        requested_lock = MPI_LOCK_EXCLUSIVE;
                    }
                    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, requested_lock) == 1) {
                        /* dequeue entry from lock queue */
                        DL_DELETE(win_ptr->target_lock_queue_head, target_lock_entry);

                        /* perform this OP */
                        mpi_errno = perform_op_in_lock_queue(win_ptr, target_lock_entry);
                        MPIR_ERR_CHECK(mpi_errno);

                        /* free this entry */
                        mpi_errno =
                            MPIDI_CH3I_Win_target_lock_entry_free(win_ptr, target_lock_entry);
                        MPIR_ERR_CHECK(mpi_errno);

                        /* if the granted lock is exclusive,
                         * no need to continue */
                        if (requested_lock == MPI_LOCK_EXCLUSIVE)
                            break;
                    }
                }
                target_lock_entry = target_lock_entry_next;
            }
        } while (temp_entered_count != entered_count);

        /* No re-entries left to service; clear both guards together. */
        entered_count = entered_flag = 0;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1769
1770
1771
/* Completion handler invoked once every byte of a lock-queue entry's
 * piggybacked operation data has been received.  If the entry was kept
 * (not dropped), mark it fully received, then make one attempt to acquire
 * the window lock and execute the queued operation immediately.  On
 * failure the entry simply stays queued with all_data_recved == 1, and
 * MPIDI_CH3I_Release_lock() will grant it later.  Finally the receive
 * request itself is completed.
 *
 * Returns MPI_SUCCESS or an MPI error code; sets *complete = TRUE. */
int MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(MPIDI_VC_t * vc,
                                                     MPIR_Request * rreq, int *complete)
{
    int req_lock_type;
    MPI_Win target_win_handle;
    MPIR_Win *win_ptr = NULL;
    int flags;
    MPIDI_RMA_Target_lock_entry_t *lock_entry = rreq->dev.target_lock_queue_entry;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);

    /* A NULL entry means the op data was deliberately dropped; in that case
     * only the request completion below is needed. */
    if (lock_entry != NULL) {

        /* All payload for this queued op has now arrived. */
        lock_entry->all_data_recved = 1;

        /* Recover the lock flags and the target window from the packet
         * header stored in the queue entry. */
        MPIDI_CH3_PKT_RMA_GET_FLAGS(lock_entry->pkt, flags, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE(lock_entry->pkt, target_win_handle, mpi_errno);
        MPIR_Win_get_ptr(target_win_handle, win_ptr);

        if (flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM && lock_entry->data != NULL) {

            /* Streamed ops carry an extended header; only ACC/GET_ACC use it. */
            MPIR_Assert(lock_entry->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                        lock_entry->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);

            int ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_stream_t);

            /* Discard the received stream_offset: the unit piggybacked on a
             * LOCK is always the first one, so its offset is 0.  Shift the
             * real payload down over the extended header (regions overlap,
             * hence memmove). */
            rreq->dev.recv_data_sz -= ext_hdr_sz;
            memmove(lock_entry->data,
                    (void *) ((char *) (lock_entry->data) + ext_hdr_sz), rreq->dev.recv_data_sz);
        }

        /* Translate packet flags into the MPI lock type being requested. */
        if (flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
            req_lock_type = MPI_LOCK_SHARED;
        }
        else {
            MPIR_Assert(flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
            req_lock_type = MPI_LOCK_EXCLUSIVE;
        }

        if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, req_lock_type) == 1) {
            /* Lock granted now: dequeue, run the queued operation, and
             * release the entry. */
            DL_DELETE(win_ptr->target_lock_queue_head, lock_entry);

            mpi_errno = perform_op_in_lock_queue(win_ptr, lock_entry);
            MPIR_ERR_CHECK(mpi_errno);

            mpi_errno = MPIDI_CH3I_Win_target_lock_entry_free(win_ptr, lock_entry);
            MPIR_ERR_CHECK(mpi_errno);
        }
        /* If try acquiring lock failed, the entry stays queued with
         * all_data_recved == 1; release_lock() will traverse the queue and
         * grant it when the lock becomes available. */
    }

    /* mark receive data transfer as complete and decrement CC in receive
     * request */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1855