1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #ifndef MPID_RMA_ISSUE_H_INCLUDED
7 #define MPID_RMA_ISSUE_H_INCLUDED
8
9 #include "utlist.h"
10 #include "mpid_rma_types.h"
11
12 /* =========================================================== */
13 /* auxiliary functions */
14 /* =========================================================== */
15
16 /* immed_copy() copys data from origin buffer to
17 IMMED packet header. */
immed_copy(void * src,void * dest,size_t len)18 static inline int immed_copy(void *src, void *dest, size_t len)
19 {
20 int mpi_errno = MPI_SUCCESS;
21 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_IMMED_COPY);
22
23 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_IMMED_COPY);
24
25 if (src == NULL || dest == NULL || len == 0)
26 goto fn_exit;
27
28 switch (len) {
29 case 1:
30 *(uint8_t *) dest = *(uint8_t *) src;
31 break;
32 #ifndef NEEDS_STRICT_ALIGNMENT
33 /* Following copy is unsafe on platforms that require strict
34 * alignment (e.g., SPARC). Because the buffers may not be aligned
35 * for data type access except char. */
36 case 2:
37 *(uint16_t *) dest = *(uint16_t *) src;
38 break;
39 case 4:
40 *(uint32_t *) dest = *(uint32_t *) src;
41 break;
42 case 8:
43 *(uint64_t *) dest = *(uint64_t *) src;
44 break;
45 #endif
46 default:
47 MPIR_Memcpy(dest, (void *) src, len);
48 }
49
50 fn_exit:
51 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_IMMED_COPY);
52 return mpi_errno;
53 fn_fail:
54 goto fn_exit;
55 }
56
57 /* =========================================================== */
58 /* extended packet functions */
59 /* =========================================================== */
60
61 /* Set extended header for ACC operation and return its real size. */
init_stream_dtype_ext_pkt(int pkt_flags,MPIR_Datatype * target_dtp,intptr_t stream_offset,void ** ext_hdr_ptr,MPI_Aint * ext_hdr_sz,int * flattened_type_size)62 static int init_stream_dtype_ext_pkt(int pkt_flags,
63 MPIR_Datatype* target_dtp, intptr_t stream_offset,
64 void **ext_hdr_ptr, MPI_Aint * ext_hdr_sz, int *flattened_type_size)
65 {
66 MPI_Aint _total_sz = 0, stream_hdr_sz = 0;
67 void *flattened_type, *total_hdr;
68 int mpi_errno = MPI_SUCCESS;
69
70 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_INIT_ACCUM_EXT_PKT);
71 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_INIT_ACCUM_EXT_PKT);
72
73 /*
74 * The extended header consists of two parts:
75 *
76 * 1. Stream header: if the size of the data is large and needs
77 * to be chunked into multiple pieces.
78 *
79 * 2. Flattened datatype: if the target is a derived datatype.
80 */
81
82 if ((pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM))
83 stream_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_stream_t);
84 else
85 stream_hdr_sz = 0;
86
87 if (target_dtp != NULL)
88 MPIR_Typerep_flatten_size(target_dtp, flattened_type_size);
89 else
90 *flattened_type_size = 0;
91
92 _total_sz = stream_hdr_sz + *flattened_type_size;
93 if (_total_sz) {
94 total_hdr = MPL_malloc(_total_sz, MPL_MEM_RMA);
95 if (total_hdr == NULL) {
96 MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
97 "**nomem %d", _total_sz);
98 }
99 MPL_VG_MEM_INIT(total_hdr, _total_sz);
100 }
101 else {
102 total_hdr = NULL;
103 }
104
105 if ((pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)) {
106 ((MPIDI_CH3_Ext_pkt_stream_t *) total_hdr)->stream_offset = stream_offset;
107 }
108 if (target_dtp != NULL) {
109 flattened_type = (void *) ((char *) total_hdr + stream_hdr_sz);
110 MPIR_Typerep_flatten(target_dtp, flattened_type);
111 }
112
113 (*ext_hdr_ptr) = total_hdr;
114 (*ext_hdr_sz) = _total_sz;
115
116 fn_exit:
117 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_INIT_ACCUM_EXT_PKT);
118 return mpi_errno;
119 fn_fail:
120 MPL_free((*ext_hdr_ptr));
121 (*ext_hdr_ptr) = NULL;
122 (*ext_hdr_sz) = 0;
123 goto fn_exit;
124 }
125
126 /* =========================================================== */
/* issuing functions */
128 /* =========================================================== */
129
130 /* issue_from_origin_buffer() issues data from origin
131 buffer (i.e. non-IMMED operation). */
issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op,MPIDI_VC_t * vc,void * ext_hdr_ptr,MPI_Aint ext_hdr_sz,intptr_t stream_offset,intptr_t stream_size,MPIR_Request ** req_ptr)132 static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
133 void *ext_hdr_ptr, MPI_Aint ext_hdr_sz,
134 intptr_t stream_offset, intptr_t stream_size,
135 MPIR_Request ** req_ptr)
136 {
137 MPI_Datatype target_datatype;
138 MPIR_Datatype*target_dtp = NULL, *origin_dtp = NULL;
139 int is_origin_contig;
140 struct iovec iov[MPL_IOV_LIMIT];
141 int iovcnt = 0;
142 MPIR_Request *req = NULL;
143 MPI_Aint dt_true_lb;
144 int pkt_flags;
145 int is_empty_origin = FALSE;
146 int mpi_errno = MPI_SUCCESS;
147 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
148
149 MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
150
151 /* Judge if origin buffer is empty (this can only happens for
152 * GACC and FOP when op is MPI_NO_OP). */
153 if ((rma_op->pkt).type == MPIDI_CH3_PKT_GET_ACCUM || (rma_op->pkt).type == MPIDI_CH3_PKT_FOP) {
154 MPI_Op op;
155 MPIDI_CH3_PKT_RMA_GET_OP(rma_op->pkt, op, mpi_errno);
156 if (op == MPI_NO_OP)
157 is_empty_origin = TRUE;
158 }
159
160 /* Judge if target datatype is derived datatype. */
161 MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
162 if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
163 MPIR_Datatype_get_ptr(target_datatype, target_dtp);
164 }
165
166 if (is_empty_origin == FALSE) {
167 /* Judge if origin datatype is derived datatype. */
168 if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
169 MPIR_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
170 }
171
172 /* check if origin data is contiguous and get true lb */
173 MPIR_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
174 MPIR_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
175 }
176 else {
177 /* origin buffer is empty, mark origin data as contig and true_lb as 0. */
178 is_origin_contig = 1;
179 dt_true_lb = 0;
180 }
181
182 iov[iovcnt].iov_base = (void *) & (rma_op->pkt);
183 iov[iovcnt].iov_len = sizeof(rma_op->pkt);
184 iovcnt++;
185
186 MPIDI_CH3_PKT_RMA_GET_FLAGS(rma_op->pkt, pkt_flags, mpi_errno);
187 if (!(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && target_dtp == NULL && is_origin_contig) {
188 /* Fast path --- use iStartMsgv() to issue the data, which does not need a request
189 * to be passed in:
190 * (1) non-streamed op (do not need to send extended packet header);
191 * (2) target datatype is predefined (do not need to send derived datatype info);
192 * (3) origin datatype is contiguous (do not need to pack the data and send);
193 */
194
195 if (is_empty_origin == FALSE) {
196 iov[iovcnt].iov_base =
197 (void *) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
198 iov[iovcnt].iov_len = stream_size;
199 iovcnt++;
200 }
201
202 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
203 mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
204 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
205 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
206
207 if (origin_dtp != NULL) {
208 if (req == NULL) {
209 MPIR_Datatype_ptr_release(origin_dtp);
210 }
211 else {
212 /* this will cause the datatype to be freed when the request
213 * is freed. */
214 req->dev.datatype_ptr = origin_dtp;
215 }
216 }
217
218 goto fn_exit;
219 }
220
221 /* Normal path: use iSendv() and sendNoncontig_fn() to issue the data, which
222 * always need a request to be passed in. */
223
224 /* create a new request */
225 req = MPIR_Request_create(MPIR_REQUEST_KIND__SEND);
226 MPIR_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
227
228 MPIR_Object_set_ref(req, 2);
229
230 /* set extended packet header, it is freed when the request is freed. */
231 if (ext_hdr_sz > 0) {
232 req->dev.ext_hdr_sz = ext_hdr_sz;
233 req->dev.ext_hdr_ptr = ext_hdr_ptr;
234 req->dev.flattened_type = NULL;
235
236 iov[iovcnt].iov_base = (void *) req->dev.ext_hdr_ptr;
237 iov[iovcnt].iov_len = ext_hdr_sz;
238 iovcnt++;
239 }
240
241 if (origin_dtp != NULL) {
242 req->dev.datatype_ptr = origin_dtp;
243 /* this will cause the datatype to be freed when the request
244 * is freed. */
245 }
246
247 if (is_origin_contig) {
248 /* origin data is contiguous */
249 if (is_empty_origin == FALSE) {
250 iov[iovcnt].iov_base =
251 (void *) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
252 iov[iovcnt].iov_len = stream_size;
253 iovcnt++;
254 }
255
256 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
257 mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
258 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
259 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
260 }
261 else {
262 /* origin data is non-contiguous */
263 req->dev.user_buf = rma_op->origin_addr;
264 req->dev.user_count = rma_op->origin_count;
265 req->dev.datatype = rma_op->origin_datatype;
266 req->dev.msg_offset = stream_offset;
267 req->dev.msgsize = stream_offset + stream_size;
268
269 req->dev.OnFinal = 0;
270 req->dev.OnDataAvail = 0;
271
272 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
273 mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].iov_base, iov[0].iov_len,
274 &iov[1], iovcnt - 1);
275 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
276 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
277 }
278
279 fn_exit:
280 /* release the target datatype */
281 if (target_dtp)
282 MPIR_Datatype_ptr_release(target_dtp);
283 (*req_ptr) = req;
284
285 MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
286 return mpi_errno;
287 fn_fail:
288 if (req) {
289 if (req->dev.datatype_ptr)
290 MPIR_Datatype_ptr_release(req->dev.datatype_ptr);
291 MPL_free(req->dev.ext_hdr_ptr);
292 MPIR_Request_free(req);
293 }
294
295 (*req_ptr) = NULL;
296 goto fn_exit;
297 }
298
299
300 /* issue_put_op() issues PUT packet header and data. */
issue_put_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)301 static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
302 MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
303 {
304 MPIDI_VC_t *vc = NULL;
305 MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
306 MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
307 MPIR_Request *curr_req = NULL;
308 MPI_Datatype target_datatype;
309 MPIR_Datatype*target_dtp_ptr = NULL;
310 int mpi_errno = MPI_SUCCESS;
311 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_PUT_OP);
312
313 MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_PUT_OP);
314
315 put_pkt->pkt_flags |= pkt_flags;
316
317 MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
318
319 if (rma_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED) {
320 /* All origin data is in packet header, issue the header. */
321 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
322 mpi_errno = MPIDI_CH3_iStartMsg(vc, put_pkt, sizeof(*put_pkt), &curr_req);
323 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
324 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
325 }
326 else {
327 MPI_Aint origin_type_size;
328 void *ext_hdr_ptr = NULL;
329 MPI_Aint ext_hdr_sz = 0;
330 MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
331
332 /* If derived datatype on target, add extended packet header. */
333 MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
334 if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
335 MPIR_Datatype_get_ptr(target_datatype, target_dtp_ptr);
336 MPIR_Typerep_flatten_size(target_dtp_ptr, &put_pkt->info.flattened_type_size);
337
338 ext_hdr_ptr = MPL_malloc(put_pkt->info.flattened_type_size, MPL_MEM_RMA);
339 if (ext_hdr_ptr == NULL) {
340 MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
341 "**nomem %d", put_pkt->info.flattened_type_size);
342 }
343 MPL_VG_MEM_INIT(ext_hdr_ptr, put_pkt->info.flattened_type_size);
344
345 MPIR_Typerep_flatten(target_dtp_ptr, ext_hdr_ptr);
346 ext_hdr_sz = put_pkt->info.flattened_type_size;
347 }
348
349 mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
350 0, rma_op->origin_count * origin_type_size, &curr_req);
351 MPIR_ERR_CHECK(mpi_errno);
352 }
353
354 if (curr_req != NULL) {
355 rma_op->reqs_size = 1;
356
357 rma_op->single_req = curr_req;
358 }
359
360 fn_exit:
361 MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_PUT_OP);
362 return mpi_errno;
363 /* --BEGIN ERROR HANDLING-- */
364 fn_fail:
365 rma_op->single_req = NULL;
366 rma_op->reqs_size = 0;
367 goto fn_exit;
368 /* --END ERROR HANDLING-- */
369 }
370
371 #define ALL_STREAM_UNITS_ISSUED (-1)
372
373 /* issue_acc_op() send ACC packet header and data. */
issue_acc_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)374 static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
375 MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
376 {
377 MPIDI_VC_t *vc = NULL;
378 MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
379 MPIDI_CH3_Pkt_accum_t *accum_pkt = &rma_op->pkt.accum;
380 int i, j;
381 MPI_Aint stream_elem_count, stream_unit_count;
382 MPI_Aint predefined_dtp_size, predefined_dtp_extent, predefined_dtp_count;
383 MPI_Aint total_len, rest_len;
384 MPI_Aint origin_dtp_size;
385 MPIR_Datatype*origin_dtp_ptr = NULL;
386 MPIR_Datatype*target_dtp_ptr = NULL;
387 void *ext_hdr_ptr = NULL;
388 MPI_Aint ext_hdr_sz = 0;
389 int mpi_errno = MPI_SUCCESS;
390 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_ACC_OP);
391
392 MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_ACC_OP);
393
394 MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
395
396 if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
397 MPIR_Request *curr_req = NULL;
398
399 accum_pkt->pkt_flags |= pkt_flags;
400
401 /* All origin data is in packet header, issue the header. */
402 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
403 mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &curr_req);
404 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
405 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
406
407 if (curr_req != NULL) {
408 MPIR_Assert(rma_op->reqs_size == 0 && rma_op->single_req == NULL);
409
410 rma_op->reqs_size = 1;
411 rma_op->single_req = curr_req;
412 }
413 goto fn_exit;
414 }
415
416 /* Get total length of origin data */
417 MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
418 total_len = origin_dtp_size * rma_op->origin_count;
419
420 /* Get size and count for predefined datatype elements */
421 if (MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
422 predefined_dtp_size = origin_dtp_size;
423 predefined_dtp_count = rma_op->origin_count;
424 MPIR_Datatype_get_extent_macro(rma_op->origin_datatype, predefined_dtp_extent);
425 }
426 else {
427 MPIR_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp_ptr);
428 MPIR_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
429 MPIR_Datatype_get_size_macro(origin_dtp_ptr->basic_type, predefined_dtp_size);
430 predefined_dtp_count = total_len / predefined_dtp_size;
431 MPIR_Datatype_get_extent_macro(origin_dtp_ptr->basic_type, predefined_dtp_extent);
432 }
433 MPIR_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
434
435 /* Calculate number of predefined elements in each stream unit, and
436 * total number of stream units. */
437 stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
438 stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
439 MPIR_Assert(stream_elem_count > 0 && stream_unit_count > 0);
440
441 /* If there are more than one stream unit, mark the current packet
442 * as stream packet */
443 if (stream_unit_count > 1)
444 pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
445
446 /* Get target datatype */
447 if (!MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype))
448 MPIR_Datatype_get_ptr(accum_pkt->datatype, target_dtp_ptr);
449
450 rest_len = total_len;
451 MPIR_Assert(rma_op->issued_stream_count >= 0);
452 for (j = 0; j < stream_unit_count; j++) {
453 intptr_t stream_offset, stream_size;
454 MPIR_Request *curr_req = NULL;
455
456 if (j < rma_op->issued_stream_count)
457 continue;
458
459 accum_pkt->pkt_flags |= pkt_flags;
460
461 if (j != 0) {
462 accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
463 accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
464 }
465 if (j != stream_unit_count - 1) {
466 accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
467 accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
468 accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
469 }
470
471 stream_offset = j * stream_elem_count * predefined_dtp_size;
472 stream_size = MPL_MIN(stream_elem_count * predefined_dtp_size, rest_len);
473 rest_len -= stream_size;
474
475 /* Set extended packet header if needed. */
476 init_stream_dtype_ext_pkt(pkt_flags, target_dtp_ptr, stream_offset, &ext_hdr_ptr, &ext_hdr_sz,
477 &accum_pkt->info.flattened_type_size);
478
479 mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
480 stream_offset, stream_size, &curr_req);
481 MPIR_ERR_CHECK(mpi_errno);
482
483 if (curr_req != NULL) {
484 if (rma_op->reqs_size == 0) {
485 MPIR_Assert(rma_op->single_req == NULL && rma_op->multi_reqs == NULL);
486 rma_op->reqs_size = stream_unit_count;
487
488 if (stream_unit_count > 1) {
489 rma_op->multi_reqs =
490 (MPIR_Request **) MPL_malloc(sizeof(MPIR_Request *) * rma_op->reqs_size, MPL_MEM_RMA);
491 for (i = 0; i < rma_op->reqs_size; i++)
492 rma_op->multi_reqs[i] = NULL;
493 }
494 }
495
496 if (rma_op->reqs_size == 1)
497 rma_op->single_req = curr_req;
498 else
499 rma_op->multi_reqs[j] = curr_req;
500 }
501
502 rma_op->issued_stream_count++;
503
504 if (accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
505 accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
506 /* if piggybacked with LOCK flag, we
507 * only issue the first streaming unit */
508 MPIR_Assert(j == 0);
509 break;
510 }
511 } /* end of for loop */
512
513 if (rma_op->issued_stream_count == stream_unit_count) {
514 /* Mark that all stream units have been issued */
515 rma_op->issued_stream_count = ALL_STREAM_UNITS_ISSUED;
516 }
517
518 fn_exit:
519 MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_ACC_OP);
520 return mpi_errno;
521 fn_fail:
522 if (rma_op->reqs_size == 1) {
523 rma_op->single_req = NULL;
524 }
525 else if (rma_op->reqs_size > 1) {
526 MPL_free(rma_op->multi_reqs);
527 rma_op->multi_reqs = NULL;
528 }
529 rma_op->reqs_size = 0;
530 goto fn_exit;
531 }
532
533
534 /* issue_get_acc_op() send GACC packet header and data. */
issue_get_acc_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)535 static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
536 MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
537 {
538 MPIDI_VC_t *vc = NULL;
539 MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
540 MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &rma_op->pkt.get_accum;
541 int i, j;
542 MPI_Aint stream_elem_count, stream_unit_count;
543 MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
544 MPI_Aint total_len, rest_len;
545 MPI_Aint target_dtp_size;
546 MPIR_Datatype*target_dtp_ptr = NULL;
547 void *ext_hdr_ptr = NULL;
548 MPI_Aint ext_hdr_sz = 0;
549 int mpi_errno = MPI_SUCCESS;
550 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_GET_ACC_OP);
551
552 MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_GET_ACC_OP);
553
554 MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
555
556 if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
557 MPIR_Request *resp_req = NULL;
558 MPIR_Request *curr_req = NULL;
559
560 get_accum_pkt->pkt_flags |= pkt_flags;
561
562 rma_op->reqs_size = 1;
563
564 /* Create a request for the GACC response. Store the response buf, count, and
565 * datatype in it, and pass the request's handle in the GACC packet. When the
566 * response comes from the target, it will contain the request handle. */
567 resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
568 MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
569
570 MPIR_Object_set_ref(resp_req, 2);
571
572 resp_req->dev.user_buf = rma_op->result_addr;
573 resp_req->dev.user_count = rma_op->result_count;
574 resp_req->dev.datatype = rma_op->result_datatype;
575 resp_req->dev.target_win_handle = MPI_WIN_NULL;
576 resp_req->dev.source_win_handle = win_ptr->handle;
577
578 /* Note: Get_accumulate uses the same packet type as accumulate */
579 get_accum_pkt->request_handle = resp_req->handle;
580
581 /* All origin data is in packet header, issue the header. */
582 MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
583 mpi_errno = MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &curr_req);
584 MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
585 MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
586
587 if (curr_req != NULL) {
588 MPIR_Request_free(curr_req);
589 }
590
591 rma_op->single_req = resp_req;
592
593 goto fn_exit;
594 }
595
596 /* Get total length of target data */
597 MPIR_Datatype_get_size_macro(get_accum_pkt->datatype, target_dtp_size);
598 total_len = target_dtp_size * get_accum_pkt->count;
599
600 /* Get size and count for predefined datatype elements */
601 if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
602 predefined_dtp_size = target_dtp_size;
603 predefined_dtp_count = get_accum_pkt->count;
604 MPIR_Datatype_get_extent_macro(get_accum_pkt->datatype, predefined_dtp_extent);
605 }
606 else {
607 MPIR_Datatype_get_ptr(get_accum_pkt->datatype, target_dtp_ptr);
608 MPIR_Assert(target_dtp_ptr != NULL && target_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
609 MPIR_Datatype_get_size_macro(target_dtp_ptr->basic_type, predefined_dtp_size);
610 predefined_dtp_count = total_len / predefined_dtp_size;
611 MPIR_Datatype_get_extent_macro(target_dtp_ptr->basic_type, predefined_dtp_extent);
612 }
613 MPIR_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
614
615 /* Calculate number of predefined elements in each stream unit, and
616 * total number of stream units. */
617 stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
618 stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
619 MPIR_Assert(stream_elem_count > 0 && stream_unit_count > 0);
620
621 /* If there are more than one stream unit, mark the current packet
622 * as stream packet */
623 if (stream_unit_count > 1)
624 pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
625
626 rest_len = total_len;
627
628 rma_op->reqs_size = stream_unit_count;
629
630 if (rma_op->reqs_size > 1) {
631 rma_op->multi_reqs =
632 (MPIR_Request **) MPL_malloc(sizeof(MPIR_Request *) * rma_op->reqs_size, MPL_MEM_RMA);
633 for (i = 0; i < rma_op->reqs_size; i++)
634 rma_op->multi_reqs[i] = NULL;
635 }
636
637 MPIR_Assert(rma_op->issued_stream_count >= 0);
638
639 for (j = 0; j < stream_unit_count; j++) {
640 intptr_t stream_offset, stream_size;
641 MPIR_Request *resp_req = NULL;
642 MPIR_Request *curr_req = NULL;
643
644 if (j < rma_op->issued_stream_count)
645 continue;
646
647 get_accum_pkt->pkt_flags |= pkt_flags;
648
649 if (j != 0) {
650 get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
651 get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
652 }
653 if (j != stream_unit_count - 1) {
654 get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
655 get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
656 get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
657 }
658
659 /* Create a request for the GACC response. Store the response buf, count, and
660 * datatype in it, and pass the request's handle in the GACC packet. When the
661 * response comes from the target, it will contain the request handle. */
662 resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
663 MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
664
665 MPIR_Object_set_ref(resp_req, 2);
666
667 resp_req->dev.user_buf = rma_op->result_addr;
668 resp_req->dev.user_count = rma_op->result_count;
669 resp_req->dev.datatype = rma_op->result_datatype;
670 resp_req->dev.target_win_handle = MPI_WIN_NULL;
671 resp_req->dev.source_win_handle = win_ptr->handle;
672 resp_req->dev.pkt_flags = pkt_flags;
673
674 if (!MPIR_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype)) {
675 MPIR_Datatype*result_dtp = NULL;
676 MPIR_Datatype_get_ptr(resp_req->dev.datatype, result_dtp);
677 resp_req->dev.datatype_ptr = result_dtp;
678 /* this will cause the datatype to be freed when the
679 * request is freed. */
680 }
681
682 /* Note: Get_accumulate uses the same packet type as accumulate */
683 get_accum_pkt->request_handle = resp_req->handle;
684
685 stream_offset = j * stream_elem_count * predefined_dtp_size;
686 stream_size = MPL_MIN(stream_elem_count * predefined_dtp_size, rest_len);
687 rest_len -= stream_size;
688
689 /* Set extended packet header if needed. */
690 init_stream_dtype_ext_pkt(pkt_flags, target_dtp_ptr, stream_offset, &ext_hdr_ptr, &ext_hdr_sz,
691 &get_accum_pkt->info.flattened_type_size);
692
693 /* Note: here we need to allocate an extended packet header in response request,
694 * in order to store the stream_offset locally and use it in PktHandler_Get_AccumResp.
695 * This extended packet header only contains stream_offset and does not contain any
696 * other information. */
697 {
698 int dummy;
699 init_stream_dtype_ext_pkt(pkt_flags, NULL /* target_dtp_ptr */ , stream_offset,
700 &(resp_req->dev.ext_hdr_ptr), &(resp_req->dev.ext_hdr_sz),
701 &dummy);
702 }
703
704 mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
705 stream_offset, stream_size, &curr_req);
706 MPIR_ERR_CHECK(mpi_errno);
707
708 if (curr_req != NULL) {
709 MPIR_Request_free(curr_req);
710 }
711
712 if (rma_op->reqs_size == 1)
713 rma_op->single_req = resp_req;
714 else
715 rma_op->multi_reqs[j] = resp_req;
716
717 rma_op->issued_stream_count++;
718
719 if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
720 get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
721 /* if piggybacked with LOCK flag, we
722 * only issue the first streaming unit */
723 MPIR_Assert(j == 0);
724 break;
725 }
726 } /* end of for loop */
727
728 if (rma_op->issued_stream_count == stream_unit_count) {
729 /* Mark that all stream units have been issued */
730 rma_op->issued_stream_count = ALL_STREAM_UNITS_ISSUED;
731 }
732
733 fn_exit:
734 MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_GET_ACC_OP);
735 return mpi_errno;
736 /* --BEGIN ERROR HANDLING-- */
737 fn_fail:
738 if (rma_op->reqs_size == 1) {
739 /* error case: drop both our reference to the request and the
740 * progress engine's reference to it, since the progress
741 * engine didn't get a chance to see it yet. */
742 MPIR_Request_free(rma_op->single_req);
743 MPIR_Request_free(rma_op->single_req);
744 rma_op->single_req = NULL;
745 }
746 else if (rma_op->reqs_size > 1) {
747 for (i = 0; i < rma_op->reqs_size; i++) {
748 if (rma_op->multi_reqs[i] != NULL) {
749 /* error case: drop both our reference to the request
750 * and the progress engine's reference to it, since
751 * the progress engine didn't get a chance to see it
752 * yet. */
753 MPIR_Request_free(rma_op->multi_reqs[i]);
754 MPIR_Request_free(rma_op->multi_reqs[i]);
755 }
756 }
757 MPL_free(rma_op->multi_reqs);
758 rma_op->multi_reqs = NULL;
759 }
760 rma_op->reqs_size = 0;
761 goto fn_exit;
762 /* --END ERROR HANDLING-- */
763 }
764
765
/* issue_get_op() issues a GET packet.
 *
 * A response request describing the origin buffer is created and its
 * handle is placed in the packet, so that the response from the target
 * can be matched.  With a predefined target datatype the packet header is
 * sent alone; with a derived one, a flattened copy of the datatype is sent
 * after the header as an extended header.  The send request returned by
 * the channel, if any, is released; the response request is stored in
 * rma_op->single_req.  Returns MPI_SUCCESS or an MPI error code, in which
 * case single_req and reqs_size are reset.  target_ptr is not used here. */
static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
                        MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_get_t *get_pkt = &rma_op->pkt.get;
    MPIR_Comm *comm_ptr;
    MPIDI_VC_t *vc;
    MPI_Datatype target_datatype;
    MPIR_Request *resp_req = NULL;      /* will receive the GET response */
    MPIR_Request *send_req = NULL;      /* returned by iStartMsg / iStartMsgv */
    struct iovec iov[MPL_IOV_LIMIT];
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_GET_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_GET_OP);

    rma_op->reqs_size = 1;

    /* The response request carries the origin buffer, count and datatype;
     * its handle is sent in the packet and comes back with the response. */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (resp_req == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }

    MPIR_Object_set_ref(resp_req, 2);

    resp_req->dev.source_win_handle = win_ptr->handle;
    resp_req->dev.target_win_handle = MPI_WIN_NULL;
    resp_req->dev.datatype = rma_op->origin_datatype;
    resp_req->dev.user_count = rma_op->origin_count;
    resp_req->dev.user_buf = rma_op->origin_addr;
    if (!MPIR_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype)) {
        MPIR_Datatype *origin_dtp;

        /* attaching the datatype causes it to be freed when the
         * request is freed. */
        MPIR_Datatype_get_ptr(resp_req->dev.datatype, origin_dtp);
        resp_req->dev.datatype_ptr = origin_dtp;
    }

    get_pkt->request_handle = resp_req->handle;

    get_pkt->pkt_flags |= pkt_flags;

    comm_ptr = win_ptr->comm_ptr;
    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);

    MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
    if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
        /* Derived datatype on target: send its flattened form after the
         * packet header. */
        MPIR_Datatype *target_dtp;
        MPI_Aint ext_hdr_sz = 0;
        void *ext_hdr_ptr = NULL;

        MPIR_Datatype_get_ptr(target_datatype, target_dtp);
        MPIR_Typerep_flatten_size(target_dtp, &get_pkt->info.flattened_type_size);

        ext_hdr_ptr = MPL_malloc(get_pkt->info.flattened_type_size, MPL_MEM_RMA);
        if (ext_hdr_ptr == NULL) {
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
                                 "**nomem %d", get_pkt->info.flattened_type_size);
        }
        MPL_VG_MEM_INIT(ext_hdr_ptr, get_pkt->info.flattened_type_size);

        MPIR_Typerep_flatten(target_dtp, ext_hdr_ptr);
        ext_hdr_sz = get_pkt->info.flattened_type_size;

        iov[0].iov_base = (void *) get_pkt;
        iov[0].iov_len = sizeof(*get_pkt);
        iov[1].iov_base = (void *) ext_hdr_ptr;
        iov[1].iov_len = ext_hdr_sz;

        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, 2, &send_req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

        /* release the target datatype */
        MPIR_Datatype_ptr_release(target_dtp);

        /* With no send request the extended header is freed right here;
         * otherwise it is handed to the request so it can be freed when
         * the request is freed. */
        if (send_req == NULL) {
            MPL_free(ext_hdr_ptr);
        }
        else {
            send_req->dev.ext_hdr_ptr = ext_hdr_ptr;
        }
    }
    else {
        /* Basic datatype on target: the packet header is all we send. */
        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, get_pkt, sizeof(*get_pkt), &send_req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    }

    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* release the request returned by iStartMsg or iStartMsgv */
    if (send_req != NULL) {
        MPIR_Request_free(send_req);
    }

    rma_op->single_req = resp_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_GET_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    rma_op->reqs_size = 0;
    rma_op->single_req = NULL;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
882
883
/* issue_cas_op() issues a compare-and-swap packet.
 *
 * A response request describing the result buffer is created and its
 * handle is placed in the packet; the packet header is then sent with
 * MPIDI_CH3_iStartMsg().  The send request returned by the channel, if
 * any, is released, and the response request is stored in
 * rma_op->single_req.  Returns MPI_SUCCESS or an MPI error code, in which
 * case single_req and reqs_size are reset.  target_ptr is not used here. */
static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
                        MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target_ptr,
                        int pkt_flags)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_CH3_Pkt_cas_t *cas_pkt = &rma_op->pkt.cas;
    MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
    MPIDI_VC_t *vc = NULL;
    MPIR_Request *resp_req = NULL;      /* will receive the RMW response */
    MPIR_Request *send_req = NULL;      /* returned by iStartMsg */
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_CAS_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_CAS_OP);

    rma_op->reqs_size = 1;

    /* The response request carries the result buffer and datatype; its
     * handle is sent in the RMW packet and comes back with the response. */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    /* Two references: one for the response message, and one for the
     * partial completion handler. */
    MPIR_Object_set_ref(resp_req, 2);

    resp_req->dev.source_win_handle = win_ptr->handle;
    resp_req->dev.target_win_handle = MPI_WIN_NULL;

    resp_req->dev.datatype = rma_op->result_datatype;
    resp_req->dev.user_buf = rma_op->result_addr;

    cas_pkt->pkt_flags |= pkt_flags;
    cas_pkt->request_handle = resp_req->handle;

    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &send_req);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* Only the response request is tracked; drop the send request. */
    if (send_req != NULL) {
        MPIR_Request_free(send_req);
    }

    rma_op->single_req = resp_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_CAS_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    rma_op->reqs_size = 0;
    rma_op->single_req = NULL;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
941
942
/* issue_fop_op() sends the fetch-and-op packet stored in rma_op to the
 * target rank.
 *
 * A response request is created first; its handle is written into the FOP
 * packet and the request records the result buffer, result datatype and the
 * origin window handle taken from rma_op / win_ptr.
 *
 * For MPIDI_CH3_PKT_FOP_IMMED only the packet header is sent.  Otherwise
 * the operation is issued through issue_from_origin_buffer() with a size of
 * one origin datatype element.
 *
 * On success, rma_op->single_req points to the response request and
 * rma_op->reqs_size is 1.  On failure, rma_op->single_req is NULL,
 * rma_op->reqs_size is 0 and the response request (if it was created) is
 * released.
 *
 * target_ptr is not used by this routine.
 *
 * Returns MPI_SUCCESS or an MPI error code. */
static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
                        MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target_ptr,
                        int pkt_flags)
{
    MPIDI_VC_t *vc = NULL;
    MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
    MPIDI_CH3_Pkt_fop_t *fop_pkt = &rma_op->pkt.fop;
    MPIR_Request *resp_req = NULL;
    MPIR_Request *curr_req = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_FOP_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_FOP_OP);

    rma_op->reqs_size = 1;

    /* Create a request for the FOP response.  Store the result buffer and
     * datatype in it, and pass the request's handle in the FOP packet.  When
     * the response comes from the target, it will contain the request
     * handle. */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    /* Two references are held on the response request from here on. */
    MPIR_Object_set_ref(resp_req, 2);

    resp_req->dev.user_buf = rma_op->result_addr;
    resp_req->dev.datatype = rma_op->result_datatype;
    resp_req->dev.target_win_handle = MPI_WIN_NULL;
    resp_req->dev.source_win_handle = win_ptr->handle;

    fop_pkt->request_handle = resp_req->handle;

    fop_pkt->pkt_flags |= pkt_flags;

    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);

    if (rma_op->pkt.type == MPIDI_CH3_PKT_FOP_IMMED) {
        /* All origin data is in packet header, issue the header. */
        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &curr_req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
        MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }
    else {
        MPI_Aint origin_dtp_size;
        MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
        /* No extended header; exactly one origin element is issued. */
        mpi_errno = issue_from_origin_buffer(rma_op, vc, NULL, 0,       /*ext_hdr_ptr, ext_hdr_sz */
                                             0, 1 * origin_dtp_size, &curr_req);
        MPIR_ERR_CHECK(mpi_errno);
    }

    /* release the send request; only the response request is tracked by the
     * operation */
    if (curr_req != NULL) {
        MPIR_Request_free(curr_req);
    }

    rma_op->single_req = resp_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_FOP_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (resp_req != NULL) {
        /* The operation was not issued, so nobody else will ever release the
         * two references set above.  Drop both here so the response request
         * is not leaked. */
        MPIR_Request_free(resp_req);
        MPIR_Request_free(resp_req);
    }
    rma_op->single_req = NULL;
    rma_op->reqs_size = 0;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
1009
1010
1011 /* issue_rma_op() is called by ch3u_rma_progress.c, it triggers
1012 proper issuing functions according to packet type. */
issue_rma_op(MPIDI_RMA_Op_t * op_ptr,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)1013 static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPIR_Win * win_ptr,
1014 MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
1015 {
1016 int mpi_errno = MPI_SUCCESS;
1017 MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_RMA_OP);
1018
1019 MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_RMA_OP);
1020
1021 switch (op_ptr->pkt.type) {
1022 case (MPIDI_CH3_PKT_PUT):
1023 case (MPIDI_CH3_PKT_PUT_IMMED):
1024 mpi_errno = issue_put_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1025 break;
1026 case (MPIDI_CH3_PKT_ACCUMULATE):
1027 case (MPIDI_CH3_PKT_ACCUMULATE_IMMED):
1028 mpi_errno = issue_acc_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1029 break;
1030 case (MPIDI_CH3_PKT_GET_ACCUM):
1031 case (MPIDI_CH3_PKT_GET_ACCUM_IMMED):
1032 mpi_errno = issue_get_acc_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1033 break;
1034 case (MPIDI_CH3_PKT_GET):
1035 mpi_errno = issue_get_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1036 break;
1037 case (MPIDI_CH3_PKT_CAS_IMMED):
1038 mpi_errno = issue_cas_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1039 break;
1040 case (MPIDI_CH3_PKT_FOP):
1041 case (MPIDI_CH3_PKT_FOP_IMMED):
1042 mpi_errno = issue_fop_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1043 break;
1044 default:
1045 MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winInvalidOp");
1046 }
1047
1048 MPIR_ERR_CHECK(mpi_errno);
1049
1050 fn_exit:
1051 MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_RMA_OP);
1052 return mpi_errno;
1053 /* --BEGIN ERROR HANDLING-- */
1054 fn_fail:
1055 goto fn_exit;
1056 /* --END ERROR HANDLING-- */
1057 }
1058
1059 #endif /* MPID_RMA_ISSUE_H_INCLUDED */
1060