1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #ifndef MPIDRMA_H_INCLUDED
7 #define MPIDRMA_H_INCLUDED
8
9 #include "mpid_rma_types.h"
10 #include "mpid_rma_oplist.h"
11 #include "mpid_rma_shm.h"
12 #include "mpid_rma_issue.h"
13 #include "mpid_rma_lockqueue.h"
14
/*
 * Send a LOCK control packet to rank `dest` of the window's communicator,
 * asking for a lock of type `lock_type` (MPI_LOCK_SHARED or
 * MPI_LOCK_EXCLUSIVE; anything else trips an assertion) on the target window.
 *
 * The packet carries the source and target window handles and no request
 * handle (MPI_REQUEST_NULL).  Any request object handed back by iStartMsg is
 * released immediately, so the caller holds no reference afterwards.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rma_msg").
 */
static inline int send_lock_msg(int dest, int lock_type, MPIR_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_lock_t *lock_msg = &packet.lock;
    MPIR_Request *send_req = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_LOCK_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_LOCK_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    MPIDI_Pkt_init(lock_msg, MPIDI_CH3_PKT_LOCK);
    lock_msg->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
    lock_msg->source_win_handle = win_ptr->handle;
    lock_msg->request_handle = MPI_REQUEST_NULL;

    /* The requested lock mode travels in the packet flags. */
    if (lock_type != MPI_LOCK_SHARED) {
        MPIR_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
        lock_msg->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE | MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
    }
    else {
        lock_msg->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE | MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
    }

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, lock_msg, sizeof(*lock_msg), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_LOCK_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
57
/*
 * Send an UNLOCK control packet to rank `dest` of the window's communicator.
 *
 * `pkt_flags` is copied verbatim into the packet's flags; the packet also
 * carries the source and target window handles.  Any request object handed
 * back by iStartMsg is released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rma_msg").
 */
static inline int send_unlock_msg(int dest, MPIR_Win * win_ptr, int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_unlock_t *unlock_msg = &packet.unlock;
    MPIR_Request *send_req = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_UNLOCK_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_UNLOCK_MSG);

    /* Fill in the packet; it only depends on the window and the flags. */
    MPIDI_Pkt_init(unlock_msg, MPIDI_CH3_PKT_UNLOCK);
    unlock_msg->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
    unlock_msg->source_win_handle = win_ptr->handle;
    unlock_msg->pkt_flags = pkt_flags;

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, unlock_msg, sizeof(*unlock_msg), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_UNLOCK_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
96
97
/*
 * Send a LOCK_ACK packet over `vc` in response to a lock request.
 *
 * The origin is identified by `source_win_handle` or by `request_handle`;
 * at most one of them may be non-null (asserted).  `pkt_flags` carries the
 * lock outcome chosen by the caller (e.g. queued or discarded), and the
 * packet also reports this process's rank in the window's communicator.
 * Any request object returned by iStartMsg is released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rmamsg").
 */
static inline int MPIDI_CH3I_Send_lock_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                               int pkt_flags,
                                               MPI_Win source_win_handle,
                                               MPI_Request request_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_lock_ack_t *ack = &packet.lock_ack;
    MPIR_Request *send_req = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);

    /* The origin is named either by its window or by a request, never both. */
    MPIR_Assert(source_win_handle == MPI_WIN_NULL || request_handle == MPI_REQUEST_NULL);

    MPIDI_Pkt_init(ack, MPIDI_CH3_PKT_LOCK_ACK);
    ack->source_win_handle = source_win_handle;
    ack->request_handle = request_handle;
    ack->target_rank = win_ptr->comm_ptr->rank;
    ack->pkt_flags = pkt_flags;

    MPL_DBG_MSG_FMT(MPIDI_CH3_DBG_OTHER, VERBOSE,
                    (MPL_DBG_FDEST, "sending lock ack pkt on vc=%p, source_win_handle=%#08x",
                     vc, ack->source_win_handle));

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack, sizeof(*ack), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_ACK_PKT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
140
/*
 * Send a LOCK_OP_ACK packet over `vc` in response to an RMA operation that
 * carried a piggybacked lock request.
 *
 * The origin is identified by `source_win_handle` or by `request_handle`;
 * at most one of them may be non-null (asserted).  `pkt_flags` carries the
 * lock/data outcome chosen by the caller, and the packet also reports this
 * process's rank in the window's communicator.  Any request object returned
 * by iStartMsg is released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rmamsg").
 */
static inline int MPIDI_CH3I_Send_lock_op_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                                  int pkt_flags,
                                                  MPI_Win source_win_handle,
                                                  MPI_Request request_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_lock_op_ack_t *op_ack = &packet.lock_op_ack;
    MPIR_Request *send_req = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);

    /* The origin is named either by its window or by a request, never both. */
    MPIR_Assert(source_win_handle == MPI_WIN_NULL || request_handle == MPI_REQUEST_NULL);

    MPIDI_Pkt_init(op_ack, MPIDI_CH3_PKT_LOCK_OP_ACK);
    op_ack->source_win_handle = source_win_handle;
    op_ack->request_handle = request_handle;
    op_ack->target_rank = win_ptr->comm_ptr->rank;
    op_ack->pkt_flags = pkt_flags;

    MPL_DBG_MSG_FMT(MPIDI_CH3_DBG_OTHER, VERBOSE,
                    (MPL_DBG_FDEST, "sending lock op ack pkt on vc=%p, source_win_handle=%#08x",
                     vc, op_ack->source_win_handle));

    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, op_ack, sizeof(*op_ack), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_LOCK_OP_ACK_PKT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
182
183
/*
 * Send an ACK packet over `vc` to the origin window `source_win_handle`,
 * reporting this process's rank in the window's communicator.
 *
 * Unlike the other packet senders in this header, no VC (POBJ) lock is taken
 * around iStartMsg.  This is called from a packet handler, which already runs
 * inside a critical section.  Any request object returned by iStartMsg is
 * released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rmamsg").
 */
static inline int MPIDI_CH3I_Send_ack_pkt(MPIDI_VC_t * vc, MPIR_Win * win_ptr,
                                          MPI_Win source_win_handle)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_ack_t *ack = &packet.ack;
    MPIR_Request *send_req = NULL;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);

    MPIDI_Pkt_init(ack, MPIDI_CH3_PKT_ACK);
    ack->source_win_handle = source_win_handle;
    ack->target_rank = win_ptr->comm_ptr->rank;

    /* Already inside the packet handler's critical section: no POBJ lock here. */
    mpi_errno = MPIDI_CH3_iStartMsg(vc, ack, sizeof(*ack), &send_req);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_MPIDI_CH3I_SEND_ACK_PKT);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
215
216
/*
 * Send a DECR_AT_COUNTER control packet to rank `dst` of the window's
 * communicator.
 *
 * The packet carries the source and target window handles; `pkt_flags` is
 * copied into it unchanged.  Any request object returned by iStartMsg is
 * released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rmamsg").
 */
static inline int send_decr_at_cnt_msg(int dst, MPIR_Win * win_ptr, int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_decr_at_counter_t *decr_msg = &packet.decr_at_cnt;
    MPIR_Request *send_req = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_DECR_AT_CNT_MSG);

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dst, &target_vc);

    MPIDI_Pkt_init(decr_msg, MPIDI_CH3_PKT_DECR_AT_COUNTER);
    decr_msg->target_win_handle = win_ptr->basic_info_table[dst].win_handle;
    decr_msg->source_win_handle = win_ptr->handle;
    decr_msg->pkt_flags = pkt_flags;

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, decr_msg, sizeof(*decr_msg), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_DECR_AT_CNT_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
253
254
/*
 * Send a FLUSH control packet to rank `dest` of the window's communicator.
 *
 * The packet carries only the source and target window handles.  Any request
 * object returned by iStartMsg is released immediately.
 *
 * Returns MPI_SUCCESS or an MPI error code ("**ch3|rma_msg").
 */
static inline int send_flush_msg(int dest, MPIR_Win * win_ptr)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_t packet;
    MPIDI_CH3_Pkt_flush_t *flush_msg = &packet.flush;
    MPIR_Request *send_req = NULL;
    MPIDI_VC_t *target_vc;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_SEND_FLUSH_MSG);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_SEND_FLUSH_MSG);

    /* Fill in the packet; it only depends on the window. */
    MPIDI_Pkt_init(flush_msg, MPIDI_CH3_PKT_FLUSH);
    flush_msg->target_win_handle = win_ptr->basic_info_table[dest].win_handle;
    flush_msg->source_win_handle = win_ptr->handle;

    MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, dest, &target_vc);

    MPID_THREAD_CS_ENTER(POBJ, target_vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(target_vc, flush_msg, sizeof(*flush_msg), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, target_vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno != MPI_SUCCESS, mpi_errno, MPI_ERR_OTHER, "**ch3|rma_msg");

    /* We do not track the send; drop the reference iStartMsg gave us. */
    if (send_req != NULL)
        MPIR_Request_free(send_req);

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_SEND_FLUSH_MSG);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
289
290
291 /* enqueue an unsatisfied origin in passive target at target side. */
/*
 * Queue an origin's lock request on the passive target when the lock could
 * not be acquired immediately.
 *
 * `pkt` is either a plain LOCK packet or an RMA operation packet carrying a
 * piggybacked lock request.  A lock-queue entry is allocated and appended to
 * win_ptr->target_lock_queue_head.  If that allocation fails, the lock is
 * "discarded" and the origin is told so in the ack.
 *
 * For PUT, ACC, GACC and FOP packets the payload following the header is also
 * received here.  It is buffered in the entry when the window's
 * MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES budget and malloc allow.
 * Otherwise the entry's packet is replaced by a plain LOCK packet, the
 * payload is received and dropped, and the ack reports the data as discarded.
 *
 * Parameters:
 *   win_ptr - target window
 *   vc      - virtual connection the packet arrived on
 *   pkt     - incoming packet header
 *   data    - payload bytes that arrived together with the header
 *   buflen  - in: bytes available in `data`; out: bytes consumed here
 *   reqp    - out: receive request for the remaining payload, or NULL when
 *             the whole packet has already been handled
 *
 * A LOCK_ACK (for LOCK packets) or LOCK_OP_ACK (otherwise) describing the
 * queued/discarded state is sent to the origin before returning.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int enqueue_lock_origin(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
                                      MPIDI_CH3_Pkt_t * pkt, void *data,
                                      intptr_t * buflen, MPIR_Request ** reqp)
{
    MPIDI_RMA_Target_lock_entry_t *new_ptr = NULL;
    int flag;
    MPI_Win source_win_handle;
    MPI_Request request_handle;
    int lock_discarded = 0, data_discarded = 0;
    int mpi_errno = MPI_SUCCESS;

    (*reqp) = NULL;

    /* Queue the lock request.  If no entry can be allocated the lock is
     * discarded; the ack sent below tells the origin so. */
    new_ptr = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, pkt);
    if (new_ptr != NULL) {
        MPIDI_RMA_Target_lock_entry_t **head_ptr =
            (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head));
        DL_APPEND((*head_ptr), new_ptr);
        new_ptr->vc = vc;
    }
    else {
        lock_discarded = 1;
    }

    if (MPIDI_CH3I_RMA_PKT_IS_IMMED_OP(*pkt) || pkt->type == MPIDI_CH3_PKT_LOCK ||
        pkt->type == MPIDI_CH3_PKT_GET) {
        /* Everything needed is in the packet header: consume no bytes from
         * `data` and mark the entry's data as complete. */
        (*buflen) = 0;

        if (new_ptr != NULL)
            new_ptr->all_data_recved = 1;

        goto issue_ack;
    }
    else {
        MPI_Aint type_size = 0;
        MPI_Aint type_extent;
        intptr_t recv_data_sz = 0;
        intptr_t buf_size = 0;
        MPIR_Request *req = NULL;
        MPI_Datatype target_dtp;
        int target_count;
        int complete = 0;
        intptr_t data_len;
        int pkt_flags;

        /* This is PUT, ACC, GACC, FOP */

        MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE((*pkt), target_dtp, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_TARGET_COUNT((*pkt), target_count, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), pkt_flags, mpi_errno);

        MPIR_Datatype_get_extent_macro(target_dtp, type_extent);
        MPIR_Datatype_get_size_macro(target_dtp, type_size);

        /* recv_data_sz: bytes that will arrive for this packet.
         * buf_size: space needed to hold them in the target datatype's layout. */
        if (pkt->type == MPIDI_CH3_PKT_PUT) {
            recv_data_sz = type_size * target_count;
            buf_size = type_extent * target_count;
        }
        else {
            MPI_Aint stream_elem_count;
            MPI_Aint total_len;
            MPI_Op op;

            MPIDI_CH3_PKT_RMA_GET_OP((*pkt), op, mpi_errno);
            if (op != MPI_NO_OP) {
                /* At most one MPIDI_CH3U_SRBuf_size stream unit is received here. */
                stream_elem_count = MPIDI_CH3U_SRBuf_size / type_extent;
                total_len = type_size * target_count;
                recv_data_sz = MPL_MIN(total_len, type_size * stream_elem_count);
                buf_size = type_extent * (recv_data_sz / type_size);
            }
        }

        if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) {
            /* Streamed ops also carry an MPIDI_CH3_Ext_pkt_stream_t, which is
             * received and buffered together with the data. */
            MPIR_Assert(pkt->type == MPIDI_CH3_PKT_ACCUMULATE ||
                        pkt->type == MPIDI_CH3_PKT_GET_ACCUM);

            recv_data_sz += sizeof(MPIDI_CH3_Ext_pkt_stream_t);
            buf_size += sizeof(MPIDI_CH3_Ext_pkt_stream_t);
        }

        if (new_ptr != NULL) {
            /* NOTE(review): when the byte budget is exceeded, new_ptr->data is
             * not assigned here; this relies on the entry allocator leaving it
             * NULL -- confirm in MPIDI_CH3I_Win_target_lock_entry_alloc. */
            if (win_ptr->current_target_lock_data_bytes + buf_size <
                MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES) {
                new_ptr->data = MPL_malloc(buf_size, MPL_MEM_BUFFER);
            }

            if (new_ptr->data == NULL) {
                /* Note that there are two possible reasons to make new_ptr->data to be NULL:
                 * (1) win_ptr->current_target_lock_data_bytes + buf_size >= MPIR_CVAR_CH3_RMA_TARGET_LOCK_DATA_BYTES;
                 * (2) MPL_malloc(buf_size) failed.
                 * In such cases, we cannot allocate memory for lock data, so we give up
                 * buffering lock data, however, we still buffer lock request.
                 */
                MPIDI_CH3_Pkt_t new_pkt;
                MPIDI_CH3_Pkt_lock_t *lock_pkt = &new_pkt.lock;
                MPI_Win target_win_handle;

                MPIDI_CH3_PKT_RMA_GET_TARGET_WIN_HANDLE((*pkt), target_win_handle, mpi_errno);

                if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) {
                    MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
                    request_handle = MPI_REQUEST_NULL;
                }
                else {
                    source_win_handle = MPI_WIN_NULL;
                    MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
                }

                MPIDI_Pkt_init(lock_pkt, MPIDI_CH3_PKT_LOCK);
                lock_pkt->target_win_handle = target_win_handle;
                lock_pkt->source_win_handle = source_win_handle;
                lock_pkt->request_handle = request_handle;
                lock_pkt->pkt_flags = pkt_flags;

                /* replace original pkt with lock pkt */
                new_ptr->pkt = new_pkt;
                new_ptr->all_data_recved = 1;

                data_discarded = 1;
            }
            else {
                win_ptr->current_target_lock_data_bytes += buf_size;
                new_ptr->buf_size = buf_size;
            }
        }

        /* Create a request to receive the rest of this packet's payload. */
        req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
        MPIR_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
        MPIR_Object_set_ref(req, 1);

        /* Fill in the fields used by Receive_data_found().  When the lock or
         * its data was discarded the payload is received and dropped;
         * otherwise it lands in the lock entry's buffer. */
        if (lock_discarded || data_discarded) {
            req->dev.drop_data = TRUE;
            req->dev.user_buf = NULL;
        }
        else {
            req->dev.user_buf = new_ptr->data;
        }
        req->dev.user_count = target_count;
        req->dev.datatype = target_dtp;
        req->dev.recv_data_sz = recv_data_sz;
        req->dev.OnDataAvail = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
        req->dev.OnFinal = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete;
        req->dev.target_lock_queue_entry = new_ptr;
        MPIR_Assert(req->dev.recv_data_sz >= 0);

        data_len = *buflen;
        mpi_errno = MPIDI_CH3U_Receive_data_found(req, data, &data_len, &complete);
        MPIR_ERR_CHECK(mpi_errno);

        /* return bytes of data processed in this pkt handler */
        (*buflen) = data_len;

        if (complete) {
            mpi_errno = MPIDI_CH3_ReqHandler_PiggybackLockOpRecvComplete(vc, req, &complete);
            MPIR_ERR_CHECK(mpi_errno);
            if (complete) {
                goto issue_ack;
            }
        }

        /* More payload is still expected: hand the request back to the caller. */
        (*reqp) = req;
    }

  issue_ack:
    if (pkt->type == MPIDI_CH3_PKT_LOCK) {
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;

        MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
        MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);

        mpi_errno =
            MPIDI_CH3I_Send_lock_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else {
        if (lock_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED;
        else if (data_discarded)
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED;
        else
            flag = MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_QUEUED;

        /* Read ops identify the origin by request handle, others by window. */
        if (!MPIDI_CH3I_RMA_PKT_IS_READ_OP(*pkt)) {
            MPIDI_CH3_PKT_RMA_GET_SOURCE_WIN_HANDLE((*pkt), source_win_handle, mpi_errno);
            request_handle = MPI_REQUEST_NULL;
        }
        else {
            source_win_handle = MPI_WIN_NULL;
            MPIDI_CH3_PKT_RMA_GET_REQUEST_HANDLE((*pkt), request_handle, mpi_errno);
        }

        mpi_errno =
            MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, flag, source_win_handle, request_handle);
        MPIR_ERR_CHECK(mpi_errno);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
508
509
/*
 * Process a lock acknowledgement from `target_rank` on the origin side.
 * `pkt_flags` carries the outcome; MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED and
 * MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED are acted upon here.
 *
 * Window-level (lock_all) states:
 *   - LOCK_ALL_CALLED: acks from this rank itself, or from a target on the
 *     same node when the window has shared memory, are handled at window
 *     level.  A grant decrements win_ptr->outstanding_locks; a discard
 *     re-sends a shared lock request.  Other targets fall through to the
 *     per-target handling below.
 *   - LOCK_ALL_ISSUED: a grant decrements outstanding_locks.  When it reaches
 *     zero, the window becomes LOCK_ALL_GRANTED and is activated if any
 *     target has pending network ops.  A discard re-sends a shared lock
 *     request.
 *
 * Per-target handling:
 *   - A grant marks the target LOCK_GRANTED and activates the window if the
 *     target has pending network ops.
 *   - A granted target with no pending network ops is given a chance to make
 *     progress.
 *   - A discard resets the target to LOCK_CALLED.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int handle_lock_ack(MPIR_Win * win_ptr, int target_rank, int pkt_flags)
{
    MPIDI_RMA_Target_t *t = NULL;
    int mpi_errno = MPI_SUCCESS;

    MPIR_Assert(win_ptr->states.access_state == MPIDI_RMA_PER_TARGET ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED ||
                win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED);

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_CALLED) {
        MPIDI_VC_t *orig_vc = NULL, *target_vc = NULL;
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &orig_vc);
        MPIDI_Comm_get_vc(win_ptr->comm_ptr, target_rank, &target_vc);
        /* Self and same-node (shared-memory) targets are accounted at window level. */
        if (win_ptr->comm_ptr->rank == target_rank ||
            (win_ptr->shm_allocated == TRUE && orig_vc->node_id == target_vc->node_id)) {
            if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
                win_ptr->outstanding_locks--;
                MPIR_Assert(win_ptr->outstanding_locks >= 0);
            }
            else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
                /* re-send lock request message. */
                mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
                MPIR_ERR_CHECK(mpi_errno);
            }
            goto fn_exit;
        }
    }
    else if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_ISSUED) {
        if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
            win_ptr->outstanding_locks--;
            MPIR_Assert(win_ptr->outstanding_locks >= 0);
            if (win_ptr->outstanding_locks == 0) {
                win_ptr->states.access_state = MPIDI_RMA_LOCK_ALL_GRANTED;

                if (win_ptr->num_targets_with_pending_net_ops) {
                    mpi_errno = MPIDI_CH3I_Win_set_active(win_ptr);
                    MPIR_ERR_CHECK(mpi_errno);
                }
            }
        }
        else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
            /* re-send lock request message. */
            mpi_errno = send_lock_msg(target_rank, MPI_LOCK_SHARED, win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
        }
        goto fn_exit;
    }

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    MPIR_ERR_CHECK(mpi_errno);
    MPIR_Assert(t != NULL);

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {
        t->access_state = MPIDI_RMA_LOCK_GRANTED;
        if (t->pending_net_ops_list_head) {
            /* Win_set_active reports errors through its return value;
             * propagate them like the LOCK_ALL_ISSUED path above does. */
            mpi_errno = MPIDI_CH3I_Win_set_active(win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    if (win_ptr->states.access_state == MPIDI_RMA_LOCK_ALL_GRANTED ||
        t->access_state == MPIDI_RMA_LOCK_GRANTED) {
        if (t->pending_net_ops_list_head == NULL) {
            int made_progress ATTRIBUTE((unused)) = 0;
            mpi_errno =
                MPIDI_CH3I_RMA_Make_progress_target(win_ptr, t->target_rank, &made_progress);
            MPIR_ERR_CHECK(mpi_errno);
        }
    }

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED)
        t->access_state = MPIDI_RMA_LOCK_CALLED;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
586
587
/*
 * Retire the internal requests of `rma_op` (an op on `target`'s
 * pending_net_ops list) and remove the op from that list.
 *
 * Each request is handled by state:
 *   - Already complete: it is freed and its slot cleared.
 *   - Still in flight: it gets MPIDI_CH3_Req_handler_rma_op_complete as its
 *     completion callback, plus the window handle and target, and this op's
 *     reference is dropped.  When the op has a user request (ureq), the
 *     ureq's completion counter is set to the number of in-flight requests
 *     and its handle is recorded in each of them.
 *
 * If nothing is in flight, the ureq (if any) is completed at once and
 * *op_completed is set to TRUE.  Otherwise the in-flight count is added to
 * MPIDI_CH3I_RMA_Active_req_cnt and target->num_pkts_wait_for_local_completion,
 * and *op_completed stays FALSE.
 *
 * When the target's pending list becomes empty, the window's count of
 * targets with pending ops is decremented; the window is marked inactive
 * once that count reaches zero.
 *
 * Returns MPI_SUCCESS or the error from MPID_Request_complete.
 */
static inline int check_and_set_req_completion(MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target,
                                               MPIDI_RMA_Op_t * rma_op, int *op_completed)
{
    int mpi_errno = MPI_SUCCESS;
    int idx;
    int pending = 0;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);

    *op_completed = FALSE;

    for (idx = 0; idx < rma_op->reqs_size; idx++) {
        /* One request is stored inline; several live in multi_reqs[]. */
        MPIR_Request **slot = (rma_op->reqs_size == 1) ?
            &(rma_op->single_req) : &(rma_op->multi_reqs[idx]);

        if ((*slot) == NULL)
            continue;

        if (MPIR_Request_is_complete((*slot))) {
            MPIR_Request_free((*slot));
            (*slot) = NULL;
            continue;
        }

        /* Still in flight: let the completion callback finish the bookkeeping. */
        (*slot)->dev.request_completed_cb = MPIDI_CH3_Req_handler_rma_op_complete;
        (*slot)->dev.source_win_handle = win_ptr->handle;
        (*slot)->dev.rma_target_ptr = target;

        pending++;

        if (rma_op->ureq != NULL) {
            MPIR_cc_set(&(rma_op->ureq->cc), pending);
            (*slot)->dev.request_handle = rma_op->ureq->handle;
        }

        MPIR_Request_free((*slot));
    }

    if (pending > 0) {
        MPIDI_CH3I_RMA_Active_req_cnt += pending;
        target->num_pkts_wait_for_local_completion += pending;
    }
    else {
        if (rma_op->ureq != NULL) {
            mpi_errno = MPID_Request_complete(rma_op->ureq);
            MPIR_ERR_CHECK(mpi_errno);
        }
        *op_completed = TRUE;
    }

    MPIDI_CH3I_RMA_Ops_free_elem(win_ptr, &(target->pending_net_ops_list_head), rma_op);

    if (target->pending_net_ops_list_head == NULL) {
        win_ptr->num_targets_with_pending_net_ops--;
        MPIR_Assert(win_ptr->num_targets_with_pending_net_ops >= 0);
        if (win_ptr->num_targets_with_pending_net_ops == 0)
            MPIDI_CH3I_Win_set_inactive(win_ptr);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_CHECK_AND_SET_REQ_COMPLETION);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
657
658
/*
 * Process a LOCK_OP_ACK from `target_rank` for the operation that was
 * piggybacked on the lock request (still target->next_op_to_issue).
 *
 * - LOCK_GRANTED: if an ACCUMULATE/GET_ACCUM op still has stream units left
 *   to issue, nothing changes.  Otherwise next_op_to_issue advances.  If this
 *   was the last op and it carried the FLUSH/UNLOCK flag matching the
 *   target's pending sync, that sync flag is cleared.  The op's requests are
 *   then retired via check_and_set_req_completion().
 * - LOCK_QUEUED_DATA_DISCARDED or LOCK_DISCARDED: the op must be sent again.
 *   Its internal request is released (for multi-request ops, multi_reqs[0] is
 *   released and the array is freed), its packet flags are erased, and its
 *   stream counter is reset.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int handle_lock_ack_with_op(MPIR_Win * win_ptr,
                                          int target_rank, int pkt_flags)
{
    MPIDI_RMA_Target_t *target = NULL;
    MPIDI_RMA_Op_t *op = NULL;
    int op_flags = MPIDI_CH3_PKT_FLAG_NONE;
    int op_completed ATTRIBUTE((unused)) = FALSE;
    int mpi_errno = MPI_SUCCESS;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &target);
    MPIR_ERR_CHECK(mpi_errno);
    MPIR_Assert(target != NULL);

    /* Here the next_op_to_issue pointer should still point to the OP piggybacked
     * with LOCK */
    op = target->next_op_to_issue;
    MPIR_Assert(op != NULL);

    MPIDI_CH3_PKT_RMA_GET_FLAGS(op->pkt, op_flags, mpi_errno);
    MPIR_Assert(op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
                op_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED) {

        if ((op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE || op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM)
            && op->issued_stream_count != ALL_STREAM_UNITS_ISSUED) {
            /* Now we successfully issue out the first stream unit,
             * keep next_op_to_issue still stick to the current op
             * since we need to issue the following stream units. */
            goto fn_exit;
        }

        /* We are done with the current operation, make next_op_to_issue points to the
         * next operation. */
        target->next_op_to_issue = op->next;

        if (target->next_op_to_issue == NULL) {
            if (((target->sync.sync_flag == MPIDI_RMA_SYNC_FLUSH) &&
                 (op_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH)) ||
                ((target->sync.sync_flag == MPIDI_RMA_SYNC_UNLOCK) &&
                 (op_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK))) {
                /* We are done with ending sync, unset target's sync_flag. */
                target->sync.sync_flag = MPIDI_RMA_SYNC_NONE;
            }
        }

        /* Completing the user request can fail; do not drop the error. */
        mpi_errno = check_and_set_req_completion(win_ptr, target, op, &op_completed);
        MPIR_ERR_CHECK(mpi_errno);
    }
    else if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_QUEUED_DATA_DISCARDED ||
             pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED) {
        /* We need to re-transmit this operation, so we destroy
         * the internal request and erase all pkt_flags in current
         * operation. */
        if (op->reqs_size == 1) {
            MPIR_Assert(op->single_req != NULL);
            MPIR_Request_free(op->single_req);
            op->single_req = NULL;
            op->reqs_size = 0;
        }
        else if (op->reqs_size > 1) {
            MPIR_Assert(op->multi_reqs != NULL && op->multi_reqs[0] != NULL);
            MPIR_Request_free(op->multi_reqs[0]);
            /* free req array in this op */
            MPL_free(op->multi_reqs);
            op->multi_reqs = NULL;
            op->reqs_size = 0;
        }
        MPIDI_CH3_PKT_RMA_ERASE_FLAGS(op->pkt, mpi_errno);

        op->issued_stream_count = 0;
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
736
737
/*
 * Try to take the window lock of type `lock_type` (MPI_LOCK_SHARED or
 * MPI_LOCK_EXCLUSIVE) on this process's own window.
 *
 * If MPIDI_CH3I_Try_acquire_win_lock succeeds, the result is processed as a
 * granted lock ack from this rank.  Otherwise a LOCK packet describing the
 * request is appended to win_ptr->target_lock_queue_head, tagged with this
 * rank's own VC and marked as having all its data.  If no queue entry can be
 * allocated, the result is processed as a discarded lock ack from this rank.
 *
 * The time spent is recorded in the rma_winlock_getlocallock PVAR timer.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int acquire_local_lock(MPIR_Win * win_ptr, int lock_type)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ACQUIRE_LOCAL_LOCK);

    MPIR_T_PVAR_TIMER_START(RMA, rma_winlock_getlocallock);

    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) != 1) {
        /* Lock is busy: describe the request as a LOCK packet and queue it. */
        MPIDI_CH3_Pkt_t queued_pkt;
        MPIDI_CH3_Pkt_lock_t *lock_req = &queued_pkt.lock;
        MPIDI_RMA_Target_lock_entry_t *entry = NULL;
        MPIDI_RMA_Target_lock_entry_t **queue_head =
            (MPIDI_RMA_Target_lock_entry_t **) (&(win_ptr->target_lock_queue_head));
        MPIDI_VC_t *self_vc;

        MPIDI_Pkt_init(lock_req, MPIDI_CH3_PKT_LOCK);
        if (lock_type == MPI_LOCK_SHARED) {
            lock_req->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE | MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
        }
        else {
            MPIR_Assert(lock_type == MPI_LOCK_EXCLUSIVE);
            lock_req->pkt_flags = MPIDI_CH3_PKT_FLAG_NONE | MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
        }

        entry = MPIDI_CH3I_Win_target_lock_entry_alloc(win_ptr, &queued_pkt);
        if (entry == NULL) {
            /* Cannot queue the request: report the lock as discarded. */
            mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                        MPIDI_CH3_PKT_FLAG_RMA_LOCK_DISCARDED);
            MPIR_ERR_CHECK(mpi_errno);
            goto fn_exit;
        }

        DL_APPEND((*queue_head), entry);
        MPIDI_Comm_get_vc_set_active(win_ptr->comm_ptr, win_ptr->comm_ptr->rank, &self_vc);
        entry->vc = self_vc;
        entry->all_data_recved = 1;
    }
    else {
        /* Lock obtained immediately: treat it as a granted ack from ourselves. */
        mpi_errno = handle_lock_ack(win_ptr, win_ptr->comm_ptr->rank,
                                    MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED);
        MPIR_ERR_CHECK(mpi_errno);
    }

  fn_exit:
    MPIR_T_PVAR_TIMER_END(RMA, rma_winlock_getlocallock);
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ACQUIRE_LOCAL_LOCK);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
792
793
/*
 * Account for an ACK received from `target_rank`.
 *
 * Decrements the per-target counter (target->sync.outstanding_acks) and the
 * window-wide counter (win_ptr->outstanding_acks); both are asserted never
 * to go negative.
 *
 * Returns MPI_SUCCESS or the error from MPIDI_CH3I_Win_find_target.
 */
static inline int MPIDI_CH3I_RMA_Handle_ack(MPIR_Win * win_ptr, int target_rank)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_RMA_Target_t *t = NULL;

    mpi_errno = MPIDI_CH3I_Win_find_target(win_ptr, target_rank, &t);
    MPIR_ERR_CHECK(mpi_errno);
    /* Like the other find_target callers in this file, require a target
     * rather than dereferencing NULL below. */
    MPIR_Assert(t != NULL);

    t->sync.outstanding_acks--;
    MPIR_Assert(t->sync.outstanding_acks >= 0);

    win_ptr->outstanding_acks--;
    MPIR_Assert(win_ptr->outstanding_acks >= 0);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
813
814
/*
 * Apply the predefined reduction `acc_op` from `source_buf` into `target_buf`
 * on the target side of an accumulate.
 *
 * Parameters:
 *   source_buf, source_count, source_dtp - incoming operand; source_dtp must
 *       be predefined unless acc_op is MPI_NO_OP (asserted)
 *   target_buf, target_count, target_dtp - target region and its layout
 *   stream_offset - offset of this stream unit within the target data,
 *       counted in packed bytes (it is divided by the source type size)
 *   acc_op  - must be a builtin op that accepts source_dtp
 *   srckind - MPIDI_RMA_ACC_SRCBUF_PACKED if source elements are spaced by
 *             type size, otherwise they are spaced by type extent
 *
 * With MPI_NO_OP, or a predefined target datatype, the op is applied in one
 * call.  Otherwise the target datatype is flattened to an iovec and the op is
 * applied segment by segment using the datatype's basic type (which must
 * equal source_dtp).
 *
 * Returns MPI_SUCCESS; MPI_ERR_OP ("**opnotpredefined") for an unsupported
 * op; or MPI_ERR_OTHER ("**nomem") if the iovec array cannot be allocated.
 */
static inline int do_accumulate_op(void *source_buf, int source_count, MPI_Datatype source_dtp,
                                   void *target_buf, int target_count, MPI_Datatype target_dtp,
                                   MPI_Aint stream_offset, MPI_Op acc_op,
                                   MPIDI_RMA_Acc_srcbuf_kind_t srckind)
{
    int mpi_errno = MPI_SUCCESS;
    MPI_User_function *uop = NULL;
    MPI_Aint source_dtp_size = 0, source_dtp_extent = 0;
    int is_empty_source = FALSE;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_DO_ACCUMULATE_OP);

    MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_DO_ACCUMULATE_OP);

    /* MPI_NO_OP carries no source data. */
    if (acc_op == MPI_NO_OP)
        is_empty_source = TRUE;

    if (is_empty_source == FALSE) {
        MPIR_Assert(MPIR_DATATYPE_IS_PREDEFINED(source_dtp));
        MPIR_Datatype_get_size_macro(source_dtp, source_dtp_size);
        MPIR_Datatype_get_extent_macro(source_dtp, source_dtp_extent);
    }

    if ((HANDLE_IS_BUILTIN(acc_op))
        && ((*MPIR_OP_HDL_TO_DTYPE_FN(acc_op)) (source_dtp) == MPI_SUCCESS)) {
        /* get the function by indexing into the op table */
        uop = MPIR_OP_HDL_TO_FN(acc_op);
    }
    else {
        /* --BEGIN ERROR HANDLING-- */
        mpi_errno = MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                         __func__, __LINE__, MPI_ERR_OP,
                                         "**opnotpredefined", "**opnotpredefined %d", acc_op);
        /* Leave through fn_fail so the ENTER above is matched by an EXIT. */
        goto fn_fail;
        /* --END ERROR HANDLING-- */
    }


    if (is_empty_source == TRUE || MPIR_DATATYPE_IS_PREDEFINED(target_dtp)) {
        /* directly apply op if target dtp is predefined dtp OR source buffer is empty */
        MPI_Aint real_stream_offset;
        void *curr_target_buf;

        if (is_empty_source == FALSE) {
            MPIR_Assert(source_dtp == target_dtp);
            /* Convert the packed-byte offset into a byte offset in the target layout. */
            real_stream_offset = (stream_offset / source_dtp_size) * source_dtp_extent;
            curr_target_buf = (void *) ((char *) target_buf + real_stream_offset);
        }
        else {
            curr_target_buf = target_buf;
        }

        (*uop) (source_buf, curr_target_buf, &source_count, &source_dtp);
    }
    else {
        /* derived datatype */
        struct iovec *typerep_vec;
        int i, count;
        MPI_Aint vec_len, type_extent, type_size, src_type_stride;
        MPI_Datatype type;
        MPIR_Datatype *dtp;
        MPI_Aint curr_len;
        void *curr_loc;
        int accumulated_count;

        MPIR_Datatype_get_ptr(target_dtp, dtp);
        vec_len = dtp->typerep.num_contig_blocks * target_count + 1;
        /* +1 needed because Rob says so */
        typerep_vec = (struct iovec *)
            MPL_malloc(vec_len * sizeof(struct iovec), MPL_MEM_DATATYPE);
        /* --BEGIN ERROR HANDLING-- */
        if (!typerep_vec) {
            mpi_errno =
                MPIR_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE, __func__, __LINE__,
                                     MPI_ERR_OTHER, "**nomem", 0);
            goto fn_fail;
        }
        /* --END ERROR HANDLING-- */

        MPI_Aint max_iov_len = vec_len;
        MPI_Aint actual_iov_bytes;
        /* Flatten the part of the target layout covered by this stream unit;
         * vec_len is updated to the number of entries actually produced. */
        MPIR_Typerep_to_iov(NULL, target_count, target_dtp, stream_offset, typerep_vec, max_iov_len,
                            source_count * source_dtp_size, &vec_len, &actual_iov_bytes);

        type = dtp->basic_type;
        MPIR_Assert(type != MPI_DATATYPE_NULL);

        MPIR_Assert(type == source_dtp);
        type_size = source_dtp_size;
        type_extent = source_dtp_extent;
        /* If the source buffer has been packed by the caller, the distance between
         * two elements can be smaller than extent. E.g., predefined pairtype may
         * have larger extent than size.*/
        if (srckind == MPIDI_RMA_ACC_SRCBUF_PACKED)
            src_type_stride = type_size;
        else
            src_type_stride = type_extent;

        i = 0;
        curr_loc = typerep_vec[0].iov_base;
        curr_len = typerep_vec[0].iov_len;
        accumulated_count = 0;
        while (i != vec_len) {
            if (curr_len < type_size) {
                /* Not even one element left in this segment: merge in the next
                 * one, but stop instead of reading past the populated entries. */
                i++;
                if (i == vec_len)
                    break;
                curr_len += typerep_vec[i].iov_len;
                continue;
            }

            MPIR_Assign_trunc(count, curr_len / type_size, int);

            (*uop) ((char *) source_buf + src_type_stride * accumulated_count,
                    (char *) target_buf + MPIR_Ptr_to_aint(curr_loc), &count, &type);

            if (curr_len % type_size == 0) {
                i++;
                if (i != vec_len) {
                    curr_loc = typerep_vec[i].iov_base;
                    curr_len = typerep_vec[i].iov_len;
                }
            }
            else {
                curr_loc = (void *) ((char *) curr_loc + type_extent * count);
                curr_len -= type_size * count;
            }

            accumulated_count += count;
        }

        MPL_free(typerep_vec);
    }

  fn_exit:
    MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_DO_ACCUMULATE_OP);

    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
956
957
/*
 * Look for a LOCK request piggybacked on an incoming RMA packet and, when one
 * is present, try to take the window lock on the origin's behalf.
 *
 *   win_ptr           - target window whose lock is requested.
 *   vc                - connection the packet arrived on (forwarded when queueing).
 *   pkt, data, buflen - the received packet and its payload; forwarded to
 *                       enqueue_lock_origin() if the lock is currently busy.
 *   acquire_lock_fail - out: 1 if the lock could not be taken and the
 *                       operation was queued, 0 otherwise.
 *   reqp              - out: NULL unless enqueue_lock_origin() stores a
 *                       request into it.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int check_piggyback_lock(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
                                       MPIDI_CH3_Pkt_t * pkt, void *data,
                                       intptr_t * buflen,
                                       int *acquire_lock_fail, MPIR_Request ** reqp)
{
    int mpi_errno = MPI_SUCCESS;
    int pkt_flags;
    int lock_type;

    *acquire_lock_fail = 0;
    *reqp = NULL;

    MPIDI_CH3_PKT_RMA_GET_FLAGS((*pkt), pkt_flags, mpi_errno);

    /* No lock was requested together with this operation: nothing to do. */
    if (!(pkt_flags & (MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED |
                       MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE)))
        goto fn_exit;

    if (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) {
        lock_type = MPI_LOCK_SHARED;
    } else {
        MPIR_Assert(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
        lock_type = MPI_LOCK_EXCLUSIVE;
    }

    /* Lock granted immediately: leave *acquire_lock_fail at 0. */
    if (MPIDI_CH3I_Try_acquire_win_lock(win_ptr, lock_type) != 0)
        goto fn_exit;

    /* Lock is busy: queue up this operation and report the failure. */
    mpi_errno = enqueue_lock_origin(win_ptr, vc, pkt, data, buflen, reqp);
    MPIR_ERR_CHECK(mpi_errno);
    *acquire_lock_fail = 1;

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
993
/*
 * Complete the synchronization side effects requested via pkt_flags after an
 * RMA operation has been applied on the target.
 *
 *   win_ptr           - target window.
 *   vc                - connection used for any acknowledgement packets.
 *   has_response_data - nonzero for GACC / GET / CAS / FOP (the reply carries
 *                       data), zero for PUT / ACC.
 *   pkt_flags         - flags taken from the operation packet.
 *   source_win_handle - origin window handle that acknowledgements target.
 *
 * For PUT / ACC, the following steps run in this order:
 *   1. A LOCK-granted ack is sent when a lock was piggybacked. It also
 *      carries RMA_ACK if FLUSH or UNLOCK was requested.
 *   2. A separate flush ack is sent only when no lock was piggybacked.
 *   3. The at_completion_counter is decremented when requested.
 *   4. The window lock is released on UNLOCK.
 *
 * For GACC / GET / CAS / FOP, only UNLOCK and DECR_AT_COUNTER are handled
 * here. No ack is sent for them.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int finish_op_on_target(MPIR_Win * win_ptr, MPIDI_VC_t * vc,
                                      int has_response_data,
                                      int pkt_flags, MPI_Win source_win_handle)
{
    int mpi_errno = MPI_SUCCESS;
    const int lock_piggybacked = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED) ||
        (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE);
    const int want_flush = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_FLUSH) != 0;
    const int want_unlock = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_UNLOCK) != 0;
    const int want_decr = (pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER) != 0;

    if (has_response_data) {
        /* GACC / GET / CAS / FOP */
        if (want_unlock) {
            mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
            MPIR_ERR_CHECK(mpi_errno);
            MPIDI_CH3_Progress_signal_completion();
        }
        if (want_decr) {
            win_ptr->at_completion_counter--;
            MPIR_Assert(win_ptr->at_completion_counter >= 0);
            /* Wake the local process once the op counter drains to zero. */
            if (win_ptr->at_completion_counter == 0)
                MPIDI_CH3_Progress_signal_completion();
        }
        goto fn_exit;
    }

    /* PUT / ACC */
    if (lock_piggybacked) {
        int ack_flags = MPIDI_CH3_PKT_FLAG_RMA_LOCK_GRANTED;

        if (want_flush || want_unlock)
            ack_flags |= MPIDI_CH3_PKT_FLAG_RMA_ACK;
        MPIR_Assert(source_win_handle != MPI_WIN_NULL);
        mpi_errno = MPIDI_CH3I_Send_lock_op_ack_pkt(vc, win_ptr, ack_flags,
                                                   source_win_handle, MPI_REQUEST_NULL);
        MPIR_ERR_CHECK(mpi_errno);
        MPIDI_CH3_Progress_signal_completion();
    }

    if (want_flush) {
        /* When LOCK and FLUSH are both piggybacked, the LOCK ack above is the
         * only ack sent back; no separate FLUSH ack. */
        if (!lock_piggybacked) {
            mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle);
            MPIR_ERR_CHECK(mpi_errno);
        }
        MPIDI_CH3_Progress_signal_completion();
    }

    if (want_decr) {
        win_ptr->at_completion_counter--;
        MPIR_Assert(win_ptr->at_completion_counter >= 0);
        /* Wake the local process once the op counter drains to zero. */
        if (win_ptr->at_completion_counter == 0)
            MPIDI_CH3_Progress_signal_completion();
    }

    if (want_unlock) {
        /* When LOCK and UNLOCK are both piggybacked, the LOCK ack above is the
         * only ack sent back; no separate FLUSH (UNLOCK) ack. */
        if (!lock_piggybacked) {
            mpi_errno = MPIDI_CH3I_Send_ack_pkt(vc, win_ptr, source_win_handle);
            MPIR_ERR_CHECK(mpi_errno);
        }
        mpi_errno = MPIDI_CH3I_Release_lock(win_ptr);
        MPIR_ERR_CHECK(mpi_errno);
        MPIDI_CH3_Progress_signal_completion();
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1068
1069
/*
 * Translate every rank of group_ptr (0 .. group_ptr->size - 1) into the
 * corresponding rank in the group of the window's communicator.
 *
 *   win_ptr          - window whose communicator defines the target group.
 *   group_ptr        - group whose ranks are translated.
 *   ranks_in_win_grp - out: receives group_ptr->size translated ranks.
 *
 * The temporary window group obtained from MPIR_Comm_group_impl() is released
 * on both the success and the failure path of the translation.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static inline int fill_ranks_in_win_grp(MPIR_Win * win_ptr, MPIR_Group * group_ptr,
                                        int *ranks_in_win_grp)
{
    int mpi_errno = MPI_SUCCESS;
    int free_errno;
    int i, *ranks_in_grp;
    MPIR_Group *win_grp_ptr;
    MPIR_CHKLMEM_DECL(1);
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_FILL_RANKS_IN_WIN_GRP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_FILL_RANKS_IN_WIN_GRP);

    /* Identity list 0..size-1 so that every member of group_ptr is translated. */
    MPIR_CHKLMEM_MALLOC(ranks_in_grp, int *, group_ptr->size * sizeof(int),
                        mpi_errno, "ranks_in_grp", MPL_MEM_RMA);
    for (i = 0; i < group_ptr->size; i++)
        ranks_in_grp[i] = i;

    mpi_errno = MPIR_Comm_group_impl(win_ptr->comm_ptr, &win_grp_ptr);
    MPIR_ERR_CHECK(mpi_errno);

    mpi_errno = MPIR_Group_translate_ranks_impl(group_ptr, group_ptr->size,
                                                ranks_in_grp, win_grp_ptr, ranks_in_win_grp);

    /* Release the window group before looking at the translation result;
     * jumping out first would leak the group obtained above. A translation
     * error takes precedence over an error from freeing the group. */
    free_errno = MPIR_Group_free_impl(win_grp_ptr);
    MPIR_ERR_CHECK(mpi_errno);
    mpi_errno = free_errno;
    MPIR_ERR_CHECK(mpi_errno);

  fn_exit:
    MPIR_CHKLMEM_FREEALL();
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_FILL_RANKS_IN_WIN_GRP);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
1103
1104
wait_progress_engine(void)1105 static inline int wait_progress_engine(void)
1106 {
1107 int mpi_errno = MPI_SUCCESS;
1108 MPID_Progress_state progress_state;
1109
1110 MPID_Progress_start(&progress_state);
1111 mpi_errno = MPID_Progress_wait(&progress_state);
1112 /* --BEGIN ERROR HANDLING-- */
1113 if (mpi_errno != MPI_SUCCESS) {
1114 MPID_Progress_end(&progress_state);
1115 MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winnoprogress");
1116 }
1117 /* --END ERROR HANDLING-- */
1118 MPID_Progress_end(&progress_state);
1119
1120 fn_exit:
1121 return mpi_errno;
1122 fn_fail:
1123 goto fn_exit;
1124 }
1125
poke_progress_engine(void)1126 static inline int poke_progress_engine(void)
1127 {
1128 int mpi_errno = MPI_SUCCESS;
1129 MPID_Progress_state progress_state;
1130
1131 MPID_Progress_start(&progress_state);
1132 mpi_errno = MPID_Progress_poke();
1133 MPIR_ERR_CHECK(mpi_errno);
1134 MPID_Progress_end(&progress_state);
1135
1136 fn_exit:
1137 return mpi_errno;
1138 fn_fail:
1139 goto fn_exit;
1140 }
1141
/*
 * Read the stream offset of an accumulate packet from its extended header.
 *
 *   pkt_flags     - packet flags; the offset is read only when
 *                   MPIDI_CH3_PKT_FLAG_RMA_STREAM is set.
 *   is_derived_dt - currently unused.
 *   ext_hdr_ptr   - extended packet header; must be non-NULL when the stream
 *                   flag is set.
 *   stream_offset - out: receives the header's stream_offset. It is left
 *                   unchanged when the stream flag is not set.
 */
static inline void MPIDI_CH3_ExtPkt_Accum_get_stream(int pkt_flags,
                                                     int is_derived_dt, void *ext_hdr_ptr,
                                                     MPI_Aint * stream_offset)
{
    MPIDI_CH3_Ext_pkt_stream_t *stream_hdr;

    if (!(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM))
        return;

    MPIR_Assert(ext_hdr_ptr != NULL);
    stream_hdr = (MPIDI_CH3_Ext_pkt_stream_t *) ext_hdr_ptr;
    *stream_offset = stream_hdr->stream_offset;
}
1151
/*
 * Read the stream offset of a get-accumulate packet.
 *
 * Get-accumulate packets use the same stream extension header as accumulate
 * packets, so this simply forwards to MPIDI_CH3_ExtPkt_Accum_get_stream()
 * with identical arguments. No packet-match check is done here: mismatches
 * are already reported when the header is initialized (origin) and when the
 * packet is received (target).
 */
static inline void MPIDI_CH3_ExtPkt_Gaccum_get_stream(int pkt_flags,
                                                      int is_derived_dt,
                                                      void *ext_hdr_ptr,
                                                      MPI_Aint * stream_offset)
{
    MPIDI_CH3_ExtPkt_Accum_get_stream(pkt_flags, is_derived_dt,
                                      ext_hdr_ptr, stream_offset);
}
1160
1161 #endif /* MPIDRMA_H_INCLUDED */
1162