1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #include "mpidimpl.h"
7 #include "mpidrma.h"
8 
/* Dispatch routine invoked when data has arrived for a receive request.
 * If the request carries an OnDataAvail action, that action decides how to
 * proceed; otherwise the request is a plain receive and is completed here.
 * On return, *complete reports whether the request has finished. */
int MPIDI_CH3U_Handle_recv_req(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    /* Debug-only guard: this routine must never be entered recursively. */
    static int in_routine ATTRIBUTE((unused)) = FALSE;
    int mpi_errno = MPI_SUCCESS;
    int (*on_data_avail) (MPIDI_VC_t *, MPIR_Request *, int *);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);

    MPIR_Assert(in_routine == FALSE);
    in_routine = TRUE;

    on_data_avail = rreq->dev.OnDataAvail;
    if (on_data_avail != NULL) {
        /* A pending action owns the request; it sets *complete itself. */
        mpi_errno = on_data_avail(vc, rreq, complete);
    }
    else {
        /* No action pending: this must be a simple receive, so mark the
         * data transfer complete and decrement the completion counter. */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_RECV);
        mpi_errno = MPID_Request_complete(rreq);
        MPIR_ERR_CHECK(mpi_errno);
        *complete = TRUE;
    }

    in_routine = FALSE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3U_HANDLE_RECV_REQ);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
40 
41 /* ----------------------------------------------------------------------- */
42 /* Here are the functions that implement the actions that are taken when
43  * data is available for a receive request (or other completion operations)
44  * These include "receive" requests that are part of the RMA implementation.
45  *
46  * The convention for the names of routines that are called when data is
47  * available is
48  *    MPIDI_CH3_ReqHandler_<type>(MPIDI_VC_t *, MPIR_Request *, int *)
49  * as in
50  *    MPIDI_CH3_ReqHandler_...
51  *
52  * ToDo:
53  *    We need a way for each of these functions to describe what they are,
54  *    so that given a pointer to one of these functions, we can retrieve
55  *    a description of the routine.  We may want to use a static string
56  *    and require the user to maintain thread-safety, at least while
57  *    accessing the string.
58  */
59 /* ----------------------------------------------------------------------- */
/* Final completion action for an ordinary receive: mark the data transfer
 * as finished (decrementing the completion counter) and tell the caller
 * that no further processing of this request is needed. */
int MPIDI_CH3_ReqHandler_RecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                      MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPID_Request_complete(rreq);

    MPIR_ERR_CHECK(mpi_errno);
    *complete = TRUE;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
76 
/* Runs on the target after all data for a PUT has been received: completes
 * the receive request and then finishes the RMA operation on the target
 * (lock release / flush / unlock acknowledgements) via finish_op_on_target(). */
int MPIDI_CH3_ReqHandler_PutRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    /* Saved up front because they are still needed after the request has
     * been completed below. */
    MPI_Win source_win_handle = rreq->dev.source_win_handle;
    int pkt_flags = rreq->dev.pkt_flags;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);

    /* NOTE: It is possible that this request is already completed before
     * entering this handler. This happens when this req handler is called
     * within the same req handler on the same request.
     * Consider this case: req is queued up in SHM queue with ref count of 2:
     * one is for completing the request and another is for dequeueing from
     * the queue. The first called req handler on this request completed
     * this request and decrement ref counter to 1. Request is still in the
     * queue. Within this handler, we call the req handler on the same request
     * for the second time (for example when making progress on SHM queue),
     * and the second called handler also tries to complete this request,
     * which leads to wrong execution.
     * Here we check if req is already completed to prevent processing the
     * same request twice. */
    if (MPIR_Request_is_complete(rreq)) {
        *complete = FALSE;
        goto fn_exit;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    /* NOTE: finish_op_on_target() must be called after we complete this request,
     * because inside finish_op_on_target() we may call this request handler
     * on the same request again (in release_lock()). Marking this request as
     * completed will prevent us from processing the same request twice. */
    mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                    pkt_flags, source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
130 
131 
/* Runs on the target after a (stream unit of) ACCUMULATE data has been
 * received into the temporary SRBuf: applies the accumulate operation to
 * the target window memory, frees the buffer, completes the request, and
 * finishes the op on the target (lock release / acknowledgements). */
int MPIDI_CH3_ReqHandler_AccumRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    /* Saved up front because they are still needed after the request has
     * been completed below. */
    MPI_Win source_win_handle = rreq->dev.source_win_handle;
    int pkt_flags = rreq->dev.pkt_flags;
    MPI_Datatype basic_type;
    MPI_Aint predef_count, predef_dtp_size;
    MPI_Aint stream_offset;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);

    /* NOTE: It is possible that this request is already completed before
     * entering this handler. This happens when this req handler is called
     * within the same req handler on the same request.
     * Consider this case: req is queued up in SHM queue with ref count of 2:
     * one is for completing the request and another is for dequeueing from
     * the queue. The first called req handler on this request completed
     * this request and decrement ref counter to 1. Request is still in the
     * queue. Within this handler, we call the req handler on the same request
     * for the second time (for example when making progress on SHM queue),
     * and the second called handler also tries to complete this request,
     * which leads to wrong execution.
     * Here we check if req is already completed to prevent processing the
     * same request twice. */
    if (MPIR_Request_is_complete(rreq)) {
        *complete = FALSE;
        goto fn_exit;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV);

    /* The accumulate is applied element-wise in units of the basic
     * (predefined) type underlying the target datatype. */
    if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
        basic_type = rreq->dev.datatype;
    else {
        basic_type = rreq->dev.datatype_ptr->basic_type;
    }
    MPIR_Assert(basic_type != MPI_DATATYPE_NULL);

    MPIR_Datatype_get_size_macro(basic_type, predef_dtp_size);
    predef_count = rreq->dev.recv_data_sz / predef_dtp_size;
    MPIR_Assert(predef_count > 0);

    /* For streamed (large) accumulates, the extended packet header carries
     * the offset of this stream unit within the full target buffer. */
    stream_offset = 0;
    MPIDI_CH3_ExtPkt_Accum_get_stream(pkt_flags,
                                      (!MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype)),
                                      rreq->dev.ext_hdr_ptr, &stream_offset);

    /* On shared-memory windows the accumulate must be serialized against
     * other processes accessing the same memory. */
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }
    /* accumulate data from tmp_buf into user_buf */
    /* guard against truncation in the cast below */
    MPIR_Assert(predef_count == (int) predef_count);
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
                                 rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                                 stream_offset, rreq->dev.op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }
    MPIR_ERR_CHECK(mpi_errno);

    /* free the temporary buffer */
    MPIDI_CH3U_SRBuf_free(rreq);

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    /* NOTE: finish_op_on_target() must be called after we complete this request,
     * because inside finish_op_on_target() we may call this request handler
     * on the same request again (in release_lock()). Marking this request as
     * completed will prevent us from processing the same request twice. */
    mpi_errno = finish_op_on_target(win_ptr, vc, FALSE /* has no response data */ ,
                                    pkt_flags, source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
222 
223 
/* Runs on the target after the origin data for a GET_ACCUMULATE (stream
 * unit) has been received: copies the current target data into a newly
 * allocated response buffer, applies the accumulate operation to the target
 * memory, and sends the pre-accumulate data back to the origin.  The
 * 'copy + accumulate' pair is performed atomically with respect to other
 * shared-memory accesses. */
int MPIDI_CH3_ReqHandler_GaccumRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
    MPIR_Request *resp_req;
    struct iovec iov[MPL_IOV_LIMIT];
    int iovcnt;
    int is_contig;
    MPI_Datatype basic_type;
    MPI_Aint predef_count, predef_dtp_size;
    MPI_Aint dt_true_lb;
    MPI_Aint stream_offset;
    int is_empty_origin = FALSE;
    MPI_Aint extent, type_size;
    MPI_Aint stream_data_len, total_len;
    MPIR_CHKPMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP carries no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* The accumulate is applied element-wise in units of the basic
     * (predefined) type underlying the target datatype. */
    if (MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype))
        basic_type = rreq->dev.datatype;
    else {
        basic_type = rreq->dev.datatype_ptr->basic_type;
    }
    MPIR_Assert(basic_type != MPI_DATATYPE_NULL);

    /* For streamed (large) operations, the extended packet header carries
     * the offset of this stream unit within the full target buffer. */
    stream_offset = 0;
    MPIDI_CH3_ExtPkt_Gaccum_get_stream(rreq->dev.pkt_flags,
                                       (!MPIR_DATATYPE_IS_PREDEFINED(rreq->dev.datatype)),
                                       rreq->dev.ext_hdr_ptr, &stream_offset);

    /* Use target data to calculate current stream unit size */
    MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);
    total_len = type_size * rreq->dev.user_count;
    MPIR_Datatype_get_size_macro(basic_type, predef_dtp_size);
    MPIR_Datatype_get_extent_macro(basic_type, extent);
    /* remaining data, capped at the number of whole basic elements that fit
     * in one SRBuf-sized stream unit */
    stream_data_len = MPL_MIN(total_len - (stream_offset / extent) * predef_dtp_size,
                               (MPIDI_CH3U_SRBuf_size / extent) * predef_dtp_size);

    predef_count = stream_data_len / predef_dtp_size;
    MPIR_Assert(predef_count > 0);

    /* Build the response packet header; piggy-back lock-granted / ack flags
     * requested by the incoming packet. */
    MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
    get_accum_resp_pkt->request_handle = rreq->dev.resp_request_handle;
    get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* check if data is contiguous and get true lb */
    MPIR_Datatype_is_contig(rreq->dev.datatype, &is_contig);
    MPIR_Datatype_get_true_lb(rreq->dev.datatype, &dt_true_lb);

    /* Create the response request that carries the fetched data back. */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIR_Object_set_ref(resp_req, 1);
    MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);

    MPIR_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, stream_data_len,
                        mpi_errno, "GACC resp. buffer", MPL_MEM_BUFFER);

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data from target window to temporary buffer */

    if (is_contig) {
        MPIR_Memcpy(resp_req->dev.user_buf,
                    (void *) ((char *) rreq->dev.real_user_buf + dt_true_lb +
                              stream_offset), stream_data_len);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                       stream_offset, resp_req->dev.user_buf, stream_data_len, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == stream_data_len);
    }

    /* accumulate data from tmp_buf into user_buf */
    /* guard against truncation in the cast below */
    MPIR_Assert(predef_count == (int) predef_count);
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, (int) predef_count, basic_type,
                                 rreq->dev.real_user_buf, rreq->dev.user_count, rreq->dev.datatype,
                                 stream_offset, rreq->dev.op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    /* error check deferred until after the mutex has been released */
    MPIR_ERR_CHECK(mpi_errno);

    resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
    resp_req->dev.pkt_flags = rreq->dev.pkt_flags;

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    /* Send header + fetched data in a single iSendv. */
    iov[0].iov_base = (void *) get_accum_resp_pkt;
    iov[0].iov_len = sizeof(*get_accum_resp_pkt);
    iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
    iov[1].iov_len = stream_data_len;
    iovcnt = 2;

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* Mark get portion as handled */
    rreq->dev.resp_request_handle = MPI_REQUEST_NULL;

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);

    if (is_empty_origin == FALSE) {
        /* free the temporary buffer.
         * When origin data is zero, there
         * is no temporary buffer allocated */
        MPIDI_CH3U_SRBuf_free(rreq);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;
  fn_exit:
    MPIR_CHKPMEM_COMMIT();
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMRECVCOMPLETE);
    return mpi_errno;

    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
380 
381 
/* Runs on the target after the origin operand for a FETCH_AND_OP has been
 * received: saves the current target value into a newly created response
 * request, applies the op to the target location, and sends the original
 * (pre-op) value back to the origin.  The 'copy + op' pair is performed
 * atomically with respect to other shared-memory accesses. */
int MPIDI_CH3_ReqHandler_FOPRecvComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Win *win_ptr = NULL;
    MPI_Aint type_size;
    MPIR_Request *resp_req = NULL;
    struct iovec iov[MPL_IOV_LIMIT];
    int iovcnt;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
    int is_contig;
    int is_empty_origin = FALSE;
    MPIR_CHKPMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP carries no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_FOP_RECV);

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

    MPIR_Datatype_is_contig(rreq->dev.datatype, &is_contig);

    /* Create response request */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    MPIDI_Request_set_type(resp_req, MPIDI_REQUEST_TYPE_FOP_RESP);
    MPIR_Object_set_ref(resp_req, 1);
    resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_FOPSendComplete;
    resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;
    resp_req->dev.target_win_handle = rreq->dev.target_win_handle;
    resp_req->dev.pkt_flags = rreq->dev.pkt_flags;

    /* FOP operates on a single element, so the response buffer holds exactly
     * one element of the target datatype. */
    MPIR_CHKPMEM_MALLOC(resp_req->dev.user_buf, void *, type_size, mpi_errno, "FOP resp. buffer", MPL_MEM_BUFFER);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data into a temporary buffer in response request */
    if (is_contig) {
        MPIR_Memcpy(resp_req->dev.user_buf, rreq->dev.real_user_buf, type_size);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, resp_req->dev.user_buf,
                       type_size, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == type_size);
    }

    /* Perform accumulate computation */
    mpi_errno = do_accumulate_op(rreq->dev.user_buf, 1, rreq->dev.datatype,
                                 rreq->dev.real_user_buf, 1, rreq->dev.datatype, 0, rreq->dev.op,
                                 MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    /* error check deferred until after the mutex has been released */
    MPIR_ERR_CHECK(mpi_errno);

    /* Send back data; piggy-back lock-granted / ack flags requested by the
     * incoming packet. */
    MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
    fop_resp_pkt->request_handle = rreq->dev.resp_request_handle;
    fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    fop_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* Send header + fetched value in a single iSendv. */
    iov[0].iov_base = (void *) fop_resp_pkt;
    iov[0].iov_len = sizeof(*fop_resp_pkt);
    iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
    iov[1].iov_len = type_size;
    iovcnt = 2;

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iSendv(vc, resp_req, iov, iovcnt);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    if (is_empty_origin == FALSE) {
        /* free the temporary buffer.
         * When origin data is zero, there
         * is no temporary buffer allocated */
        MPL_free((char *) rreq->dev.user_buf);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_CHKPMEM_COMMIT();
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_FOPRECVCOMPLETE);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    MPIR_CHKPMEM_REAP();
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
503 
504 
/* Runs once the flattened derived-datatype description for a PUT has been
 * received: reconstructs the datatype object, then re-arms the same request
 * to receive the actual PUT payload using that datatype.  Always leaves
 * *complete == FALSE on success since more data is expected. */
int MPIDI_CH3_ReqHandler_PutDerivedDTRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                                  MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);

    /* get data from extended header */
    new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
    if (!new_dtp) {
        MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIR_Datatype_mem");
    }
    /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
    MPIR_Object_set_ref(new_dtp, 1);
    MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

    /* update request to get the data */
    MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_PUT_RECV);
    rreq->dev.datatype = new_dtp->handle;
    rreq->dev.recv_data_sz = new_dtp->size * rreq->dev.user_count;

    /* keep a pointer so the datatype can be released with the request */
    rreq->dev.datatype_ptr = new_dtp;

    rreq->dev.msg_offset = 0;
    rreq->dev.msgsize = rreq->dev.recv_data_sz;

    /* rebuild the receive IOV for the payload with the new datatype */
    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }
    if (!rreq->dev.OnDataAvail)
        rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PutRecvComplete;

    /* the PUT payload itself is still to come */
    *complete = FALSE;
  fn_fail:
    /* NOTE: fn_fail doubles as the normal exit path here */
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PUTDERIVEDDTRECVCOMPLETE);
    return mpi_errno;
}
546 
/* Runs once the metadata for an ACCUMULATE has been received (the flattened
 * derived-datatype description and/or the stream extended header):
 * reconstructs the target datatype if needed, allocates a temporary SRBuf,
 * and builds the IOV for receiving the next stream unit of origin data.
 * Re-arms the request with AccumRecvComplete as the next handler; always
 * leaves *complete == FALSE on success since data is still expected. */
int MPIDI_CH3_ReqHandler_AccumMetadataRecvComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                                   MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPI_Aint basic_type_extent, basic_type_size;
    MPI_Aint total_len, rest_len, stream_elem_count;
    MPI_Aint stream_offset;
    MPI_Aint type_size;
    MPI_Datatype basic_dtp;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);

    stream_offset = 0;

    /* get stream offset */
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
        MPIDI_CH3_Ext_pkt_stream_t *ext_hdr = NULL;
        MPIR_Assert(rreq->dev.ext_hdr_ptr != NULL);
        ext_hdr = ((MPIDI_CH3_Ext_pkt_stream_t *) rreq->dev.ext_hdr_ptr);
        stream_offset = ext_hdr->stream_offset;
    }

    if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV_DERIVED_DT) {
        /* derived target datatype: rebuild it from the flattened form */
        new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
        if (!new_dtp) {
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIR_Datatype_mem");
        }
        /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
        MPIR_Object_set_ref(new_dtp, 1);
        MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

        /* update new request to get the data */
        MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_ACCUM_RECV);

        MPIR_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
        rreq->dev.datatype = new_dtp->handle;
        rreq->dev.datatype_ptr = new_dtp;

        type_size = new_dtp->size;

        basic_dtp = new_dtp->basic_type;
    }
    else {
        /* predefined target datatype: it is its own basic type */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_ACCUM_RECV);
        MPIR_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);

        MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

        basic_dtp = rreq->dev.datatype;
    }

    MPIR_Datatype_get_size_macro(basic_dtp, basic_type_size);
    MPIR_Datatype_get_extent_macro(basic_dtp, basic_type_extent);

    MPIR_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
    /* allocate a SRBuf for receiving stream unit */
    MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
    /* --BEGIN ERROR HANDLING-- */
    if (rreq->dev.tmpbuf_sz == 0) {
        MPL_DBG_MSG(MPIDI_CH3_DBG_CHANNEL, TYPICAL, "SRBuf allocation failure");
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                         __func__, __LINE__, MPI_ERR_OTHER, "**nomem",
                                         "**nomem %d", MPIDI_CH3U_SRBuf_size);
        rreq->status.MPI_ERROR = mpi_errno;
        goto fn_fail;
    }
    /* --END ERROR HANDLING-- */

    rreq->dev.user_buf = rreq->dev.tmpbuf;

    /* size of this stream unit: the remaining data, capped at the number of
     * whole basic elements that fit in the SRBuf */
    total_len = type_size * rreq->dev.user_count;
    rest_len = total_len - stream_offset;
    stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;

    rreq->dev.recv_data_sz = MPL_MIN(rest_len, stream_elem_count * basic_type_size);

    rreq->dev.msg_offset = 0;
    rreq->dev.msgsize = rreq->dev.recv_data_sz;

    /* build the IOV that receives the stream unit into the SRBuf */
    MPI_Aint actual_iov_bytes, actual_iov_len;
    MPIR_Typerep_to_iov(rreq->dev.tmpbuf, rreq->dev.recv_data_sz / basic_type_size, basic_dtp,
                     0, rreq->dev.iov, MPL_IOV_LIMIT, rreq->dev.recv_data_sz,
                     &actual_iov_len, &actual_iov_bytes);
    rreq->dev.iov_count = (int) actual_iov_len;
    rreq->dev.iov_offset = 0;

    rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_AccumRecvComplete;

    /* the accumulate payload itself is still to come */
    *complete = FALSE;
  fn_fail:
    /* NOTE: fn_fail doubles as the normal exit path here */
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_ACCUMMETADATARECVCOMPLETE);
    return mpi_errno;
}
643 
644 
/* Runs once the metadata for a GET_ACCUMULATE has been received (the
 * flattened derived-datatype description and/or the stream extended header).
 * Mirrors AccumMetadataRecvComplete: reconstructs the target datatype if
 * needed, allocates a temporary SRBuf, and builds the IOV for receiving the
 * next stream unit of origin data, re-arming the request with
 * GaccumRecvComplete.  Special case: an MPI_NO_OP (GET-only) operation
 * carries no origin data, so the final handler is invoked immediately. */
int MPIDI_CH3_ReqHandler_GaccumMetadataRecvComplete(MPIDI_VC_t * vc,
                                                    MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPI_Aint basic_type_extent, basic_type_size;
    MPI_Aint total_len, rest_len, stream_elem_count;
    MPI_Aint stream_offset;
    MPI_Aint type_size;
    MPI_Datatype basic_dtp;
    int is_empty_origin = FALSE;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);

    /* Judge if origin buffer is empty (MPI_NO_OP carries no origin data) */
    if (rreq->dev.op == MPI_NO_OP) {
        is_empty_origin = TRUE;
    }

    stream_offset = 0;

    /* get stream offset */
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
        MPIDI_CH3_Ext_pkt_stream_t *ext_hdr = NULL;
        MPIR_Assert(rreq->dev.ext_hdr_ptr != NULL);
        ext_hdr = ((MPIDI_CH3_Ext_pkt_stream_t *) rreq->dev.ext_hdr_ptr);
        stream_offset = ext_hdr->stream_offset;
    }

    if (MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV_DERIVED_DT) {
        /* derived target datatype: rebuild it from the flattened form */
        new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
        if (!new_dtp) {
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                                 "MPIR_Datatype_mem");
        }
        /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
        MPIR_Object_set_ref(new_dtp, 1);
        MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

        /* update new request to get the data */
        MPIDI_Request_set_type(rreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);

        MPIR_Assert(rreq->dev.datatype == MPI_DATATYPE_NULL);
        rreq->dev.datatype = new_dtp->handle;
        rreq->dev.datatype_ptr = new_dtp;

        type_size = new_dtp->size;

        basic_dtp = new_dtp->basic_type;
    }
    else {
        /* predefined target datatype: it is its own basic type */
        MPIR_Assert(MPIDI_Request_get_type(rreq) == MPIDI_REQUEST_TYPE_GET_ACCUM_RECV);
        MPIR_Assert(rreq->dev.datatype != MPI_DATATYPE_NULL);

        MPIR_Datatype_get_size_macro(rreq->dev.datatype, type_size);

        basic_dtp = rreq->dev.datatype;
    }

    if (is_empty_origin == TRUE) {
        rreq->dev.recv_data_sz = 0;

        /* There is no origin data coming, directly call final req handler. */
        mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, rreq, complete);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else {
        MPIR_Datatype_get_size_macro(basic_dtp, basic_type_size);
        MPIR_Datatype_get_extent_macro(basic_dtp, basic_type_extent);

        MPIR_Assert(!MPIDI_Request_get_srbuf_flag(rreq));
        /* allocate a SRBuf for receiving stream unit */
        MPIDI_CH3U_SRBuf_alloc(rreq, MPIDI_CH3U_SRBuf_size);
        /* --BEGIN ERROR HANDLING-- */
        if (rreq->dev.tmpbuf_sz == 0) {
            MPL_DBG_MSG(MPIDI_CH3_DBG_CHANNEL, TYPICAL, "SRBuf allocation failure");
            mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_FATAL,
                                             __func__, __LINE__, MPI_ERR_OTHER, "**nomem",
                                             "**nomem %d", MPIDI_CH3U_SRBuf_size);
            rreq->status.MPI_ERROR = mpi_errno;
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */

        rreq->dev.user_buf = rreq->dev.tmpbuf;

        /* size of this stream unit: the remaining data, capped at the number
         * of whole basic elements that fit in the SRBuf */
        total_len = type_size * rreq->dev.user_count;
        rest_len = total_len - stream_offset;
        stream_elem_count = MPIDI_CH3U_SRBuf_size / basic_type_extent;

        rreq->dev.recv_data_sz = MPL_MIN(rest_len, stream_elem_count * basic_type_size);

        rreq->dev.msg_offset = 0;
        rreq->dev.msgsize = rreq->dev.recv_data_sz;

        /* build the IOV that receives the stream unit into the SRBuf */
        MPI_Aint actual_iov_bytes, actual_iov_len;
        MPIR_Typerep_to_iov(rreq->dev.tmpbuf, rreq->dev.recv_data_sz / basic_type_size, basic_dtp,
                         0, rreq->dev.iov, MPL_IOV_LIMIT, rreq->dev.recv_data_sz,
                         &actual_iov_len, &actual_iov_bytes);
        /* explicit narrowing cast, matching AccumMetadataRecvComplete */
        rreq->dev.iov_count = (int) actual_iov_len;
        rreq->dev.iov_offset = 0;

        rreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumRecvComplete;

        /* the origin payload itself is still to come */
        *complete = FALSE;
    }

  fn_fail:
    /* NOTE: fn_fail doubles as the normal exit path here */
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GACCUMMETADATARECVCOMPLETE);
    return mpi_errno;
}
757 
758 
759 
/* Completion handler run on the target once the flattened description of a
 * derived (non-contiguous) target datatype for a GET operation has been
 * fully received.  It rebuilds the datatype object, creates and issues the
 * GET_RESP send request that carries the requested window data back to the
 * origin, and then completes the receive request.
 *
 *   vc       - virtual connection to the origin process
 *   rreq     - receive request holding the GET metadata (flattened type,
 *              target buffer/count, window handle, packet flags)
 *   complete - [out] set to TRUE once the receive request is completed
 *
 * Returns MPI_SUCCESS, or an MPI error code on allocation or send failure. */
int MPIDI_CH3_ReqHandler_GetDerivedDTRecvComplete(MPIDI_VC_t * vc,
                                                  MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Datatype*new_dtp = NULL;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
    MPIR_Request *sreq;
    MPIR_Win *win_ptr;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);

    MPIR_Win_get_ptr(rreq->dev.target_win_handle, win_ptr);

    /* An immediate-response GET never ships a derived datatype, so it must
     * not reach this handler. */
    MPIR_Assert(!(rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP));

    /* get data from extended header */
    new_dtp = (MPIR_Datatype *) MPIR_Handle_obj_alloc(&MPIR_Datatype_mem);
    if (!new_dtp) {
        MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem", "**nomem %s",
                             "MPIR_Datatype_mem");
    }
    /* Note: handle is filled in by MPIR_Handle_obj_alloc() */
    MPIR_Object_set_ref(new_dtp, 1);
    /* Reconstruct the full datatype object from the flattened representation
     * that arrived in the extended packet header. */
    MPIR_Typerep_unflatten(new_dtp, rreq->dev.flattened_type);

    /* create request for sending data */
    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(sreq == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    sreq->kind = MPIR_REQUEST_KIND__SEND;
    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_RESP);
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.user_buf = rreq->dev.user_buf;
    sreq->dev.user_count = rreq->dev.user_count;
    sreq->dev.datatype = new_dtp->handle;
    /* sreq takes ownership of the reference on the rebuilt datatype; it is
     * released when the send completes. */
    sreq->dev.datatype_ptr = new_dtp;
    sreq->dev.target_win_handle = rreq->dev.target_win_handle;
    sreq->dev.pkt_flags = rreq->dev.pkt_flags;

    MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
    get_resp_pkt->request_handle = rreq->dev.request_handle;
    get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    get_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* Piggyback lock-granted / ack flags on the response when the GET was
     * itself piggybacked on a lock, flush, or unlock request. */
    if (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (rreq->dev.pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    sreq->dev.msg_offset = 0;
    sreq->dev.msgsize = new_dtp->size * sreq->dev.user_count;

    /* Because this is in a packet handler, it is already within a critical section */
    /* MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex); */
    mpi_errno = vc->sendNoncontig_fn(vc, sreq, get_resp_pkt, sizeof(*get_resp_pkt),
                                     NULL, 0);
    /* MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex); */
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_Request_free(sreq);
        sreq = NULL;
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }
    /* --END ERROR HANDLING-- */

    /* mark receive data transfer as complete and decrement CC in receive
     * request */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_GETDERIVEDDTRECVCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
842 
843 
/* Completion handler for a receive whose data landed in an unexpected-eager
 * buffer.  If the matching receive has already been posted, unpack the
 * buffered data into the user buffer and free the temporary storage;
 * otherwise MPID_{Recv/Irecv}() is responsible for the unpack.  In either
 * case the request's data transfer is marked complete. */
int MPIDI_CH3_ReqHandler_UnpackUEBufComplete(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                             MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    int pending;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);

    MPIDI_Request_decr_pending(rreq);
    MPIDI_Request_check_pending(rreq, &pending);

    /* No pending receive means the receive is already posted: move the
     * buffered data into place now and release the temporary buffer. */
    if (!pending && rreq->dev.recv_data_sz > 0) {
        MPIDI_CH3U_Request_unpack_uebuf(rreq);
        MPL_free(rreq->dev.tmpbuf);
    }

    /* mark data transfer as complete and decrement CC */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKUEBUFCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
878 
/* Completion handler for receives staged through a segment-receive buffer
 * (SRBuf).  Unpacks the staged data into the user buffer and then dispatches
 * to the RMA-specific completion handler matching the request type; ordinary
 * receives are simply marked complete. */
int MPIDI_CH3_ReqHandler_UnpackSRBufComplete(MPIDI_VC_t * vc, MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);

    MPIDI_CH3U_Request_unpack_srbuf(rreq);

    switch (MPIDI_Request_get_type(rreq)) {
        case MPIDI_REQUEST_TYPE_PUT_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_PutRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_ACCUM_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_AccumRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_GET_ACCUM_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_GaccumRecvComplete(vc, rreq, complete);
            break;
        case MPIDI_REQUEST_TYPE_FOP_RECV:
            mpi_errno = MPIDI_CH3_ReqHandler_FOPRecvComplete(vc, rreq, complete);
            break;
        default:
            /* ordinary receive: mark data transfer as complete and decrement CC */
            mpi_errno = MPID_Request_complete(rreq);
            MPIR_ERR_CHECK(mpi_errno);
            *complete = TRUE;
            break;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
914 
/* Drains the current segment-receive buffer into the user buffer and then
 * rebuilds the request's IOV so that the channel can continue receiving the
 * rest of the message.  *complete is left FALSE because more data follows. */
int MPIDI_CH3_ReqHandler_UnpackSRBufReloadIOV(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                              MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);

    /* First flush the staged data, then prepare the next set of receive
     * vectors for the remainder of the incoming message. */
    MPIDI_CH3U_Request_unpack_srbuf(rreq);

    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }

    /* more data is still expected for this request */
    *complete = FALSE;

  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_UNPACKSRBUFRELOADIOV);
    return mpi_errno;
}
933 
/* Refills the request's receive IOV so the channel can continue receiving
 * the remainder of the incoming message.  *complete is left FALSE because
 * the transfer is still in progress. */
int MPIDI_CH3_ReqHandler_ReloadIOV(MPIDI_VC_t * vc ATTRIBUTE((unused)),
                                   MPIR_Request * rreq, int *complete)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);

    /* A failure to rebuild the IOV is unrecoverable for this request. */
    mpi_errno = MPIDI_CH3U_Request_load_recv_iov(rreq);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETFATALANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|loadrecviov");
    }

    *complete = FALSE;

  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_RELOADIOV);
    return mpi_errno;
}
951 
952 /* ----------------------------------------------------------------------- */
953 /* ----------------------------------------------------------------------- */
954 
/* Applies a PUT that was queued while the target lock was held by someone
 * else.  The origin data is either embedded in the packet header (IMMED
 * variant) or held in the lock entry's temporary buffer; either way it is
 * copied into the window and the op is finished on the target. */
static inline int perform_put_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_put_t *put_pkt = &((target_lock_entry->pkt).put);
    int mpi_errno = MPI_SUCCESS;
    void *src_buf;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(put_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Locate the origin data: packet header for the immediate variant,
     * otherwise the buffer attached to the queued lock entry. */
    if (put_pkt->type == MPIDI_CH3_PKT_PUT_IMMED) {
        src_buf = (void *) &put_pkt->info.data;
    }
    else {
        MPIR_Assert(put_pkt->type == MPIDI_CH3_PKT_PUT);
        src_buf = target_lock_entry->data;
    }

    mpi_errno = MPIR_Localcopy(src_buf, put_pkt->count, put_pkt->datatype,
                               put_pkt->addr, put_pkt->count, put_pkt->datatype);
    MPIR_ERR_CHECK(mpi_errno);

    /* do final action */
    mpi_errno =
        finish_op_on_target(win_ptr, target_lock_entry->vc, FALSE /* has no response data */ ,
                            put_pkt->pkt_flags, put_pkt->source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
992 
/* Applies a GET that was queued while the target lock was unavailable:
 * builds a GET_RESP send request and ships the requested window data back
 * to the origin.  Three send paths exist: data embedded in the response
 * header (IMMED), a two-element IOV for contiguous data, or the channel's
 * noncontiguous send path for derived layouts.
 * Returns MPI_SUCCESS or an MPI error code on allocation/send failure. */
static inline int perform_get_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_resp_t *get_resp_pkt = &upkt.get_resp;
    MPIDI_CH3_Pkt_get_t *get_pkt = &((target_lock_entry->pkt).get);
    MPIR_Request *sreq = NULL;
    MPI_Aint type_size;
    size_t len;
    int iovcnt;
    struct iovec iov[MPL_IOV_LIMIT];
    int is_contig;
    int mpi_errno = MPI_SUCCESS;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (sreq == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }
    MPIR_Object_set_ref(sreq, 1);

    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_RESP);
    sreq->kind = MPIR_REQUEST_KIND__SEND;
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GetSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GetSendComplete;

    sreq->dev.target_win_handle = win_ptr->handle;
    sreq->dev.pkt_flags = get_pkt->pkt_flags;

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP_IMMED);
    }
    else {
        MPIDI_Pkt_init(get_resp_pkt, MPIDI_CH3_PKT_GET_RESP);
    }
    get_resp_pkt->request_handle = get_pkt->request_handle;
    get_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* Piggyback lock-granted / ack flags when the GET rode along with a
     * lock, flush, or unlock request. */
    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
    get_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

    /* length of target data */
    MPIR_Datatype_get_size_macro(get_pkt->datatype, type_size);
    MPIR_Assign_trunc(len, get_pkt->count * type_size, size_t);

    MPIR_Datatype_is_contig(get_pkt->datatype, &is_contig);

    if (get_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_IMMED_RESP) {
        /* Response data is small enough to travel inside the packet header. */
        void *src = (void *) (get_pkt->addr), *dest = (void *) &get_resp_pkt->info.data;
        mpi_errno = immed_copy(src, dest, len);
        MPIR_ERR_CHECK(mpi_errno);

        /* All origin data is in packet header, issue the header. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);
        iovcnt = 1;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
    }
    else if (is_contig) {
        /* Contiguous data can be sent directly from the window with a
         * two-element IOV (header + payload), no staging copy needed. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);
        iov[1].iov_base = (void *) (get_pkt->addr);
        iov[1].iov_len = get_pkt->count * type_size;
        iovcnt = 2;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }
    }
    else {
        /* Noncontiguous data: hand the send request (buffer/count/datatype)
         * to the channel's noncontig send path to pack and issue. */
        iov[0].iov_base = (void *) get_resp_pkt;
        iov[0].iov_len = sizeof(*get_resp_pkt);

        sreq->dev.user_buf = get_pkt->addr;
        sreq->dev.user_count = get_pkt->count;
        sreq->dev.datatype = get_pkt->datatype;
        sreq->dev.msg_offset = 0;
        sreq->dev.msgsize = get_pkt->count * type_size;

        mpi_errno = target_lock_entry->vc->sendNoncontig_fn(target_lock_entry->vc, sreq,
                                                            iov[0].iov_base, iov[0].iov_len,
                                                            NULL, 0);
        MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1103 
1104 
/* Applies an ACCUMULATE that was queued while the target lock was held by
 * another origin.  The origin data comes from the packet header (IMMED
 * variant) or from the lock entry's buffer (at most one stream unit, since
 * a lock-piggybacked op always uses the first stream unit).  The accumulate
 * is performed under the shared-memory window mutex when SHM is in use.
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int perform_acc_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_accum_t *acc_pkt = &((target_lock_entry->pkt).accum);
    int mpi_errno = MPI_SUCCESS;

    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(acc_pkt->datatype));

    /* The accumulate must be atomic w.r.t. other processes on a shared
     * window, so take the per-window SHM mutex around the operation. */
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    if (acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
        /* All data fits in packet header */
        mpi_errno = do_accumulate_op((void *) &acc_pkt->info.data, acc_pkt->count,
                                     acc_pkt->datatype, acc_pkt->addr, acc_pkt->count,
                                     acc_pkt->datatype, 0, acc_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }
    else {
        MPIR_Assert(acc_pkt->type == MPIDI_CH3_PKT_ACCUMULATE);
        MPI_Aint type_size, type_extent;
        MPI_Aint total_len, recv_count;

        MPIR_Datatype_get_size_macro(acc_pkt->datatype, type_size);
        MPIR_Datatype_get_extent_macro(acc_pkt->datatype, type_extent);

        /* Only the part of the message that fits in one SRBuf-sized stream
         * unit has been received; clamp the element count accordingly. */
        total_len = type_size * acc_pkt->count;
        recv_count = MPL_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
        MPIR_Assert(recv_count > 0);

        /* Note: here stream_offset is 0 because when piggybacking LOCK, we must use
         * the first stream unit. */
        MPIR_Assert(recv_count == (int) recv_count);
        mpi_errno = do_accumulate_op(target_lock_entry->data, (int) recv_count, acc_pkt->datatype,
                                     acc_pkt->addr, acc_pkt->count, acc_pkt->datatype,
                                     0, acc_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);
    }

    /* Release the mutex before checking mpi_errno so the lock is dropped on
     * both the success and failure paths. */
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    mpi_errno =
        finish_op_on_target(win_ptr, target_lock_entry->vc, FALSE /* has no response data */ ,
                            acc_pkt->pkt_flags, acc_pkt->source_win_handle);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1162 
1163 
/* Applies a GET_ACCUMULATE that was queued while the target lock was held by
 * another origin.  The sequence 'fetch old data + apply accumulate' is done
 * atomically (under the SHM window mutex when shared memory is in use), then
 * the fetched data is sent back to the origin either inside the response
 * packet header (IMMED variant) or as a separate payload staged in a
 * temporary buffer owned by the send request.
 *
 * Fix over the previous version: the MPL_malloc() of the staging buffer was
 * unchecked; on allocation failure the subsequent copy would dereference
 * NULL.  The result is now validated and reported as **nomem.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static inline int perform_get_acc_in_lock_queue(MPIR_Win * win_ptr,
                                                MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_get_accum_resp_t *get_accum_resp_pkt = &upkt.get_accum_resp;
    MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &((target_lock_entry->pkt).get_accum);
    MPIR_Request *sreq = NULL;
    MPI_Aint type_size;
    size_t len;
    int iovcnt;
    struct iovec iov[MPL_IOV_LIMIT];
    int is_contig;
    int mpi_errno = MPI_SUCCESS;
    MPI_Aint type_extent;
    MPI_Aint total_len, recv_count;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    sreq = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (sreq == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }
    MPIR_Object_set_ref(sreq, 1);

    MPIDI_Request_set_type(sreq, MPIDI_REQUEST_TYPE_GET_ACCUM_RESP);
    sreq->kind = MPIR_REQUEST_KIND__SEND;
    sreq->dev.OnDataAvail = MPIDI_CH3_ReqHandler_GaccumSendComplete;
    sreq->dev.OnFinal = MPIDI_CH3_ReqHandler_GaccumSendComplete;

    sreq->dev.target_win_handle = win_ptr->handle;
    sreq->dev.pkt_flags = get_accum_pkt->pkt_flags;

    /* Copy data into a temporary buffer */
    MPIR_Datatype_get_size_macro(get_accum_pkt->datatype, type_size);

    /* length of target data */
    MPIR_Assign_trunc(len, get_accum_pkt->count * type_size, size_t);

    if (get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
        MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP_IMMED);
        get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
        get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
        /* Piggyback lock-granted / ack flags on the response. */
        if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
            get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
            get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
        if ((get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
            (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
            get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
        get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;


        /* NOTE: copy 'data + ACC' needs to be atomic */

        if (win_ptr->shm_allocated == TRUE) {
            MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
        }

        /* Copy data from target window to response packet header */

        void *src = (void *) (get_accum_pkt->addr), *dest =
            (void *) &(get_accum_resp_pkt->info.data);
        mpi_errno = immed_copy(src, dest, len);
        if (mpi_errno != MPI_SUCCESS) {
            /* drop the mutex before unwinding */
            if (win_ptr->shm_allocated == TRUE) {
                MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
            }
            MPIR_ERR_POP(mpi_errno);
        }

        /* Perform ACCUMULATE OP */

        /* All data fits in packet header */
        mpi_errno =
            do_accumulate_op((void *) &get_accum_pkt->info.data, get_accum_pkt->count,
                             get_accum_pkt->datatype, get_accum_pkt->addr, get_accum_pkt->count,
                             get_accum_pkt->datatype, 0, get_accum_pkt->op,
                             MPIDI_RMA_ACC_SRCBUF_DEFAULT);

        if (win_ptr->shm_allocated == TRUE) {
            MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
        }

        MPIR_ERR_CHECK(mpi_errno);

        /* here we increment the Active Target counter to guarantee the GET-like
         * operation are completed when counter reaches zero. */
        win_ptr->at_completion_counter++;

        /* All origin data is in packet header, issue the header. */
        iov[0].iov_base = (void *) get_accum_resp_pkt;
        iov[0].iov_len = sizeof(*get_accum_resp_pkt);
        iovcnt = 1;

        mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
        if (mpi_errno != MPI_SUCCESS) {
            MPIR_Request_free(sreq);
            MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
        }

        goto fn_exit;
    }

    MPIR_Assert(get_accum_pkt->type == MPIDI_CH3_PKT_GET_ACCUM);

    MPIR_Datatype_get_extent_macro(get_accum_pkt->datatype, type_extent);

    /* Only one SRBuf-sized stream unit has been received for a queued op;
     * clamp the element count accordingly. */
    total_len = type_size * get_accum_pkt->count;
    recv_count = MPL_MIN((total_len / type_size), (MPIDI_CH3U_SRBuf_size / type_extent));
    MPIR_Assert(recv_count > 0);

    sreq->dev.user_buf = (void *) MPL_malloc(recv_count * type_size, MPL_MEM_BUFFER);
    /* --BEGIN ERROR HANDLING-- */
    if (sreq->dev.user_buf == NULL) {
        /* previously unchecked: a failed allocation would be dereferenced in
         * the copy below */
        MPIR_Request_free(sreq);
        sreq = NULL;
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomem");
    }
    /* --END ERROR HANDLING-- */

    MPIR_Datatype_is_contig(get_accum_pkt->datatype, &is_contig);

    /* NOTE: 'copy data + ACC' needs to be atomic */

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    /* Copy data from target window to temporary buffer */

    /* Note: here stream_offset is 0 because when piggybacking LOCK, we must use
     * the first stream unit. */

    if (is_contig) {
        MPIR_Memcpy(sreq->dev.user_buf, get_accum_pkt->addr, recv_count * type_size);
    }
    else {
        MPI_Aint actual_pack_bytes;
        MPIR_Typerep_pack(get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
                       0, sreq->dev.user_buf, type_size * recv_count, &actual_pack_bytes);
        MPIR_Assert(actual_pack_bytes == type_size * recv_count);
    }

    /* Perform ACCUMULATE OP */

    MPIR_Assert(recv_count == (int) recv_count);
    mpi_errno = do_accumulate_op(target_lock_entry->data, (int) recv_count, get_accum_pkt->datatype,
                                 get_accum_pkt->addr, get_accum_pkt->count, get_accum_pkt->datatype,
                                 0, get_accum_pkt->op, MPIDI_RMA_ACC_SRCBUF_DEFAULT);

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    MPIR_ERR_CHECK(mpi_errno);

    /* here we increment the Active Target counter to guarantee the GET-like
     * operation are completed when counter reaches zero. */
    win_ptr->at_completion_counter++;

    MPIDI_Pkt_init(get_accum_resp_pkt, MPIDI_CH3_PKT_GET_ACCUM_RESP);
    get_accum_resp_pkt->request_handle = get_accum_pkt->request_handle;
    get_accum_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* Piggyback lock-granted / ack flags on the response. */
    if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    if ((get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        get_accum_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
    get_accum_resp_pkt->target_rank = win_ptr->comm_ptr->rank;

    /* header + staged fetch data */
    iov[0].iov_base = (void *) get_accum_resp_pkt;
    iov[0].iov_len = sizeof(*get_accum_resp_pkt);
    iov[1].iov_base = (void *) ((char *) sreq->dev.user_buf);
    iov[1].iov_len = recv_count * type_size;
    iovcnt = 2;

    mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, sreq, iov, iovcnt);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_Request_free(sreq);
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1348 
1349 
perform_fop_in_lock_queue(MPIR_Win * win_ptr,MPIDI_RMA_Target_lock_entry_t * target_lock_entry)1350 static inline int perform_fop_in_lock_queue(MPIR_Win * win_ptr,
1351                                             MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
1352 {
1353     MPIDI_CH3_Pkt_t upkt;
1354     MPIDI_CH3_Pkt_fop_resp_t *fop_resp_pkt = &upkt.fop_resp;
1355     MPIDI_CH3_Pkt_fop_t *fop_pkt = &((target_lock_entry->pkt).fop);
1356     MPIR_Request *resp_req = NULL;
1357     MPI_Aint type_size;
1358     struct iovec iov[MPL_IOV_LIMIT];
1359     int iovcnt;
1360     int is_contig;
1361     int mpi_errno = MPI_SUCCESS;
1362 
1363     /* Piggyback candidate should have basic datatype for target datatype. */
1364     MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(fop_pkt->datatype));
1365 
1366     /* Make sure that all data is received for this op. */
1367     MPIR_Assert(target_lock_entry->all_data_recved == 1);
1368 
1369     /* FIXME: this function is same with PktHandler_FOP(), should
1370      * do code refactoring on both of them. */
1371 
1372     MPIR_Datatype_get_size_macro(fop_pkt->datatype, type_size);
1373 
1374     MPIR_Datatype_is_contig(fop_pkt->datatype, &is_contig);
1375 
1376     if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
1377         MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP_IMMED);
1378     }
1379     else {
1380         MPIDI_Pkt_init(fop_resp_pkt, MPIDI_CH3_PKT_FOP_RESP);
1381     }
1382 
1383     fop_resp_pkt->request_handle = fop_pkt->request_handle;
1384     fop_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
1385     fop_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
1386     if (fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
1387         fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
1388         fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
1389     if ((fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
1390         (fop_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
1391         fop_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
1392 
1393     if (fop_pkt->type == MPIDI_CH3_PKT_FOP) {
1394         resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
1395         if (resp_req == NULL) {
1396             MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
1397         }
1398         MPIR_Object_set_ref(resp_req, 1);
1399 
1400         resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;
1401         resp_req->dev.OnFinal = MPIDI_CH3_ReqHandler_FOPSendComplete;
1402 
1403         resp_req->dev.target_win_handle = win_ptr->handle;
1404         resp_req->dev.pkt_flags = fop_pkt->pkt_flags;
1405 
1406         resp_req->dev.user_buf = (void *) MPL_malloc(type_size, MPL_MEM_BUFFER);
1407 
1408         /* here we increment the Active Target counter to guarantee the GET-like
1409          * operation are completed when counter reaches zero. */
1410         win_ptr->at_completion_counter++;
1411     }
1412 
1413     /* NOTE: 'copy data + ACC' needs to be atomic */
1414 
1415     if (win_ptr->shm_allocated == TRUE) {
1416         MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
1417     }
1418 
1419     /* Copy data from target window to temporary buffer / response packet header */
1420 
1421     if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
1422         /* copy data to resp pkt header */
1423         void *src = fop_pkt->addr, *dest = (void *) &fop_resp_pkt->info.data;
1424         mpi_errno = immed_copy(src, dest, type_size);
1425         if (mpi_errno != MPI_SUCCESS) {
1426             if (win_ptr->shm_allocated == TRUE) {
1427                 MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
1428             }
1429             MPIR_ERR_POP(mpi_errno);
1430         }
1431     }
1432     else if (is_contig) {
1433         MPIR_Memcpy(resp_req->dev.user_buf, fop_pkt->addr, type_size);
1434     }
1435     else {
1436         MPI_Aint actual_pack_bytes;
1437         MPIR_Typerep_pack(fop_pkt->addr, 1, fop_pkt->datatype, 0, resp_req->dev.user_buf,
1438                        type_size, &actual_pack_bytes);
1439         MPIR_Assert(actual_pack_bytes == type_size);
1440     }
1441 
1442     /* Apply the op */
1443     if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
1444         mpi_errno = do_accumulate_op((void *) &fop_pkt->info.data, 1, fop_pkt->datatype,
1445                                      fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op,
1446                                      MPIDI_RMA_ACC_SRCBUF_DEFAULT);
1447     }
1448     else {
1449         mpi_errno = do_accumulate_op(target_lock_entry->data, 1, fop_pkt->datatype,
1450                                      fop_pkt->addr, 1, fop_pkt->datatype, 0, fop_pkt->op,
1451                                      MPIDI_RMA_ACC_SRCBUF_DEFAULT);
1452     }
1453 
1454     if (win_ptr->shm_allocated == TRUE) {
1455         MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
1456     }
1457 
1458     MPIR_ERR_CHECK(mpi_errno);
1459 
1460     if (fop_pkt->type == MPIDI_CH3_PKT_FOP_IMMED) {
1461         /* send back the original data */
1462         MPID_THREAD_CS_ENTER(POBJ, target_lock_entry->vc->pobj_mutex);
1463         mpi_errno =
1464             MPIDI_CH3_iStartMsg(target_lock_entry->vc, fop_resp_pkt, sizeof(*fop_resp_pkt),
1465                                 &resp_req);
1466         MPID_THREAD_CS_EXIT(POBJ, target_lock_entry->vc->pobj_mutex);
1467         MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
1468 
1469         if (resp_req != NULL) {
1470             if (!MPIR_Request_is_complete(resp_req)) {
1471                 /* sending process is not completed, set proper OnDataAvail
1472                  * (it is initialized to NULL by lower layer) */
1473                 resp_req->dev.target_win_handle = fop_pkt->target_win_handle;
1474                 resp_req->dev.pkt_flags = fop_pkt->pkt_flags;
1475                 resp_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_FOPSendComplete;
1476 
1477                 /* here we increment the Active Target counter to guarantee the GET-like
1478                  * operation are completed when counter reaches zero. */
1479                 win_ptr->at_completion_counter++;
1480 
1481                 MPIR_Request_free(resp_req);
1482                 goto fn_exit;
1483             }
1484             else {
1485                 MPIR_Request_free(resp_req);
1486             }
1487         }
1488     }
1489     else {
1490         iov[0].iov_base = (void *) fop_resp_pkt;
1491         iov[0].iov_len = sizeof(*fop_resp_pkt);
1492         iov[1].iov_base = (void *) ((char *) resp_req->dev.user_buf);
1493         iov[1].iov_len = type_size;
1494         iovcnt = 2;
1495 
1496         mpi_errno = MPIDI_CH3_iSendv(target_lock_entry->vc, resp_req, iov, iovcnt);
1497         if (mpi_errno != MPI_SUCCESS) {
1498             MPIR_Request_free(resp_req);
1499             MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
1500         }
1501         goto fn_exit;
1502     }
1503 
1504     /* do final action */
1505     mpi_errno = finish_op_on_target(win_ptr, target_lock_entry->vc, TRUE /* has response data */ ,
1506                                     fop_pkt->pkt_flags, MPI_WIN_NULL);
1507     MPIR_ERR_CHECK(mpi_errno);
1508 
1509   fn_exit:
1510     return mpi_errno;
1511   fn_fail:
1512     goto fn_exit;
1513 }
1514 
1515 
/* Perform a queued compare-and-swap operation on the target window, once the
 * lock piggybacked on (or queued ahead of) this entry has been granted.
 * The old target value is always copied into an immediate (header-embedded)
 * response packet and sent back to the origin.
 *
 * Returns MPI_SUCCESS on success, or an MPI error code. */
static inline int perform_cas_in_lock_queue(MPIR_Win * win_ptr,
                                            MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_cas_resp_t *cas_resp_pkt = &upkt.cas_resp;
    MPIDI_CH3_Pkt_cas_t *cas_pkt = &((target_lock_entry->pkt).cas);
    MPIR_Request *send_req = NULL;
    MPI_Aint len;
    int mpi_errno = MPI_SUCCESS;

    /* Piggyback candidate should have basic datatype for target datatype. */
    MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(cas_pkt->datatype));

    /* Make sure that all data is received for this op. */
    MPIR_Assert(target_lock_entry->all_data_recved == 1);

    MPIDI_Pkt_init(cas_resp_pkt, MPIDI_CH3_PKT_CAS_RESP_IMMED);
    cas_resp_pkt->request_handle = cas_pkt->request_handle;
    cas_resp_pkt->target_rank = win_ptr->comm_ptr->rank;
    cas_resp_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    /* If a lock request was piggybacked on this CAS, tell the origin the
     * lock has now been granted. */
    if (cas_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
        cas_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)
        cas_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;
    /* Echo an ACK back if the origin piggybacked a FLUSH or UNLOCK. */
    if ((cas_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) ||
        (cas_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))
        cas_resp_pkt->pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

    /* Copy old value into the response packet */
    MPIR_Datatype_get_size_macro(cas_pkt->datatype, len);
    MPIR_Assert(len <= sizeof(MPIDI_CH3_CAS_Immed_u));

    /* The read + compare + conditional-replace sequence below must be atomic
     * with respect to other processes in a shared-memory window, so hold the
     * window's interprocess mutex around it. */
    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_LOCK(win_ptr);
    }

    MPIR_Memcpy((void *) &cas_resp_pkt->info.data, cas_pkt->addr, len);

    /* Compare and replace if equal */
    if (MPIR_Compare_equal(&cas_pkt->compare_data, cas_pkt->addr, cas_pkt->datatype)) {
        MPIR_Memcpy(cas_pkt->addr, &cas_pkt->origin_data, len);
    }

    if (win_ptr->shm_allocated == TRUE) {
        MPIDI_CH3I_SHM_MUTEX_UNLOCK(win_ptr);
    }

    /* Send the response packet */
    MPID_THREAD_CS_ENTER(POBJ, target_lock_entry->vc->pobj_mutex);
    mpi_errno =
        MPIDI_CH3_iStartMsg(target_lock_entry->vc, cas_resp_pkt, sizeof(*cas_resp_pkt), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, target_lock_entry->vc->pobj_mutex);

    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    if (send_req != NULL) {
        if (!MPIR_Request_is_complete(send_req)) {
            /* sending process is not completed, set proper OnDataAvail
             * (it is initialized to NULL by lower layer) */
            send_req->dev.target_win_handle = cas_pkt->target_win_handle;
            send_req->dev.pkt_flags = cas_pkt->pkt_flags;
            send_req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_CASSendComplete;

            /* here we increment the Active Target counter to guarantee the GET-like
             * operation are completed when counter reaches zero. */
            win_ptr->at_completion_counter++;

            /* Drop our reference; the send-complete handler performs the
             * final action for this op. */
            MPIR_Request_free(send_req);
            goto fn_exit;
        }
        else
            MPIR_Request_free(send_req);
    }

    /* do final action */
    mpi_errno = finish_op_on_target(win_ptr, target_lock_entry->vc, TRUE /* has response data */ ,
                                    cas_pkt->pkt_flags, MPI_WIN_NULL);
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1599 
1600 
/* Dispatch a dequeued lock-queue entry: either acknowledge a bare LOCK
 * request, or hand a LOCK+OP packet to the matching per-operation helper.
 *
 * Returns MPI_SUCCESS on success, or an MPI error code (an unrecognized
 * packet type is reported as **invalidpkt). */
static inline int perform_op_in_lock_queue(MPIR_Win * win_ptr,
                                           MPIDI_RMA_Target_lock_entry_t * target_lock_entry)
{
    int mpi_errno = MPI_SUCCESS;
    int pkt_type = target_lock_entry->pkt.type;

    if (pkt_type == MPIDI_CH3_PKT_LOCK) {
        /* A bare LOCK request: nothing to perform, only the grant must be
         * acknowledged to the origin. */
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &(target_lock_entry->pkt.lock);
        MPIDI_VC_t *my_vc = NULL;

        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &my_vc);

        if (target_lock_entry->vc == my_vc) {
            /* The origin is this process itself: deliver the ack locally
             * rather than sending a packet. */
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        }
        else {
            mpi_errno = MPIDI_CH3I_Send_lock_ack_pkt(target_lock_entry->vc, win_ptr,
                                                     MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED,
                                                     lock_pkt->source_win_handle,
                                                     lock_pkt->request_handle);
        }
        MPIR_ERR_CHECK(mpi_errno);
    }
    /* LOCK+OP packet: route to the helper for the piggybacked operation. */
    else if (pkt_type == MPIDI_CH3_PKT_PUT || pkt_type == MPIDI_CH3_PKT_PUT_IMMED) {
        mpi_errno = perform_put_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_type == MPIDI_CH3_PKT_GET) {
        mpi_errno = perform_get_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_type == MPIDI_CH3_PKT_ACCUMULATE || pkt_type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
        mpi_errno = perform_acc_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_type == MPIDI_CH3_PKT_GET_ACCUM || pkt_type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
        mpi_errno = perform_get_acc_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_type == MPIDI_CH3_PKT_FOP || pkt_type == MPIDI_CH3_PKT_FOP_IMMED) {
        mpi_errno = perform_fop_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_type == MPIDI_CH3_PKT_CAS_IMMED) {
        mpi_errno = perform_cas_in_lock_queue(win_ptr, target_lock_entry);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else {
        MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER,
                             "**invalidpkt", "**invalidpkt %d", target_lock_entry->pkt.type);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1670 
1671 
/* Reentrancy state for MPIDI_CH3I_Release_lock(): entered_flag is nonzero
 * while a call is traversing the lock queue, and entered_count records how
 * many times the function was re-entered meanwhile (such re-entries return
 * immediately; the outer call re-runs the queue traversal for them). */
static int entered_flag = 0;
static int entered_count = 0;
1674 
/* Release the current lock on the window and grant the next lock in the
 * queue if any.  When the last shared reference (or an exclusive lock) is
 * released, walk the target lock queue and grant as many pending requests
 * as compatibility allows, performing each queued operation in turn. */
int MPIDI_CH3I_Release_lock(MPIR_Win * win_ptr)
{
    MPIDI_RMA_Target_lock_entry_t *target_lock_entry, *target_lock_entry_next;
    int requested_lock, mpi_errno = MPI_SUCCESS, temp_entered_count;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);

    if (win_ptr->current_lock_type == MPI_LOCK_SHARED) {
        /* decr ref cnt */
        /* FIXME: MT: Must be done atomically */
        win_ptr->shared_lock_ref_cnt--;
    }

    /* If shared lock ref count is 0 (which is also true if the lock is an
     * exclusive lock), release the lock. */
    if (win_ptr->shared_lock_ref_cnt == 0) {

        /* This function needs to be reentrant even in the single-threaded case
         * because when going through the lock queue, pkt_handler() in
         * perform_op_in_lock_queue() may again call release_lock(). To handle
         * this possibility, we use an entered_flag.
         * If the flag is not 0, we simply increment the entered_count and return.
         * The loop through the lock queue is repeated if the entered_count has
         * changed while we are in the loop.
         */
        if (entered_flag != 0) {
            entered_count++;    /* Count how many times we re-enter */
            goto fn_exit;
        }

        entered_flag = 1;       /* Mark that we are now entering release_lock() */
        temp_entered_count = entered_count;

        do {
            /* A re-entry occurred during the previous pass; consume one
             * pending re-entry notification and run the queue again. */
            if (temp_entered_count != entered_count)
                temp_entered_count++;

            /* FIXME: MT: The setting of the lock type must be done atomically */
            win_ptr->current_lock_type = MPID_LOCK_NONE;

            /* If there is a lock queue, try to satisfy as many lock requests as
             * possible. If the first one is a shared lock, grant it and grant all
             * other shared locks. If the first one is an exclusive lock, grant
             * only that one. */

            /* FIXME: MT: All queue accesses need to be made atomic */
            target_lock_entry = (MPIDI_RMA_Target_lock_entry_t *) win_ptr->target_lock_queue_head;
            while (target_lock_entry) {
                /* Save the next pointer first: the current entry may be
                 * dequeued and freed below. */
                target_lock_entry_next = target_lock_entry->next;

                /* Only entries whose data has fully arrived are eligible;
                 * partial entries are granted later by the recv-complete
                 * handler or a subsequent release. */
                if (target_lock_entry->all_data_recved) {
                    int pkt_flags;
                    MPIDI_CH3_PKT_RMA_GET_FLAGS(target_lock_entry->pkt, pkt_flags, mpi_errno);
                    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED)
                        requested_lock = MPI_LOCK_SHARED;
                    else {
                        MPIR_Assert(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
                        requested_lock = MPI_LOCK_EXCLUSIVE;
                    }
                    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, requested_lock) == 1) {
                        /* dequeue entry from lock queue */
                        DL_DELETE(win_ptr->target_lock_queue_head, target_lock_entry);

                        /* perform this OP */
                        mpi_errno = perform_op_in_lock_queue(win_ptr, target_lock_entry);
                        MPIR_ERR_CHECK(mpi_errno);

                        /* free this entry */
                        mpi_errno =
                            MPIDI_CH3I_Win_target_lock_entry_free(win_ptr, target_lock_entry);
                        MPIR_ERR_CHECK(mpi_errno);

                        /* if the granted lock is exclusive,
                         * no need to continue */
                        if (requested_lock == MPI_LOCK_EXCLUSIVE)
                            break;
                    }
                }
                target_lock_entry = target_lock_entry_next;
            }
        } while (temp_entered_count != entered_count);

        /* All pending re-entries serviced; clear the reentrancy state. */
        entered_count = entered_flag = 0;
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_RELEASE_LOCK);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1769 
1770 
1771 
/* OnDataAvail handler invoked when all data for a lock-queue entry (a LOCK
 * request with a piggybacked operation) has been received.  If the entry's
 * op data was kept (not dropped), try to acquire the window lock now and, on
 * success, dequeue and perform the operation immediately; otherwise the entry
 * stays queued for MPIDI_CH3I_Release_lock() to grant later.  Finally the
 * receive request itself is completed and *complete is set to TRUE. */
int MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(MPIDI_VC_t * vc,
                                                     MPIR_Request * rreq, int *complete)
{
    int requested_lock;
    MPI_Win target_win_handle;
    MPIR_Win *win_ptr = NULL;
    int pkt_flags;
    MPIDI_RMA_Target_lock_entry_t *target_lock_queue_entry = rreq->dev.target_lock_queue_entry;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);

    /* This handler is triggered when we received all data of a lock queue
     * entry */

    /* Note that if we decided to drop op data, here we just need to complete this
     * request; otherwise we try to get the lock again in this handler. */
    if (rreq->dev.target_lock_queue_entry != NULL) {

        /* Mark all data received in lock queue entry */
        target_lock_queue_entry->all_data_recved = 1;

        /* try to acquire the lock here */
        MPIDI_CH3_PKT_RMA_GET_FLAGS(target_lock_queue_entry->pkt, pkt_flags, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE(target_lock_queue_entry->pkt, target_win_handle,
                                                mpi_errno);
        MPIR_Win_get_ptr(target_win_handle, win_ptr);

        /* Streamed ACC/GET_ACC data arrives with an extended packet header
         * in front of the payload; strip it off before the op is applied. */
        if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM &&
            (rreq->dev.target_lock_queue_entry)->data != NULL) {

            MPIR_Assert(target_lock_queue_entry->pkt.type == MPIDI_CH3_PKT_ACCUMULATE ||
                        target_lock_queue_entry->pkt.type == MPIDI_CH3_PKT_GET_ACCUM);

            int ext_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_stream_t);

            /* here we drop the stream_offset received, because the stream unit that piggybacked with
             * LOCK must be the first stream unit, with stream_offset equals to 0. */
            /* memmove (not memcpy): source and destination overlap within the
             * same buffer. */
            rreq->dev.recv_data_sz -= ext_hdr_sz;
            memmove((rreq->dev.target_lock_queue_entry)->data,
                    (void *) ((char *) ((rreq->dev.target_lock_queue_entry)->data) + ext_hdr_sz),
                    rreq->dev.recv_data_sz);
        }

        if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
            requested_lock = MPI_LOCK_SHARED;
        }
        else {
            MPIR_Assert(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
            requested_lock = MPI_LOCK_EXCLUSIVE;
        }

        if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, requested_lock) == 1) {
            /* dequeue entry from lock queue */
            DL_DELETE(win_ptr->target_lock_queue_head, target_lock_queue_entry);

            /* perform this OP */
            mpi_errno = perform_op_in_lock_queue(win_ptr, target_lock_queue_entry);
            MPIR_ERR_CHECK(mpi_errno);

            /* free this entry */
            mpi_errno = MPIDI_CH3I_Win_target_lock_entry_free(win_ptr, target_lock_queue_entry);
            MPIR_ERR_CHECK(mpi_errno);
        }
        /* If try acquiring lock failed, just leave the lock queue entry in the queue with
         * all_data_recved marked as 1, release_lock() function will traverse the queue
         * and find entry with all_data_recved being 1 to grant the lock. */
    }

    /* mark receive data transfer as complete and decrement CC in receive
     * request */
    mpi_errno = MPID_Request_complete(rreq);
    MPIR_ERR_CHECK(mpi_errno);

    *complete = TRUE;

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3_REQHANDLER_PIGGYBACKLOCKOPRECVCOMPLETE);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1855