1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #ifndef MPIDRMA_H_INCLUDED
7 #define MPIDRMA_H_INCLUDED
8 
9 #include "mpid_rma_types.h"
10 #include "mpid_rma_oplist.h"
11 #include "mpid_rma_shm.h"
12 #include "mpid_rma_issue.h"
13 #include "mpid_rma_lockqueue.h"
14 
/*
 * send_lock_msg - start a stand-alone LOCK packet towards a target.
 *
 * dest      : rank of the target in win_ptr->comm_ptr.
 * lock_type : MPI_LOCK_SHARED or MPI_LOCK_EXCLUSIVE; any other value trips
 *             an assertion.
 * win_ptr   : window on which the lock is requested.
 *
 * Returns MPI_SUCCESS, or an MPI error code when the packet could not be
 * started.
 */
static inline int send_lock_msg(int dest, int lock_type, MPIR_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    int lock_flag;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_t *pkt = &upkt.lock;
    MPIR_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_LOCK_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_LOCK_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    /* Translate the MPI lock type into the matching packet flag. */
    if (lock_type == MPI_LOCK_SHARED) {
        lock_flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
    }
    else {
        MPIR_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
        lock_flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
    }

    /* Fill in the packet header; a pure lock request carries no request handle. */
    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_LOCK);
    pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
    pkt->source_win_handle = win_ptr->handle;
    pkt->request_handle = MPI_REQUEST_NULL;
    pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
    pkt->pkt_flags |= lock_flag;

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_LOCK_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
57 
/*
 * send_unlock_msg - start an UNLOCK packet towards a target.
 *
 * dest      : rank of the target in win_ptr->comm_ptr.
 * win_ptr   : window being unlocked.
 * pkt_flags : flags copied verbatim into the packet.
 *
 * Returns MPI_SUCCESS, or an MPI error code when the packet could not be
 * started.
 */
static inline int send_unlock_msg(int dest, MPIR_Win * win_ptr, int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_unlock_t *pkt = &upkt.unlock;
    MPIR_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_UNLOCK_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    /* Build the unlock packet addressed to the target's window. */
    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_UNLOCK);
    pkt->pkt_flags = pkt_flags;
    pkt->source_win_handle = win_ptr->handle;
    pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_UNLOCK_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
96 
97 
/*
 * MPIDI_CH3I_Send_lock_ack_pkt - start a LOCK_ACK packet on the given VC.
 *
 * vc                : virtual connection the acknowledgement is sent on.
 * win_ptr           : local window; its communicator rank is reported as
 *                     target_rank in the packet.
 * pkt_flags         : flags copied verbatim into the packet.
 * source_win_handle : origin window handle to echo back.
 * request_handle    : origin request handle to echo back.
 *
 * At least one of source_win_handle / request_handle must be the
 * corresponding null handle (asserted).
 *
 * Returns the error code produced by starting the message.
 */
static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                               int pkt_flags,
                                               MPI_Win source_win_handle,
                                               MPI_Request request_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_ack_t *ack = &upkt.lock_ack;
    MPIR_Request *sreq = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);

    /* The two handles are mutually exclusive ways of identifying the origin. */
    MPIR_Assert(source_win_handle == MPI_WIN_NULL || request_handle == MPI_REQUEST_NULL);

    /* Populate the acknowledgement packet. */
    MPIDI_Pkt_init(ack, MPIDI_CH3_PKT_LOCK_ACK);
    ack->pkt_flags = pkt_flags;
    ack->target_rank = win_ptr->comm_ptr->rank;
    ack->request_handle = request_handle;
    ack->source_win_handle = source_win_handle;

    MPL_DBG_MSG_FMT(MPIDI_CH3_DBG_OTHER, VERBOSE,
                     (MPL_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, ack->source_win_handle));

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack, sizeof(*ack), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    if (mpi_errno) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

    /* fn_fail doubles as the common exit point of this routine. */
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);

    return mpi_errno;
}
140 
/*
 * MPIDI_CH3I_Send_lock_op_ack_pkt - start a LOCK_OP_ACK packet on the given VC.
 *
 * vc                : virtual connection the acknowledgement is sent on.
 * win_ptr           : local window; its communicator rank is reported as
 *                     target_rank in the packet.
 * pkt_flags         : flags copied verbatim into the packet.
 * source_win_handle : origin window handle to echo back.
 * request_handle    : origin request handle to echo back.
 *
 * At least one of source_win_handle / request_handle must be the
 * corresponding null handle (asserted).
 *
 * Returns the error code produced by starting the message.
 */
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                                  int pkt_flags,
                                                  MPI_Win source_win_handle,
                                                  MPI_Request request_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_lock_op_ack_t *ack = &upkt.lock_op_ack;
    MPIR_Request *sreq = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    /* The two handles are mutually exclusive ways of identifying the origin. */
    MPIR_Assert(source_win_handle == MPI_WIN_NULL || request_handle == MPI_REQUEST_NULL);

    /* Populate the lock-op acknowledgement packet. */
    MPIDI_Pkt_init(ack, MPIDI_CH3_PKT_LOCK_OP_ACK);
    ack->pkt_flags = pkt_flags;
    ack->target_rank = win_ptr->comm_ptr->rank;
    ack->request_handle = request_handle;
    ack->source_win_handle = source_win_handle;

    MPL_DBG_MSG_FMT(MPIDI_CH3_DBG_OTHER, VERBOSE,
                     (MPL_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                      vc, ack->source_win_handle));

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack, sizeof(*ack), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    if (mpi_errno) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

    /* fn_fail doubles as the common exit point of this routine. */
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
}
182 
183 
/*
 * MPIDI_CH3I_Send_ack_pkt - start an ACK packet on the given VC.
 *
 * vc                : virtual connection the acknowledgement is sent on.
 * win_ptr           : local window; its communicator rank is reported as
 *                     target_rank in the packet.
 * source_win_handle : origin window handle to echo back.
 *
 * Returns MPI_SUCCESS, or an MPI error code when the packet could not be
 * started.
 */
static inline int MPIDI_CH3I_Send_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                          MPI_Win source_win_handle)
{
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_ack_t *ack_pkt = &upkt.ack;
    /* Start from NULL, like every other sender in this file, so the check
     * below never reads an indeterminate pointer. */
    MPIR_Request *req = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);

    MPIDI_Pkt_init(ack_pkt, MPIDI_CH3_PKT_ACK);
    ack_pkt->source_win_handle = source_win_handle;
    ack_pkt->target_rank = win_ptr->comm_ptr->rank;

    /* Unlike the other senders, the per-VC critical section is deliberately
     * not taken here: per the original author's note, this runs inside a
     * packet handler, which is already within a critical section. */
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack_pkt, sizeof(*ack_pkt), &req);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (req != NULL) {
        MPIR_Request_free(req);
    }

    /* fn_fail doubles as the common exit point of this routine. */
  fn_fail:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);
    return mpi_errno;
}
215 
216 
/*
 * send_decr_at_cnt_msg - start a DECR_AT_COUNTER packet towards a target.
 *
 * dst       : rank of the target in win_ptr->comm_ptr.
 * win_ptr   : window the packet refers to.
 * pkt_flags : flags copied verbatim into the packet.
 *
 * Returns MPI_SUCCESS, or an MPI error code when the packet could not be
 * started.
 */
static inline int send_decr_at_cnt_msg(int dst, MPIR_Win * win_ptr, int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_decr_at_counter_t *pkt = &upkt.decr_at_cnt;
    MPIR_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);

    /* Build the packet addressed to the target's window. */
    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_DECR_AT_COUNTER);
    pkt->pkt_flags = pkt_flags;
    pkt->source_win_handle = win_ptr->handle;
    pkt->target_win_handle = win_ptr->basic_info_table[dst].win_handle;

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &target_vc);

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
253 
254 
/*
 * send_flush_msg - start a FLUSH packet towards a target.
 *
 * dest    : rank of the target in win_ptr->comm_ptr.
 * win_ptr : window being flushed.
 *
 * Returns MPI_SUCCESS, or an MPI error code when the packet could not be
 * started.
 */
static inline int send_flush_msg(int dest, MPIR_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t upkt;
    MPIDI_CH3_Pkt_flush_t *pkt = &upkt.flush;
    MPIR_Request *sreq = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_FLUSH_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    /* Build the flush packet addressed to the target's window. */
    MPIDI_Pkt_init(pkt, MPIDI_CH3_PKT_FLUSH);
    pkt->source_win_handle = win_ptr->handle;
    pkt->target_win_handle = win_ptr->basic_info_table[dest].win_handle;

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, pkt, sizeof(*pkt), &sreq);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* Drop our reference on the request handed back by iStartMsg, if any. */
    if (sreq != NULL) {
        MPIR_Request_free(sreq);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_FLUSH_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
289 
290 
291 /* enqueue an unsatisfied origin in passive target at target side. */
enqueue_lock_origin(MPIR_Win * win_ptr,MPIDI_VC_t * vc,MPIDI_CH3_Pkt_t * pkt,void * data,intptr_t * buflen,MPIR_Request ** reqp)292 static inline int enqueue_lock_origin(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
293                                       MPIDI_CH3_Pkt_t * pkt, void * data,
294                                       intptr_t * buflen, MPIR_Request ** reqp)
295 {
296     MPIDI_RMA_Target_lock_entry_t *new_ptr = NULL;
297     int flag;
298     MPI_Win source_win_handle;
299     MPI_Request request_handle;
300     int lock_discarded = 0, data_discarded = 0;
301     int mpi_errno = MPI_SUCCESS;
302 
303     (*reqp) = NULL;
304 
305     new_ptr = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, pkt);
306     if (new_ptr != NULL) {
307         MPIDI_RMA_Target_lock_entry_t **head_ptr =
308             (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head));
309         DL_APPEND((*head_ptr), new_ptr);
310         new_ptr->vc = vc;
311     }
312     else {
313         lock_discarded = 1;
314     }
315 
316     if (MPIDI_CH3I_RMA_PKT_IS_IMMED_OP(*pkt) || pkt->type == MPIDI_CH3_PKT_LOCK ||
317         pkt->type == MPIDI_CH3_PKT_GET) {
318         /* return bytes of data processed in this pkt handler */
319         (*buflen) = 0;
320 
321         if (new_ptr != NULL)
322             new_ptr->all_data_recved = 1;
323 
324         goto issue_ack;
325     }
326     else {
327         MPI_Aint type_size = 0;
328         MPI_Aint type_extent;
329         intptr_t recv_data_sz = 0;
330         intptr_t buf_size = 0;
331         MPIR_Request *req = NULL;
332         MPI_Datatype target_dtp;
333         int target_count;
334         int complete = 0;
335         intptr_t data_len;
336         int pkt_flags;
337 
338         /* This is PUT, ACC, GACC, FOP */
339 
340         MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
341         MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
342         MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), pkt_flags, mpi_errno);
343 
344         MPIR_Datatype_get_extent_macro(target_dtp, type_extent);
345         MPIR_Datatype_get_size_macro(target_dtp, type_size);
346 
347         if (pkt->type == MPIDI_CH3_PKT_PUT) {
348             recv_data_sz = type_size * target_count;
349             buf_size = type_extent * target_count;
350         }
351         else {
352             MPI_Aint stream_elem_count;
353             MPI_Aint total_len;
354             MPI_Op op;
355 
356             MPIDI_CH3_PKT_RMA_GET_OP((*pkt), op, mpi_errno);
357             if (op != MPI_NO_OP) {
358                 stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
359                 total_len = type_size * target_count;
360                 recv_data_sz = MPL_MIN(total_len, type_size * stream_elem_count);
361                 buf_size = type_extent * (recv_data_sz / type_size);
362             }
363         }
364 
365         if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
366             MPIR_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE ||
367                         pkt->type == MPIDI_CH3_PKT_GET_ACCUM);
368 
369             recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_stream_t);
370             buf_size += sizeof(MPIDI_CH3_Ext_pkt_stream_t);
371         }
372 
373         if (new_ptr != NULL) {
374             if (win_ptr->current_target_lock_data_bytes + buf_size <
375                 MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES) {
376                 new_ptr->data = MPL_malloc(buf_size, MPL_MEM_BUFFER);
377             }
378 
379             if (new_ptr->data == NULL) {
380                 /* Note that there are two possible reasons to make new_ptr->data to be NULL:
381                  * (1) win_ptr->current_target_lock_data_bytes + buf_size >= MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES;
382                  * (2) MPL_malloc(buf_size) failed.
383                  * In such cases, we cannot allocate memory for lock data, so we give up
384                  * buffering lock data, however, we still buffer lock request.
385                  */
386                 MPIDI_CH3_Pkt_t new_pkt;
387                 MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
388                 MPI_Win target_win_handle;
389 
390                 MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);
391 
392                 if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) {
393                     MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
394                     request_handle = MPI_REQUEST_NULL;
395                 }
396                 else {
397                     source_win_handle = MPI_WIN_NULL;
398                     MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
399                 }
400 
401                 MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
402                 lock_pkt->target_win_handle = target_win_handle;
403                 lock_pkt->source_win_handle = source_win_handle;
404                 lock_pkt->request_handle = request_handle;
405                 lock_pkt->pkt_flags = pkt_flags;
406 
407                 /* replace original pkt with lock pkt */
408                 new_ptr->pkt = new_pkt;
409                 new_ptr->all_data_recved = 1;
410 
411                 data_discarded = 1;
412             }
413             else {
414                 win_ptr->current_target_lock_data_bytes += buf_size;
415                 new_ptr->buf_size = buf_size;
416             }
417         }
418 
419         /* create request to receive upcoming requests */
420         req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
421         MPIR_Object_set_ref(req, 1);
422 
423         /* fill in area in req that will be used in Receive_data_found() */
424         if (lock_discarded || data_discarded) {
425             req->dev.drop_data = TRUE;
426             req->dev.user_buf = NULL;
427             req->dev.user_count = target_count;
428             req->dev.datatype = target_dtp;
429             req->dev.recv_data_sz = recv_data_sz;
430             req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
431             req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
432             req->dev.target_lock_queue_entry = new_ptr;
433 
434             data_len = *buflen;
435             MPIR_Assert(req->dev.recv_data_sz >= 0);
436         }
437         else {
438             req->dev.user_buf = new_ptr->data;
439             req->dev.user_count = target_count;
440             req->dev.datatype = target_dtp;
441             req->dev.recv_data_sz = recv_data_sz;
442             req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
443             req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
444             req->dev.target_lock_queue_entry = new_ptr;
445 
446             data_len = *buflen;
447             MPIR_Assert(req->dev.recv_data_sz >= 0);
448         }
449 
450         mpi_errno = MPIDI_CH3U_Receive_data_found(req, data, &data_len, &complete);
451         MPIR_ERR_CHECK(mpi_errno);
452 
453         /* return bytes of data processed in this pkt handler */
454         (*buflen) = data_len;
455 
456         if (complete) {
457             mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
458             MPIR_ERR_CHECK(mpi_errno);
459             if (complete) {
460                 goto issue_ack;
461             }
462         }
463 
464         (*reqp) = req;
465     }
466 
467   issue_ack:
468     if (pkt->type == MPIDI_CH3_PKT_LOCK) {
469         if (lock_discarded)
470             flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
471         else
472             flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
473 
474         MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
475         MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
476 
477         mpi_errno =
478             MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
479         MPIR_ERR_CHECK(mpi_errno);
480     }
481     else {
482         if (lock_discarded)
483             flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
484         else if (data_discarded)
485             flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
486         else
487             flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;
488 
489         if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) {
490             MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
491             request_handle = MPI_REQUEST_NULL;
492         }
493         else {
494             source_win_handle = MPI_WIN_NULL;
495             MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
496         }
497 
498         mpi_errno =
499             MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
500         MPIR_ERR_CHECK(mpi_errno);
501     }
502 
503   fn_exit:
504     return mpi_errno;
505   fn_fail:
506     goto fn_exit;
507 }
508 
509 
/*
 * handle_lock_ack - origin-side processing of a lock acknowledgement.
 *
 * win_ptr     : window the acknowledgement belongs to.
 * target_rank : rank of the target that answered.
 * pkt_flags   : flags from the acknowledgement; LOCK_GRANTED and
 *               LOCK_DISCARDED are the ones inspected here.
 *
 * For lock-all epochs (LOCK_ALL_CALLED for self/same-node targets on a
 * shared-memory window, or LOCK_ALL_ISSUED) a grant decrements
 * win_ptr->outstanding_locks and a discard re-sends a shared lock request.
 * Otherwise the per-target state is updated and progress is attempted on
 * that target.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int handle_lock_ack(MPIR_Win * win_ptr, int target_rank, int pkt_flags)
{
    MPIDI_RMA_Target_t *t = NULL;
    int mpi_errno = MPI_SUCCESS;

    MPIR_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED);

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
        if (win_ptr->comm_ptr->rank == target_rank ||
            (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id)) {
            if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
                win_ptr->outstanding_locks--;
                MPIR_Assert(win_ptr->outstanding_locks >= 0);
            }
            else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
                /* re-send lock request message. */
                mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
                MPIR_ERR_CHECK(mpi_errno);
            }
            goto fn_exit;
        }
        /* Any other target in this state falls through to the per-target
         * handling below. */
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
            win_ptr->outstanding_locks--;
            MPIR_Assert(win_ptr->outstanding_locks >= 0);
            if (win_ptr->outstanding_locks == 0) {
                /* Every target has granted its lock. */
                win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_GRANTED;

                if (win_ptr->num_targets_with_pending_net_ops) {
                    mpi_errno = MPIDI_CH3I_Win_set_active(win_ptr);
                    MPIR_ERR_CHECK(mpi_errno);
                }
            }
        }
        else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* re-send lock request message. */
            mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
        }
        goto fn_exit;
    }

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    MPIR_ERR_CHECK(mpi_errno);
    MPIR_Assert(t != NULL);

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
        t->access_state = MPIDI_RMA_LOCK_GRANTED;
        if (t->pending_net_ops_list_head) {
            /* Propagate a failure here the same way the lock-all path
             * above does, instead of silently dropping it. */
            mpi_errno = MPIDI_CH3I_Win_set_active(win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED ||
        t->access_state == MPIDI_RMA_LOCK_GRANTED) {
        if (t->pending_net_ops_list_head == NULL) {
            int made_progress ATTRIBUTE((unused)) = 0;
            mpi_errno =
                MPIDI_CH3I_RMA_Make_progress_target(win_ptr, t->target_rank, &made_progress);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    /* A discarded lock puts the target back into the "lock called" state. */
    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
        t->access_state = MPIDI_RMA_LOCK_CALLED;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
586 
587 
/*
 * check_and_set_req_completion - settle the internal requests of an issued
 * RMA operation and remove the operation from the target's pending list.
 *
 * win_ptr      : window the operation belongs to.
 * target       : target whose pending_net_ops_list_head holds rma_op.
 * rma_op       : the operation to settle; it is released through
 *                MPIDI_CH3I_RMA_Ops_free_elem before returning successfully.
 * op_completed : out: TRUE when none of the operation's requests is still
 *                incomplete, FALSE otherwise.
 *
 * Completed requests are freed and cleared. Incomplete ones get a completion
 * callback installed, are counted, and have this routine's reference dropped.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int check_and_set_req_completion(MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target,
                                               MPIDI_RMA_Op_t * rma_op, int *op_completed)
{
    int idx, mpi_errno = MPI_SUCCESS;
    int pending_cnt = 0;
    MPIR_Request **slot = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);

    (*op_completed) = FALSE;

    for (idx = 0; idx < rma_op->reqs_size; idx++) {
        /* A single request lives inline; several live in the multi_reqs array. */
        slot = (rma_op->reqs_size == 1) ? &(rma_op->single_req) : &(rma_op->multi_reqs[idx]);

        if ((*slot) != NULL) {
            if (!MPIR_Request_is_complete((*slot))) {
                /* Still in flight: arrange for the completion callback. */
                (*slot)->dev.request_completed_cb = MPIDI_CH3_Req_handler_rma_op_complete;
                (*slot)->dev.source_win_handle = win_ptr->handle;
                (*slot)->dev.rma_target_ptr = target;

                pending_cnt++;

                if (rma_op->ureq != NULL) {
                    /* The user request completes once every pending internal
                     * request has completed. */
                    MPIR_cc_set(&(rma_op->ureq->cc), pending_cnt);
                    (*slot)->dev.request_handle = rma_op->ureq->handle;
                }

                MPIR_Request_free((*slot));
            }
            else {
                MPIR_Request_free((*slot));
                (*slot) = NULL;
            }
        }
    }

    if (pending_cnt != 0) {
        MPIDI_CH3I_RMA_Active_req_cnt += pending_cnt;
        target->num_pkts_wait_for_local_completion += pending_cnt;
    }
    else {
        if (rma_op->ureq != NULL) {
            mpi_errno = MPID_Request_complete(rma_op->ureq);
            MPIR_ERR_CHECK(mpi_errno);
        }
        (*op_completed) = TRUE;
    }

    MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_net_ops_list_head), rma_op);

    /* If that was the target's last pending operation, update the window's
     * bookkeeping and deactivate the window when no target has any left. */
    if (target->pending_net_ops_list_head == NULL) {
        win_ptr->num_targets_with_pending_net_ops--;
        MPIR_Assert(win_ptr->num_targets_with_pending_net_ops >= 0);
        if (win_ptr->num_targets_with_pending_net_ops == 0) {
            MPIDI_CH3I_Win_set_inactive(win_ptr);
        }
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
657 
658 
/*
 * handle_lock_ack_with_op - origin-side processing of the acknowledgement
 * for an operation that was sent piggybacked with a lock request.
 *
 * win_ptr     : window the acknowledgement belongs to.
 * target_rank : rank of the target that answered.
 * pkt_flags   : flags from the acknowledgement (LOCK_GRANTED,
 *               LOCK_QUEUED_DATA_DISCARDED or LOCK_DISCARDED are handled).
 *
 * On a grant the operation is settled and next_op_to_issue advances, unless
 * it is an ACCUMULATE / GET_ACCUM that still has stream units to issue. On a
 * discard the operation's internal request is released and its packet flags
 * are erased so it can be re-transmitted.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int handle_lock_ack_with_op(MPIR_Win * win_ptr,
                                          int target_rank, int pkt_flags)
{
    MPIDI_RMA_Target_t *target = NULL;
    MPIDI_RMA_Op_t *op = NULL;
    int op_flags = MPIDI_CH3_PKT_FLAG_NONE;
    int op_completed ATTRIBUTE((unused)) = FALSE;
    int mpi_errno = MPI_SUCCESS;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    MPIR_ERR_CHECK(mpi_errno);
    MPIR_Assert(target != NULL);

    /* Here the next_op_to_issue pointer should still point to the OP piggybacked
     * with LOCK */
    op = target->next_op_to_issue;
    MPIR_Assert(op != NULL);

    MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
    MPIR_Assert(op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
                op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {

        if ((op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE || op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM)
            && op->issued_stream_count != ALL_STREAM_UNITS_ISSUED) {
            /* Now we successfully issue out the first stream unit,
             * keep next_op_to_issue still stick to the current op
             * since we need to issue the following stream units. */
            goto fn_exit;
        }

        /* We are done with the current operation, make next_op_to_issue points to the
         * next operation. This must happen before the call below, which
         * releases `op`. */
        target->next_op_to_issue = op->next;

        if (target->next_op_to_issue == NULL) {
            if (((target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) &&
                 (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)) ||
                ((target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) &&
                 (op_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))) {
                /* We are done with ending sync, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;
            }
        }

        /* Propagate failures instead of silently dropping the return code. */
        mpi_errno = check_and_set_req_completion(win_ptr, target, op, &op_completed);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED ||
             pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
        /* We need to re-transmit this operation, so we destroy
         * the internal request and erase all pkt_flags in current
         * operation. */
        if (op->reqs_size == 1) {
            MPIR_Assert(op->single_req != NULL);
            MPIR_Request_free(op->single_req);
            op->single_req = NULL;
            op->reqs_size = 0;
        }
        else if (op->reqs_size > 1) {
            /* Only the first entry is released individually here. */
            MPIR_Assert(op->multi_reqs != NULL && op->multi_reqs[0] != NULL);
            MPIR_Request_free(op->multi_reqs[0]);
            /* free req array in this op */
            MPL_free(op->multi_reqs);
            op->multi_reqs = NULL;
            op->reqs_size = 0;
        }
        MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);

        op->issued_stream_count = 0;
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
736 
737 
/*
 * acquire_local_lock - try to take the window lock on the calling process
 * itself, queueing the request when it cannot be granted right away.
 *
 * win_ptr   : window to lock.
 * lock_type : MPI_LOCK_SHARED or MPI_LOCK_EXCLUSIVE; any other value trips
 *             an assertion on the queueing path.
 *
 * If the lock is obtained, the grant is processed through handle_lock_ack().
 * Otherwise a LOCK packet entry for this process is appended to the window's
 * target lock queue; if no entry can be allocated, a discard is processed
 * through handle_lock_ack() instead.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int acquire_local_lock(MPIR_Win * win_ptr, int lock_type)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);

    MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);

    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) != 1) {
        /* Lock not available right now: queue the lock information. */
        MPIDI_CH3_Pkt_t self_pkt;
        MPIDI_CH3_Pkt_lock_t *lock_pkt = &self_pkt.lock;
        MPIDI_RMA_Target_lock_entry_t *entry = NULL;
        MPIDI_VC_t *self_vc;
        MPIDI_RMA_Target_lock_entry_t **queue_head =
            (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head));
        int lock_flag;

        if (lock_type == MPI_LOCK_SHARED) {
            lock_flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
        }
        else {
            MPIR_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
            lock_flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
        }

        MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
        lock_pkt->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE;
        lock_pkt->pkt_flags |= lock_flag;

        entry = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, &self_pkt);
        if (entry == NULL) {
            /* No queue entry available: treat it like a discarded lock. */
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
            MPIR_ERR_CHECK(mpi_errno);
            goto fn_exit;
        }
        DL_APPEND((*queue_head), entry);
        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &self_vc);
        entry->vc = self_vc;

        /* A bare lock request has no payload to wait for. */
        entry->all_data_recved = 1;
    }
    else {
        /* Lock obtained immediately: process it as a grant from ourselves. */
        mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                    MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        MPIR_ERR_CHECK(mpi_errno);
    }

  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock);
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
792 
793 
/*
 * Account for one acknowledgement received from target_rank.
 *
 * Looks up the target object for target_rank in win_ptr and decrements both
 * the per-target and the per-window outstanding-ack counters, asserting that
 * neither becomes negative.
 *
 * Returns MPI_SUCCESS or the error code from the target lookup.
 */
static inline int MPIDI_CH3I_RMA_Handle_ack(MPIR_Win * win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *target = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    MPIR_ERR_CHECK(mpi_errno);

    /* per-target bookkeeping */
    target->sync.outstanding_acks -= 1;
    MPIR_Assert(target->sync.outstanding_acks >= 0);

    /* per-window bookkeeping */
    win_ptr->outstanding_acks -= 1;
    MPIR_Assert(win_ptr->outstanding_acks >= 0);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
813 
814 
/*
 * Apply a builtin reduction operation from a source buffer onto a target
 * buffer.
 *
 * source_buf, source_count, source_dtp
 *     Source data.  Unless acc_op is MPI_NO_OP, source_dtp is asserted to be
 *     a predefined datatype.
 * target_buf, target_count, target_dtp
 *     Target data.  target_dtp may be predefined or derived.
 * stream_offset
 *     Offset of this chunk within the stream.  For a predefined target it is
 *     converted to a target displacement as
 *     (stream_offset / type size) * type extent; for a derived target it is
 *     passed to MPIR_Typerep_to_iov() as the starting offset.
 * acc_op
 *     Reduction operation.  It must be a builtin op that accepts source_dtp;
 *     otherwise an MPI_ERR_OP error ("**opnotpredefined") is returned.
 * srckind
 *     When MPIDI_RMA_ACC_SRCBUF_PACKED, consecutive source elements are
 *     type-size bytes apart; otherwise they are type-extent bytes apart.
 *
 * Returns MPI_SUCCESS or an MPI error code.  All exits go through fn_exit so
 * that the function-enter trace is always paired with a function-exit trace.
 */
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
                                   void *target_buf, int target_count, MPI_Datatype target_dtp,
                                   MPI_Aint stream_offset, MPI_Op acc_op,
                                   MPIDI_RMA_Acc_srcbuf_kind_t srckind)
{
    int mpi_errno = MPI_SUCCESS;
    MPI_User_function *uop = NULL;
    MPI_Aint source_dtp_size = 0, source_dtp_extent = 0;
    int is_empty_source = FALSE;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_DO_ACCUMULATE_OP);

    /* first judge if source buffer is empty: MPI_NO_OP carries no source data */
    if (acc_op == MPI_NO_OP)
        is_empty_source = TRUE;

    if (is_empty_source == FALSE) {
        MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(source_dtp));
        MPIR_Datatype_get_size_macro(source_dtp, source_dtp_size);
        MPIR_Datatype_get_extent_macro(source_dtp, source_dtp_extent);
    }

    if ((HANDLE_IS_BUILTIN(acc_op))
        && ((*MPIR_OP_HDL_TO_DTYPE_FN(acc_op)) (source_dtp) == MPI_SUCCESS)) {
        /* get the function by indexing into the op table */
        uop = MPIR_OP_HDL_TO_FN(acc_op);
    }
    else {
        /* --BEGIN ERROR HANDLING-- */
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                         __func__, __LINE__, MPI_ERR_OP,
                                         "**opnotpredefined", "**opnotpredefined %d", acc_op);
        /* leave through fn_exit so the function-exit trace is emitted */
        goto fn_exit;
        /* --END ERROR HANDLING-- */
    }


    if (is_empty_source == TRUE || MPIR_DATATYPE_IS_PREDEFINED(target_dtp)) {
        /* directly apply op if target dtp is predefined dtp OR source buffer is empty */
        MPI_Aint real_stream_offset;
        void *curr_target_buf;

        if (is_empty_source == FALSE) {
            MPIR_Assert(source_dtp == target_dtp);
            /* translate the stream offset (counted in type-size units) into a
             * displacement in the target buffer (counted in type-extent units) */
            real_stream_offset = (stream_offset / source_dtp_size) * source_dtp_extent;
            curr_target_buf = (void *) ((char *) target_buf + real_stream_offset);
        }
        else {
            curr_target_buf = target_buf;
        }

        (*uop) (source_buf, curr_target_buf, &source_count, &source_dtp);
    }
    else {
        /* derived datatype */
        struct iovec *typerep_vec;
        int i, count;
        MPI_Aint vec_len, type_extent, type_size, src_type_stride;
        MPI_Datatype type;
        MPIR_Datatype *dtp;
        MPI_Aint curr_len;
        void *curr_loc;
        int accumulated_count;

        MPIR_Datatype_get_ptr(target_dtp, dtp);
        vec_len = dtp->typerep.num_contig_blocks * target_count + 1;
        /* +1 needed because Rob says so */
        typerep_vec = (struct iovec *)
            MPL_malloc(vec_len * sizeof(struct iovec), MPL_MEM_DATATYPE);
        /* --BEGIN ERROR HANDLING-- */
        if (!typerep_vec) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, __func__, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            goto fn_exit;
        }
        /* --END ERROR HANDLING-- */

        /* The iov is generated against a NULL base address, so each iov_base
         * holds an offset that is added to target_buf below. */
        MPI_Aint max_iov_len = vec_len;
        MPI_Aint actual_iov_bytes;
        MPIR_Typerep_to_iov(NULL, target_count, target_dtp, stream_offset, typerep_vec, max_iov_len,
                         source_count * source_dtp_size, &vec_len, &actual_iov_bytes);

        type = dtp->basic_type;
        MPIR_Assert(type != MPI_DATATYPE_NULL);

        MPIR_Assert(type == source_dtp);
        type_size = source_dtp_size;
        type_extent = source_dtp_extent;
        /* If the source buffer has been packed by the caller, the distance between
         * two elements can be smaller than extent. E.g., predefined pairtype may
         * have larger extent than size.*/
        if (srckind == MPIDI_RMA_ACC_SRCBUF_PACKED)
            src_type_stride = type_size;
        else
            src_type_stride = type_extent;

        i = 0;
        curr_loc = typerep_vec[0].iov_base;
        curr_len = typerep_vec[0].iov_len;
        accumulated_count = 0;
        while (i != vec_len) {
            if (curr_len < type_size) {
                /* Fewer bytes than one basic element: fold in the length of
                 * the next segment.  Only entries [0, vec_len) were filled
                 * in, so do not read the slot past the last one. */
                i++;
                if (i != vec_len)
                    curr_len += typerep_vec[i].iov_len;
                continue;
            }

            MPIR_Assign_trunc(count, curr_len / type_size, int);

            (*uop) ((char *) source_buf + src_type_stride * accumulated_count,
                    (char *) target_buf + MPIR_Ptr_to_aint(curr_loc), &count, &type);

            if (curr_len % type_size == 0) {
                /* segment fully consumed: move on to the next one */
                i++;
                if (i != vec_len) {
                    curr_loc = typerep_vec[i].iov_base;
                    curr_len = typerep_vec[i].iov_len;
                }
            }
            else {
                /* leftover bytes remain in this segment: skip what was processed */
                curr_loc = (void *) ((char *) curr_loc + type_extent * count);
                curr_len -= type_size * count;
            }

            accumulated_count += count;
        }

        MPL_free(typerep_vec);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_DO_ACCUMULATE_OP);

    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
956 
957 
/*
 * Handle a lock request piggybacked on an RMA operation packet.
 *
 * If the packet flags carry a shared or exclusive lock request, try to take
 * the window lock.  When the lock cannot be taken, the operation is queued
 * with enqueue_lock_origin() and *acquire_lock_fail is set to 1.
 *
 * On entry *acquire_lock_fail is reset to 0 and *reqp to NULL; *reqp is only
 * passed on to enqueue_lock_origin().
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int check_piggyback_lock(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
                                       MPIDI_CH3_Pkt_t * pkt, void * data,
                                       intptr_t * buflen,
                                       int *acquire_lock_fail, MPIR_Request ** reqp)
{
    int mpi_errno = MPI_SUCCESS;
    int pkt_flags;
    int requested_lock;

    *acquire_lock_fail = 0;
    *reqp = NULL;

    MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), pkt_flags, mpi_errno);

    /* Nothing to do unless a lock request rides on this packet. */
    if (!(pkt_flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED |
                       MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)))
        goto fn_exit;

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
        requested_lock = MPI_LOCK_SHARED;
    }
    else {
        MPIR_Assert(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
        requested_lock = MPI_LOCK_EXCLUSIVE;
    }

    /* Lock obtained: the caller may proceed with the operation. */
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, requested_lock) != 0)
        goto fn_exit;

    /* cannot acquire the lock, queue up this operation. */
    mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, data, buflen, reqp);
    MPIR_ERR_CHECK(mpi_errno);
    *acquire_lock_fail = 1;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
993 
/*
 * Post-processing on the target after an RMA operation has been applied.
 *
 * has_response_data - zero for PUT / ACC, non-zero for operations that send
 *                     data back (GACC / GET / CAS / FOP)
 * pkt_flags         - flags carried by the operation packet
 * source_win_handle - origin window handle used when sending acks
 *
 * For PUT / ACC, and in this order:
 *   - a piggybacked lock request is answered with a lock-op ack, which also
 *     carries the ACK flag when FLUSH or UNLOCK was requested;
 *   - FLUSH without a piggybacked lock is answered with a plain ack;
 *   - DECR_AT_COUNTER decrements the window's at_completion_counter;
 *   - UNLOCK without a piggybacked lock is answered with a plain ack, and the
 *     window lock is released.
 * For operations with response data only the lock release and the counter
 * decrement are performed here.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int finish_op_on_target(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
                                      int has_response_data,
                                      int pkt_flags, MPI_Win source_win_handle)
{
    int mpi_errno = MPI_SUCCESS;
    const int carries_lock = (pkt_flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED |
                                           MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)) != 0;
    const int wants_flush = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) != 0;
    const int wants_unlock = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) != 0;
    const int wants_decr = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) != 0;

    if (has_response_data) {
        /* This is GACC / GET / CAS / FOP */

        if (wants_unlock) {
            mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
            MPIDI_CH3_Progress_signal_completion();
        }

        if (wants_decr) {
            win_ptr->at_completion_counter--;
            MPIR_Assert(win_ptr->at_completion_counter >= 0);
            /* Signal the local process when the op counter reaches 0. */
            if (win_ptr->at_completion_counter == 0)
                MPIDI_CH3_Progress_signal_completion();
        }

        goto fn_exit;
    }

    /* This is PUT or ACC */

    if (carries_lock) {
        int ack_flags = MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;

        /* A lock ack doubles as the FLUSH / UNLOCK ack when one is due. */
        if (wants_flush || wants_unlock)
            ack_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;

        MPIR_Assert(source_win_handle != MPI_WIN_NULL);
        mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, ack_flags,
                                                    source_win_handle, MPI_REQUEST_NULL);
        MPIR_ERR_CHECK(mpi_errno);
        MPIDI_CH3_Progress_signal_completion();
    }

    if (wants_flush) {
        /* If op is piggybacked with both LOCK and FLUSH,
         * we only send LOCK ACK back, do not send FLUSH ACK. */
        if (!carries_lock) {
            mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle);
            MPIR_ERR_CHECK(mpi_errno);
        }
        MPIDI_CH3_Progress_signal_completion();
    }

    if (wants_decr) {
        win_ptr->at_completion_counter--;
        MPIR_Assert(win_ptr->at_completion_counter >= 0);
        /* Signal the local process when the op counter reaches 0. */
        if (win_ptr->at_completion_counter == 0)
            MPIDI_CH3_Progress_signal_completion();
    }

    if (wants_unlock) {
        /* If op is piggybacked with both LOCK and UNLOCK,
         * we only send LOCK ACK back, do not send FLUSH (UNLOCK) ACK. */
        if (!carries_lock) {
            mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle);
            MPIR_ERR_CHECK(mpi_errno);
        }
        mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
        MPIR_ERR_CHECK(mpi_errno);
        MPIDI_CH3_Progress_signal_completion();
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1068 
1069 
/*
 * Translate every rank of group_ptr into the corresponding rank in the group
 * of the window's communicator.
 *
 * win_ptr          - window whose communicator defines the destination group
 * group_ptr        - group whose ranks 0 .. size-1 are translated
 * ranks_in_win_grp - output array; receives group_ptr->size translated ranks
 *
 * The temporary group obtained from the window's communicator is released
 * whether or not the translation succeeds, so it is not leaked on the error
 * path.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int fill_ranks_in_win_grp(MPIR_Win * win_ptr, MPIR_Group * group_ptr,
                                        int *ranks_in_win_grp)
{
    int mpi_errno = MPI_SUCCESS;
    int free_errno = MPI_SUCCESS;
    int i, *ranks_in_grp;
    MPIR_Group *win_grp_ptr;
    MPIR_CHKLMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_FILL_RANKS_IN_WIN_GRP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_FILL_RANKS_IN_WIN_GRP);

    /* identity list 0 .. size-1: the ranks to translate */
    MPIR_CHKLMEM_MALLOC(ranks_in_grp, int *, group_ptr->size * sizeof(int),
                        mpi_errno, "ranks_in_grp", MPL_MEM_RMA);
    for (i = 0; i < group_ptr->size; i++)
        ranks_in_grp[i] = i;

    mpi_errno = MPIR_Comm_group_impl(win_ptr->comm_ptr, &win_grp_ptr);
    MPIR_ERR_CHECK(mpi_errno);

    mpi_errno = MPIR_Group_translate_ranks_impl(group_ptr, group_ptr->size,
                                                ranks_in_grp, win_grp_ptr, ranks_in_win_grp);

    /* Release the temporary group before acting on the translation result;
     * checking first would skip the release when the translation fails. */
    free_errno = MPIR_Group_free_impl(win_grp_ptr);
    MPIR_ERR_CHECK(mpi_errno);

    mpi_errno = free_errno;
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    MPIR_CHKLMEM_FREEALL();
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_FILL_RANKS_IN_WIN_GRP);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1103 
1104 
wait_progress_engine(void)1105 static inline int wait_progress_engine(void)
1106 {
1107     int mpi_errno = MPI_SUCCESS;
1108     MPID_Progress_state progress_state;
1109 
1110     MPID_Progress_start(&progress_state);
1111     mpi_errno = MPID_Progress_wait(&progress_state);
1112     /* --BEGIN ERROR HANDLING-- */
1113     if (mpi_errno != MPI_SUCCESS) {
1114         MPID_Progress_end(&progress_state);
1115         MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress");
1116     }
1117     /* --END ERROR HANDLING-- */
1118     MPID_Progress_end(&progress_state);
1119 
1120   fn_exit:
1121     return mpi_errno;
1122   fn_fail:
1123     goto fn_exit;
1124 }
1125 
poke_progress_engine(void)1126 static inline int poke_progress_engine(void)
1127 {
1128     int mpi_errno = MPI_SUCCESS;
1129     MPID_Progress_state progress_state;
1130 
1131     MPID_Progress_start(&progress_state);
1132     mpi_errno = MPID_Progress_poke();
1133     MPIR_ERR_CHECK(mpi_errno);
1134     MPID_Progress_end(&progress_state);
1135 
1136   fn_exit:
1137     return mpi_errno;
1138   fn_fail:
1139     goto fn_exit;
1140 }
1141 
/*
 * Read the stream offset from the extended header of an accumulate packet.
 *
 * When pkt_flags contains MPIDI_CH3_PKT_FLAG_RMA_STREAM, ext_hdr_ptr is
 * interpreted as an MPIDI_CH3_Ext_pkt_stream_t and its stream_offset field is
 * stored in *stream_offset.  Without that flag *stream_offset is left
 * untouched.  is_derived_dt is not used by this function.
 */
static inline void MPIDI_CH3_ExtPkt_Accum_get_stream(int pkt_flags,
                                                     int is_derived_dt, void *ext_hdr_ptr,
                                                     MPI_Aint * stream_offset)
{
    MPIDI_CH3_Ext_pkt_stream_t *stream_hdr;

    /* no stream information attached to this packet */
    if (!(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM))
        return;

    MPIR_Assert(ext_hdr_ptr != NULL);
    stream_hdr = (MPIDI_CH3_Ext_pkt_stream_t *) ext_hdr_ptr;
    *stream_offset = stream_hdr->stream_offset;
}
1151 
/*
 * Read the stream offset from the extended header of a get-accumulate packet.
 *
 * Behaves exactly like MPIDI_CH3_ExtPkt_Accum_get_stream(), to which all
 * arguments are forwarded unchanged.
 */
static inline void MPIDI_CH3_ExtPkt_Gaccum_get_stream(int pkt_flags,
                                                      int is_derived_dt, void *ext_hdr_ptr,
                                                      MPI_Aint * stream_offset)
{
    /* Packet match is not verified at this point: a mismatch must already
     * have been reported when the header was initialized on the origin and
     * when the packet was received on the target. */
    MPIDI_CH3_ExtPkt_Accum_get_stream(pkt_flags,
                                      is_derived_dt,
                                      ext_hdr_ptr,
                                      stream_offset);
}
1160 
1161 #endif /* MPIDRMA_H_INCLUDED */
1162