1 /*
2  * Copyright (C) by Argonne National Laboratory
3  *     See COPYRIGHT in top-level directory
4  */
5 
6 #ifndef MPID_RMA_ISSUE_H_INCLUDED
7 #define MPID_RMA_ISSUE_H_INCLUDED
8 
9 #include "utlist.h"
10 #include "mpid_rma_types.h"
11 
12 /* =========================================================== */
13 /*                    auxiliary functions                      */
14 /* =========================================================== */
15 
16 /* immed_copy() copys data from origin buffer to
17    IMMED packet header. */
immed_copy(void * src,void * dest,size_t len)18 static inline int immed_copy(void *src, void *dest, size_t len)
19 {
20     int mpi_errno = MPI_SUCCESS;
21     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_IMMED_COPY);
22 
23     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_IMMED_COPY);
24 
25     if (src == NULL || dest == NULL || len == 0)
26         goto fn_exit;
27 
28     switch (len) {
29     case 1:
30         *(uint8_t *) dest = *(uint8_t *) src;
31         break;
32 #ifndef NEEDS_STRICT_ALIGNMENT
33         /* Following copy is unsafe on platforms that require strict
34          * alignment (e.g., SPARC). Because the buffers may not be aligned
35          * for data type access except char. */
36     case 2:
37         *(uint16_t *) dest = *(uint16_t *) src;
38         break;
39     case 4:
40         *(uint32_t *) dest = *(uint32_t *) src;
41         break;
42     case 8:
43         *(uint64_t *) dest = *(uint64_t *) src;
44         break;
45 #endif
46     default:
47         MPIR_Memcpy(dest, (void *) src, len);
48     }
49 
50   fn_exit:
51     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_IMMED_COPY);
52     return mpi_errno;
53   fn_fail:
54     goto fn_exit;
55 }
56 
57 /* =========================================================== */
58 /*                  extended packet functions                  */
59 /* =========================================================== */
60 
61 /* Set extended header for ACC operation and return its real size. */
init_stream_dtype_ext_pkt(int pkt_flags,MPIR_Datatype * target_dtp,intptr_t stream_offset,void ** ext_hdr_ptr,MPI_Aint * ext_hdr_sz,int * flattened_type_size)62 static int init_stream_dtype_ext_pkt(int pkt_flags,
63                               MPIR_Datatype* target_dtp, intptr_t stream_offset,
64                               void **ext_hdr_ptr, MPI_Aint * ext_hdr_sz, int *flattened_type_size)
65 {
66     MPI_Aint _total_sz = 0, stream_hdr_sz = 0;
67     void *flattened_type, *total_hdr;
68     int mpi_errno = MPI_SUCCESS;
69 
70     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_INIT_ACCUM_EXT_PKT);
71     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_INIT_ACCUM_EXT_PKT);
72 
73     /*
74      * The extended header consists of two parts:
75      *
76      *  1. Stream header: if the size of the data is large and needs
77      *  to be chunked into multiple pieces.
78      *
79      *  2. Flattened datatype: if the target is a derived datatype.
80      */
81 
82     if ((pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM))
83         stream_hdr_sz = sizeof(MPIDI_CH3_Ext_pkt_stream_t);
84     else
85         stream_hdr_sz = 0;
86 
87     if (target_dtp != NULL)
88         MPIR_Typerep_flatten_size(target_dtp, flattened_type_size);
89     else
90         *flattened_type_size = 0;
91 
92     _total_sz = stream_hdr_sz + *flattened_type_size;
93     if (_total_sz) {
94         total_hdr = MPL_malloc(_total_sz, MPL_MEM_RMA);
95         if (total_hdr == NULL) {
96             MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
97                                  "**nomem %d", _total_sz);
98         }
99         MPL_VG_MEM_INIT(total_hdr, _total_sz);
100     }
101     else {
102         total_hdr = NULL;
103     }
104 
105     if ((pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM)) {
106         ((MPIDI_CH3_Ext_pkt_stream_t *) total_hdr)->stream_offset = stream_offset;
107     }
108     if (target_dtp != NULL) {
109         flattened_type = (void *) ((char *) total_hdr + stream_hdr_sz);
110         MPIR_Typerep_flatten(target_dtp, flattened_type);
111     }
112 
113     (*ext_hdr_ptr) = total_hdr;
114     (*ext_hdr_sz) = _total_sz;
115 
116   fn_exit:
117     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_INIT_ACCUM_EXT_PKT);
118     return mpi_errno;
119   fn_fail:
120     MPL_free((*ext_hdr_ptr));
121     (*ext_hdr_ptr) = NULL;
122     (*ext_hdr_sz) = 0;
123     goto fn_exit;
124 }
125 
126 /* =========================================================== */
/*                      issuing functions                      */
128 /* =========================================================== */
129 
130 /* issue_from_origin_buffer() issues data from origin
131    buffer (i.e. non-IMMED operation). */
issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op,MPIDI_VC_t * vc,void * ext_hdr_ptr,MPI_Aint ext_hdr_sz,intptr_t stream_offset,intptr_t stream_size,MPIR_Request ** req_ptr)132 static int issue_from_origin_buffer(MPIDI_RMA_Op_t * rma_op, MPIDI_VC_t * vc,
133                                     void *ext_hdr_ptr, MPI_Aint ext_hdr_sz,
134                                     intptr_t stream_offset, intptr_t stream_size,
135                                     MPIR_Request ** req_ptr)
136 {
137     MPI_Datatype target_datatype;
138     MPIR_Datatype*target_dtp = NULL, *origin_dtp = NULL;
139     int is_origin_contig;
140     struct iovec iov[MPL_IOV_LIMIT];
141     int iovcnt = 0;
142     MPIR_Request *req = NULL;
143     MPI_Aint dt_true_lb;
144     int pkt_flags;
145     int is_empty_origin = FALSE;
146     int mpi_errno = MPI_SUCCESS;
147     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
148 
149     MPIR_FUNC_VERBOSE_ENTER(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
150 
151     /* Judge if origin buffer is empty (this can only happens for
152      * GACC and FOP when op is MPI_NO_OP). */
153     if ((rma_op->pkt).type == MPIDI_CH3_PKT_GET_ACCUM || (rma_op->pkt).type == MPIDI_CH3_PKT_FOP) {
154         MPI_Op op;
155         MPIDI_CH3_PKT_RMA_GET_OP(rma_op->pkt, op, mpi_errno);
156         if (op == MPI_NO_OP)
157             is_empty_origin = TRUE;
158     }
159 
160     /* Judge if target datatype is derived datatype. */
161     MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
162     if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
163         MPIR_Datatype_get_ptr(target_datatype, target_dtp);
164     }
165 
166     if (is_empty_origin == FALSE) {
167         /* Judge if origin datatype is derived datatype. */
168         if (!MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
169             MPIR_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp);
170         }
171 
172         /* check if origin data is contiguous and get true lb */
173         MPIR_Datatype_is_contig(rma_op->origin_datatype, &is_origin_contig);
174         MPIR_Datatype_get_true_lb(rma_op->origin_datatype, &dt_true_lb);
175     }
176     else {
177         /* origin buffer is empty, mark origin data as contig and true_lb as 0. */
178         is_origin_contig = 1;
179         dt_true_lb = 0;
180     }
181 
182     iov[iovcnt].iov_base = (void *) & (rma_op->pkt);
183     iov[iovcnt].iov_len = sizeof(rma_op->pkt);
184     iovcnt++;
185 
186     MPIDI_CH3_PKT_RMA_GET_FLAGS(rma_op->pkt, pkt_flags, mpi_errno);
187     if (!(pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_STREAM) && target_dtp == NULL && is_origin_contig) {
188         /* Fast path --- use iStartMsgv() to issue the data, which does not need a request
189          * to be passed in:
190          * (1) non-streamed op (do not need to send extended packet header);
191          * (2) target datatype is predefined (do not need to send derived datatype info);
192          * (3) origin datatype is contiguous (do not need to pack the data and send);
193          */
194 
195         if (is_empty_origin == FALSE) {
196             iov[iovcnt].iov_base =
197                 (void *) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
198             iov[iovcnt].iov_len = stream_size;
199             iovcnt++;
200         }
201 
202         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
203         mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, iovcnt, &req);
204         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
205         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
206 
207         if (origin_dtp != NULL) {
208             if (req == NULL) {
209                 MPIR_Datatype_ptr_release(origin_dtp);
210             }
211             else {
212                 /* this will cause the datatype to be freed when the request
213                  * is freed. */
214                 req->dev.datatype_ptr = origin_dtp;
215             }
216         }
217 
218         goto fn_exit;
219     }
220 
221     /* Normal path: use iSendv() and sendNoncontig_fn() to issue the data, which
222      * always need a request to be passed in. */
223 
224     /* create a new request */
225     req = MPIR_Request_create(MPIR_REQUEST_KIND__SEND);
226     MPIR_ERR_CHKANDJUMP(req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
227 
228     MPIR_Object_set_ref(req, 2);
229 
230     /* set extended packet header, it is freed when the request is freed.  */
231     if (ext_hdr_sz > 0) {
232         req->dev.ext_hdr_sz = ext_hdr_sz;
233         req->dev.ext_hdr_ptr = ext_hdr_ptr;
234         req->dev.flattened_type = NULL;
235 
236         iov[iovcnt].iov_base = (void *) req->dev.ext_hdr_ptr;
237         iov[iovcnt].iov_len = ext_hdr_sz;
238         iovcnt++;
239     }
240 
241     if (origin_dtp != NULL) {
242         req->dev.datatype_ptr = origin_dtp;
243         /* this will cause the datatype to be freed when the request
244          * is freed. */
245     }
246 
247     if (is_origin_contig) {
248         /* origin data is contiguous */
249         if (is_empty_origin == FALSE) {
250             iov[iovcnt].iov_base =
251                 (void *) ((char *) rma_op->origin_addr + dt_true_lb + stream_offset);
252             iov[iovcnt].iov_len = stream_size;
253             iovcnt++;
254         }
255 
256         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
257         mpi_errno = MPIDI_CH3_iSendv(vc, req, iov, iovcnt);
258         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
259         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
260     }
261     else {
262         /* origin data is non-contiguous */
263         req->dev.user_buf = rma_op->origin_addr;
264         req->dev.user_count = rma_op->origin_count;
265         req->dev.datatype = rma_op->origin_datatype;
266         req->dev.msg_offset = stream_offset;
267         req->dev.msgsize = stream_offset + stream_size;
268 
269         req->dev.OnFinal = 0;
270         req->dev.OnDataAvail = 0;
271 
272         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
273         mpi_errno = vc->sendNoncontig_fn(vc, req, iov[0].iov_base, iov[0].iov_len,
274                                          &iov[1], iovcnt - 1);
275         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
276         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
277     }
278 
279   fn_exit:
280     /* release the target datatype */
281     if (target_dtp)
282         MPIR_Datatype_ptr_release(target_dtp);
283     (*req_ptr) = req;
284 
285     MPIR_FUNC_VERBOSE_EXIT(MPID_STATE_ISSUE_FROM_ORIGIN_BUFFER);
286     return mpi_errno;
287   fn_fail:
288     if (req) {
289         if (req->dev.datatype_ptr)
290             MPIR_Datatype_ptr_release(req->dev.datatype_ptr);
291         MPL_free(req->dev.ext_hdr_ptr);
292         MPIR_Request_free(req);
293     }
294 
295     (*req_ptr) = NULL;
296     goto fn_exit;
297 }
298 
299 
300 /* issue_put_op() issues PUT packet header and data. */
issue_put_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)301 static int issue_put_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
302                         MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
303 {
304     MPIDI_VC_t *vc = NULL;
305     MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
306     MPIDI_CH3_Pkt_put_t *put_pkt = &rma_op->pkt.put;
307     MPIR_Request *curr_req = NULL;
308     MPI_Datatype target_datatype;
309     MPIR_Datatype*target_dtp_ptr = NULL;
310     int mpi_errno = MPI_SUCCESS;
311     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_PUT_OP);
312 
313     MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_PUT_OP);
314 
315     put_pkt->pkt_flags |= pkt_flags;
316 
317     MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
318 
319     if (rma_op->pkt.type == MPIDI_CH3_PKT_PUT_IMMED) {
320         /* All origin data is in packet header, issue the header. */
321         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
322         mpi_errno = MPIDI_CH3_iStartMsg(vc, put_pkt, sizeof(*put_pkt), &curr_req);
323         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
324         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
325     }
326     else {
327         MPI_Aint origin_type_size;
328         void *ext_hdr_ptr = NULL;
329         MPI_Aint ext_hdr_sz = 0;
330         MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_type_size);
331 
332         /* If derived datatype on target, add extended packet header. */
333         MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
334         if (!MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
335             MPIR_Datatype_get_ptr(target_datatype, target_dtp_ptr);
336             MPIR_Typerep_flatten_size(target_dtp_ptr, &put_pkt->info.flattened_type_size);
337 
338             ext_hdr_ptr = MPL_malloc(put_pkt->info.flattened_type_size, MPL_MEM_RMA);
339             if (ext_hdr_ptr == NULL) {
340                 MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
341                                      "**nomem %d", put_pkt->info.flattened_type_size);
342             }
343             MPL_VG_MEM_INIT(ext_hdr_ptr, put_pkt->info.flattened_type_size);
344 
345             MPIR_Typerep_flatten(target_dtp_ptr, ext_hdr_ptr);
346             ext_hdr_sz = put_pkt->info.flattened_type_size;
347         }
348 
349         mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
350                                              0, rma_op->origin_count * origin_type_size, &curr_req);
351         MPIR_ERR_CHECK(mpi_errno);
352     }
353 
354     if (curr_req != NULL) {
355         rma_op->reqs_size = 1;
356 
357         rma_op->single_req = curr_req;
358     }
359 
360   fn_exit:
361     MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_PUT_OP);
362     return mpi_errno;
363     /* --BEGIN ERROR HANDLING-- */
364   fn_fail:
365     rma_op->single_req = NULL;
366     rma_op->reqs_size = 0;
367     goto fn_exit;
368     /* --END ERROR HANDLING-- */
369 }
370 
371 #define ALL_STREAM_UNITS_ISSUED (-1)
372 
373 /* issue_acc_op() send ACC packet header and data. */
issue_acc_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)374 static int issue_acc_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
375                         MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
376 {
377     MPIDI_VC_t *vc = NULL;
378     MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
379     MPIDI_CH3_Pkt_accum_t *accum_pkt = &rma_op->pkt.accum;
380     int i, j;
381     MPI_Aint stream_elem_count, stream_unit_count;
382     MPI_Aint predefined_dtp_size, predefined_dtp_extent, predefined_dtp_count;
383     MPI_Aint total_len, rest_len;
384     MPI_Aint origin_dtp_size;
385     MPIR_Datatype*origin_dtp_ptr = NULL;
386     MPIR_Datatype*target_dtp_ptr = NULL;
387     void *ext_hdr_ptr = NULL;
388     MPI_Aint ext_hdr_sz = 0;
389     int mpi_errno = MPI_SUCCESS;
390     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_ACC_OP);
391 
392     MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_ACC_OP);
393 
394     MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
395 
396     if (rma_op->pkt.type == MPIDI_CH3_PKT_ACCUMULATE_IMMED) {
397         MPIR_Request *curr_req = NULL;
398 
399         accum_pkt->pkt_flags |= pkt_flags;
400 
401         /* All origin data is in packet header, issue the header. */
402         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
403         mpi_errno = MPIDI_CH3_iStartMsg(vc, accum_pkt, sizeof(*accum_pkt), &curr_req);
404         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
405         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
406 
407         if (curr_req != NULL) {
408             MPIR_Assert(rma_op->reqs_size == 0 && rma_op->single_req == NULL);
409 
410             rma_op->reqs_size = 1;
411             rma_op->single_req = curr_req;
412         }
413         goto fn_exit;
414     }
415 
416     /* Get total length of origin data */
417     MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
418     total_len = origin_dtp_size * rma_op->origin_count;
419 
420     /* Get size and count for predefined datatype elements */
421     if (MPIR_DATATYPE_IS_PREDEFINED(rma_op->origin_datatype)) {
422         predefined_dtp_size = origin_dtp_size;
423         predefined_dtp_count = rma_op->origin_count;
424         MPIR_Datatype_get_extent_macro(rma_op->origin_datatype, predefined_dtp_extent);
425     }
426     else {
427         MPIR_Datatype_get_ptr(rma_op->origin_datatype, origin_dtp_ptr);
428         MPIR_Assert(origin_dtp_ptr != NULL && origin_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
429         MPIR_Datatype_get_size_macro(origin_dtp_ptr->basic_type, predefined_dtp_size);
430         predefined_dtp_count = total_len / predefined_dtp_size;
431         MPIR_Datatype_get_extent_macro(origin_dtp_ptr->basic_type, predefined_dtp_extent);
432     }
433     MPIR_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
434 
435     /* Calculate number of predefined elements in each stream unit, and
436      * total number of stream units. */
437     stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
438     stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
439     MPIR_Assert(stream_elem_count > 0 && stream_unit_count > 0);
440 
441     /* If there are more than one stream unit, mark the current packet
442      * as stream packet */
443     if (stream_unit_count > 1)
444         pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
445 
446     /* Get target datatype */
447     if (!MPIR_DATATYPE_IS_PREDEFINED(accum_pkt->datatype))
448         MPIR_Datatype_get_ptr(accum_pkt->datatype, target_dtp_ptr);
449 
450     rest_len = total_len;
451     MPIR_Assert(rma_op->issued_stream_count >= 0);
452     for (j = 0; j < stream_unit_count; j++) {
453         intptr_t stream_offset, stream_size;
454         MPIR_Request *curr_req = NULL;
455 
456         if (j < rma_op->issued_stream_count)
457             continue;
458 
459         accum_pkt->pkt_flags |= pkt_flags;
460 
461         if (j != 0) {
462             accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
463             accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
464         }
465         if (j != stream_unit_count - 1) {
466             accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
467             accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
468             accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
469         }
470 
471         stream_offset = j * stream_elem_count * predefined_dtp_size;
472         stream_size = MPL_MIN(stream_elem_count * predefined_dtp_size, rest_len);
473         rest_len -= stream_size;
474 
475         /* Set extended packet header if needed. */
476         init_stream_dtype_ext_pkt(pkt_flags, target_dtp_ptr, stream_offset, &ext_hdr_ptr, &ext_hdr_sz,
477                            &accum_pkt->info.flattened_type_size);
478 
479         mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
480                                              stream_offset, stream_size, &curr_req);
481         MPIR_ERR_CHECK(mpi_errno);
482 
483         if (curr_req != NULL) {
484             if (rma_op->reqs_size == 0) {
485                 MPIR_Assert(rma_op->single_req == NULL && rma_op->multi_reqs == NULL);
486                 rma_op->reqs_size = stream_unit_count;
487 
488                 if (stream_unit_count > 1) {
489                     rma_op->multi_reqs =
490                         (MPIR_Request **) MPL_malloc(sizeof(MPIR_Request *) * rma_op->reqs_size, MPL_MEM_RMA);
491                     for (i = 0; i < rma_op->reqs_size; i++)
492                         rma_op->multi_reqs[i] = NULL;
493                 }
494             }
495 
496             if (rma_op->reqs_size == 1)
497                 rma_op->single_req = curr_req;
498             else
499                 rma_op->multi_reqs[j] = curr_req;
500         }
501 
502         rma_op->issued_stream_count++;
503 
504         if (accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
505             accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
506             /* if piggybacked with LOCK flag, we
507              * only issue the first streaming unit */
508             MPIR_Assert(j == 0);
509             break;
510         }
511     }   /* end of for loop */
512 
513     if (rma_op->issued_stream_count == stream_unit_count) {
514         /* Mark that all stream units have been issued */
515         rma_op->issued_stream_count = ALL_STREAM_UNITS_ISSUED;
516     }
517 
518   fn_exit:
519     MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_ACC_OP);
520     return mpi_errno;
521   fn_fail:
522     if (rma_op->reqs_size == 1) {
523         rma_op->single_req = NULL;
524     }
525     else if (rma_op->reqs_size > 1) {
526         MPL_free(rma_op->multi_reqs);
527         rma_op->multi_reqs = NULL;
528     }
529     rma_op->reqs_size = 0;
530     goto fn_exit;
531 }
532 
533 
534 /* issue_get_acc_op() send GACC packet header and data. */
issue_get_acc_op(MPIDI_RMA_Op_t * rma_op,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)535 static int issue_get_acc_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
536                             MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
537 {
538     MPIDI_VC_t *vc = NULL;
539     MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
540     MPIDI_CH3_Pkt_get_accum_t *get_accum_pkt = &rma_op->pkt.get_accum;
541     int i, j;
542     MPI_Aint stream_elem_count, stream_unit_count;
543     MPI_Aint predefined_dtp_size, predefined_dtp_count, predefined_dtp_extent;
544     MPI_Aint total_len, rest_len;
545     MPI_Aint target_dtp_size;
546     MPIR_Datatype*target_dtp_ptr = NULL;
547     void *ext_hdr_ptr = NULL;
548     MPI_Aint ext_hdr_sz = 0;
549     int mpi_errno = MPI_SUCCESS;
550     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_GET_ACC_OP);
551 
552     MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_GET_ACC_OP);
553 
554     MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
555 
556     if (rma_op->pkt.type == MPIDI_CH3_PKT_GET_ACCUM_IMMED) {
557         MPIR_Request *resp_req = NULL;
558         MPIR_Request *curr_req = NULL;
559 
560         get_accum_pkt->pkt_flags |= pkt_flags;
561 
562         rma_op->reqs_size = 1;
563 
564         /* Create a request for the GACC response.  Store the response buf, count, and
565          * datatype in it, and pass the request's handle in the GACC packet. When the
566          * response comes from the target, it will contain the request handle. */
567         resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
568         MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
569 
570         MPIR_Object_set_ref(resp_req, 2);
571 
572         resp_req->dev.user_buf = rma_op->result_addr;
573         resp_req->dev.user_count = rma_op->result_count;
574         resp_req->dev.datatype = rma_op->result_datatype;
575         resp_req->dev.target_win_handle = MPI_WIN_NULL;
576         resp_req->dev.source_win_handle = win_ptr->handle;
577 
578         /* Note: Get_accumulate uses the same packet type as accumulate */
579         get_accum_pkt->request_handle = resp_req->handle;
580 
581         /* All origin data is in packet header, issue the header. */
582         MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
583         mpi_errno = MPIDI_CH3_iStartMsg(vc, get_accum_pkt, sizeof(*get_accum_pkt), &curr_req);
584         MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
585         MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
586 
587         if (curr_req != NULL) {
588             MPIR_Request_free(curr_req);
589         }
590 
591         rma_op->single_req = resp_req;
592 
593         goto fn_exit;
594     }
595 
596     /* Get total length of target data */
597     MPIR_Datatype_get_size_macro(get_accum_pkt->datatype, target_dtp_size);
598     total_len = target_dtp_size * get_accum_pkt->count;
599 
600     /* Get size and count for predefined datatype elements */
601     if (MPIR_DATATYPE_IS_PREDEFINED(get_accum_pkt->datatype)) {
602         predefined_dtp_size = target_dtp_size;
603         predefined_dtp_count = get_accum_pkt->count;
604         MPIR_Datatype_get_extent_macro(get_accum_pkt->datatype, predefined_dtp_extent);
605     }
606     else {
607         MPIR_Datatype_get_ptr(get_accum_pkt->datatype, target_dtp_ptr);
608         MPIR_Assert(target_dtp_ptr != NULL && target_dtp_ptr->basic_type != MPI_DATATYPE_NULL);
609         MPIR_Datatype_get_size_macro(target_dtp_ptr->basic_type, predefined_dtp_size);
610         predefined_dtp_count = total_len / predefined_dtp_size;
611         MPIR_Datatype_get_extent_macro(target_dtp_ptr->basic_type, predefined_dtp_extent);
612     }
613     MPIR_Assert(predefined_dtp_count > 0 && predefined_dtp_size > 0 && predefined_dtp_extent > 0);
614 
615     /* Calculate number of predefined elements in each stream unit, and
616      * total number of stream units. */
617     stream_elem_count = MPIDI_CH3U_Acc_stream_size / predefined_dtp_extent;
618     stream_unit_count = (predefined_dtp_count - 1) / stream_elem_count + 1;
619     MPIR_Assert(stream_elem_count > 0 && stream_unit_count > 0);
620 
621     /* If there are more than one stream unit, mark the current packet
622      * as stream packet */
623     if (stream_unit_count > 1)
624         pkt_flags |= MPIDI_CH3_PKT_FLAG_RMA_STREAM;
625 
626     rest_len = total_len;
627 
628     rma_op->reqs_size = stream_unit_count;
629 
630     if (rma_op->reqs_size > 1) {
631         rma_op->multi_reqs =
632             (MPIR_Request **) MPL_malloc(sizeof(MPIR_Request *) * rma_op->reqs_size, MPL_MEM_RMA);
633         for (i = 0; i < rma_op->reqs_size; i++)
634             rma_op->multi_reqs[i] = NULL;
635     }
636 
637     MPIR_Assert(rma_op->issued_stream_count >= 0);
638 
639     for (j = 0; j < stream_unit_count; j++) {
640         intptr_t stream_offset, stream_size;
641         MPIR_Request *resp_req = NULL;
642         MPIR_Request *curr_req = NULL;
643 
644         if (j < rma_op->issued_stream_count)
645             continue;
646 
647         get_accum_pkt->pkt_flags |= pkt_flags;
648 
649         if (j != 0) {
650             get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED;
651             get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE;
652         }
653         if (j != stream_unit_count - 1) {
654             get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_UNLOCK;
655             get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_FLUSH;
656             get_accum_pkt->pkt_flags &= ~MPIDI_CH3_PKT_FLAG_RMA_DECR_AT_COUNTER;
657         }
658 
659         /* Create a request for the GACC response.  Store the response buf, count, and
660          * datatype in it, and pass the request's handle in the GACC packet. When the
661          * response comes from the target, it will contain the request handle. */
662         resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
663         MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");
664 
665         MPIR_Object_set_ref(resp_req, 2);
666 
667         resp_req->dev.user_buf = rma_op->result_addr;
668         resp_req->dev.user_count = rma_op->result_count;
669         resp_req->dev.datatype = rma_op->result_datatype;
670         resp_req->dev.target_win_handle = MPI_WIN_NULL;
671         resp_req->dev.source_win_handle = win_ptr->handle;
672         resp_req->dev.pkt_flags = pkt_flags;
673 
674         if (!MPIR_DATATYPE_IS_PREDEFINED(resp_req->dev.datatype)) {
675             MPIR_Datatype*result_dtp = NULL;
676             MPIR_Datatype_get_ptr(resp_req->dev.datatype, result_dtp);
677             resp_req->dev.datatype_ptr = result_dtp;
678             /* this will cause the datatype to be freed when the
679              * request is freed. */
680         }
681 
682         /* Note: Get_accumulate uses the same packet type as accumulate */
683         get_accum_pkt->request_handle = resp_req->handle;
684 
685         stream_offset = j * stream_elem_count * predefined_dtp_size;
686         stream_size = MPL_MIN(stream_elem_count * predefined_dtp_size, rest_len);
687         rest_len -= stream_size;
688 
689         /* Set extended packet header if needed. */
690         init_stream_dtype_ext_pkt(pkt_flags, target_dtp_ptr, stream_offset, &ext_hdr_ptr, &ext_hdr_sz,
691                            &get_accum_pkt->info.flattened_type_size);
692 
693         /* Note: here we need to allocate an extended packet header in response request,
694          * in order to store the stream_offset locally and use it in PktHandler_Get_AccumResp.
695          * This extended packet header only contains stream_offset and does not contain any
696          * other information. */
697         {
698             int dummy;
699             init_stream_dtype_ext_pkt(pkt_flags, NULL /* target_dtp_ptr */ , stream_offset,
700                                       &(resp_req->dev.ext_hdr_ptr), &(resp_req->dev.ext_hdr_sz),
701                                       &dummy);
702         }
703 
704         mpi_errno = issue_from_origin_buffer(rma_op, vc, ext_hdr_ptr, ext_hdr_sz,
705                                              stream_offset, stream_size, &curr_req);
706         MPIR_ERR_CHECK(mpi_errno);
707 
708         if (curr_req != NULL) {
709             MPIR_Request_free(curr_req);
710         }
711 
712         if (rma_op->reqs_size == 1)
713             rma_op->single_req = resp_req;
714         else
715             rma_op->multi_reqs[j] = resp_req;
716 
717         rma_op->issued_stream_count++;
718 
719         if (get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_SHARED ||
720             get_accum_pkt->pkt_flags & MPIDI_CH3_PKT_FLAG_RMA_LOCK_EXCLUSIVE) {
721             /* if piggybacked with LOCK flag, we
722              * only issue the first streaming unit */
723             MPIR_Assert(j == 0);
724             break;
725         }
726     }   /* end of for loop */
727 
728     if (rma_op->issued_stream_count == stream_unit_count) {
729         /* Mark that all stream units have been issued */
730         rma_op->issued_stream_count = ALL_STREAM_UNITS_ISSUED;
731     }
732 
733   fn_exit:
734     MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_GET_ACC_OP);
735     return mpi_errno;
736     /* --BEGIN ERROR HANDLING-- */
737   fn_fail:
738     if (rma_op->reqs_size == 1) {
739         /* error case: drop both our reference to the request and the
740          * progress engine's reference to it, since the progress
741          * engine didn't get a chance to see it yet. */
742         MPIR_Request_free(rma_op->single_req);
743         MPIR_Request_free(rma_op->single_req);
744         rma_op->single_req = NULL;
745     }
746     else if (rma_op->reqs_size > 1) {
747         for (i = 0; i < rma_op->reqs_size; i++) {
748             if (rma_op->multi_reqs[i] != NULL) {
749                 /* error case: drop both our reference to the request
750                  * and the progress engine's reference to it, since
751                  * the progress engine didn't get a chance to see it
752                  * yet. */
753                 MPIR_Request_free(rma_op->multi_reqs[i]);
754                 MPIR_Request_free(rma_op->multi_reqs[i]);
755             }
756         }
757         MPL_free(rma_op->multi_reqs);
758         rma_op->multi_reqs = NULL;
759     }
760     rma_op->reqs_size = 0;
761     goto fn_exit;
762     /* --END ERROR HANDLING-- */
763 }
764 
765 
/* issue_get_op() sends the GET packet stored in rma_op->pkt.get to the
 * target rank of the operation.
 *
 * A response request is created first; it records the origin buffer, count
 * and datatype, and its handle is placed in the GET packet so that the
 * response from the target can be matched to it.  If the target datatype is
 * predefined only the packet header is sent; otherwise the flattened target
 * datatype is sent along with the header as an extended header.
 *
 * Parameters:
 *   rma_op     - operation to issue; on success rma_op->single_req is set to
 *                the response request and rma_op->reqs_size to 1
 *   win_ptr    - window the operation belongs to (supplies the communicator
 *                and the source window handle)
 *   target_ptr - not referenced by this routine
 *   pkt_flags  - flags OR-ed into the packet flags before sending
 *
 * Returns MPI_SUCCESS or an MPI error code.  On failure the response request
 * (if it was created) is released, rma_op->single_req is NULL and
 * rma_op->reqs_size is 0. */
static int issue_get_op(MPIDI_RMA_Op_t * rma_op, MPIR_Win * win_ptr,
                        MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
{
    MPIDI_CH3_Pkt_get_t *get_pkt = &rma_op->pkt.get;
    int mpi_errno = MPI_SUCCESS;
    MPIDI_VC_t *vc;
    MPIR_Comm *comm_ptr;
    MPIR_Datatype *dtp;
    MPI_Datatype target_datatype;
    MPIR_Request *req = NULL;
    MPIR_Request *curr_req = NULL;
    struct iovec iov[MPL_IOV_LIMIT];
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_GET_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_GET_OP);

    rma_op->reqs_size = 1;

    /* create a request, store the origin buf, cnt, datatype in it,
     * and pass a handle to it in the get packet. When the get
     * response comes from the target, it will contain the request
     * handle. */
    curr_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    if (curr_req == NULL) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**nomemreq");
    }

    /* The request starts with two references; both are dropped in fn_fail
     * if the operation cannot be issued. */
    MPIR_Object_set_ref(curr_req, 2);

    curr_req->dev.user_buf = rma_op->origin_addr;
    curr_req->dev.user_count = rma_op->origin_count;
    curr_req->dev.datatype = rma_op->origin_datatype;
    curr_req->dev.target_win_handle = MPI_WIN_NULL;
    curr_req->dev.source_win_handle = win_ptr->handle;
    if (!MPIR_DATATYPE_IS_PREDEFINED(curr_req->dev.datatype)) {
        MPIR_Datatype_get_ptr(curr_req->dev.datatype, dtp);
        curr_req->dev.datatype_ptr = dtp;
        /* this will cause the datatype to be freed when the
         * request is freed. */
    }

    get_pkt->request_handle = curr_req->handle;

    get_pkt->pkt_flags |= pkt_flags;

    comm_ptr = win_ptr->comm_ptr;
    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);

    MPIDI_CH3_PKT_RMA_GET_TARGET_DATATYPE(rma_op->pkt, target_datatype, mpi_errno);
    if (MPIR_DATATYPE_IS_PREDEFINED(target_datatype)) {
        /* basic datatype on target. simply send the get_pkt. */
        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, get_pkt, sizeof(*get_pkt), &req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    }
    else {
        /* derived datatype on target: ship the flattened datatype as an
         * extended header right after the packet header. */
        void *ext_hdr_ptr = NULL;
        MPI_Aint ext_hdr_sz = 0;

        MPIR_Datatype_get_ptr(target_datatype, dtp);
        MPIR_Typerep_flatten_size(dtp, &get_pkt->info.flattened_type_size);

        ext_hdr_ptr = MPL_malloc(get_pkt->info.flattened_type_size, MPL_MEM_RMA);
        if (ext_hdr_ptr == NULL) {
            /* NOTE(review): the target datatype is not released on this
             * path, unlike the normal path below -- confirm whether the
             * caller is expected to clean it up. */
            MPIR_ERR_SETANDJUMP1(mpi_errno, MPI_ERR_OTHER, "**nomem",
                                 "**nomem %d", get_pkt->info.flattened_type_size);
        }
        MPL_VG_MEM_INIT(ext_hdr_ptr, get_pkt->info.flattened_type_size);

        MPIR_Typerep_flatten(dtp, ext_hdr_ptr);
        ext_hdr_sz = get_pkt->info.flattened_type_size;

        iov[0].iov_base = (void *) get_pkt;
        iov[0].iov_len = sizeof(*get_pkt);
        iov[1].iov_base = (void *) ext_hdr_ptr;
        iov[1].iov_len = ext_hdr_sz;

        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsgv(vc, iov, 2, &req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);

        /* release the target datatype */
        MPIR_Datatype_ptr_release(dtp);

        /* If send is finished, we free extended header immediately.
         * Otherwise, store its pointer in request thus it can be freed when request is freed.*/
        if (req != NULL) {
            req->dev.ext_hdr_ptr = ext_hdr_ptr;
        }
        else {
            MPL_free(ext_hdr_ptr);
        }
    }

    if (mpi_errno != MPI_SUCCESS) {
        MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }

    /* release the request returned by iStartMsg or iStartMsgv */
    if (req != NULL) {
        MPIR_Request_free(req);
    }

    rma_op->single_req = curr_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_GET_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (curr_req != NULL) {
        /* error case: the response request was never handed over to
         * rma_op, so drop both of the references set above; otherwise
         * the request would be leaked.  This mirrors the error handling
         * in issue_get_acc_op(). */
        MPIR_Request_free(curr_req);
        MPIR_Request_free(curr_req);
    }
    rma_op->single_req = NULL;
    rma_op->reqs_size = 0;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
882 
883 
/* issue_cas_op() sends the compare-and-swap packet stored in rma_op->pkt.cas
 * to the target rank of the operation.
 *
 * A response request is created that records the result buffer and result
 * datatype; its handle is placed in the packet so that the response from the
 * target can be matched to it.
 *
 * Parameters:
 *   rma_op     - operation to issue; on success rma_op->single_req is set to
 *                the response request and rma_op->reqs_size to 1
 *   win_ptr    - window the operation belongs to (supplies the communicator
 *                and the source window handle)
 *   target_ptr - not referenced by this routine
 *   pkt_flags  - flags OR-ed into the packet flags before sending
 *
 * Returns MPI_SUCCESS or an MPI error code.  On failure the response request
 * (if it was created) is released, rma_op->single_req is NULL and
 * rma_op->reqs_size is 0. */
static int issue_cas_op(MPIDI_RMA_Op_t * rma_op,
                        MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target_ptr,
                        int pkt_flags)
{
    MPIDI_VC_t *vc = NULL;
    MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
    MPIDI_CH3_Pkt_cas_t *cas_pkt = &rma_op->pkt.cas;
    MPIR_Request *rmw_req = NULL;
    MPIR_Request *curr_req = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_CAS_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_CAS_OP);

    rma_op->reqs_size = 1;

    /* Create a request for the RMW response.  Store the result buffer and
     * datatype in it, and pass the request's handle in the RMW packet. When
     * the response comes from the target, it will contain the request
     * handle. */
    curr_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(curr_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    /* Set refs on the request to 2: one for the response message, and one for
     * the partial completion handler */
    MPIR_Object_set_ref(curr_req, 2);

    curr_req->dev.user_buf = rma_op->result_addr;
    curr_req->dev.datatype = rma_op->result_datatype;

    curr_req->dev.target_win_handle = MPI_WIN_NULL;
    curr_req->dev.source_win_handle = win_ptr->handle;

    cas_pkt->request_handle = curr_req->handle;
    cas_pkt->pkt_flags |= pkt_flags;

    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);
    MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
    mpi_errno = MPIDI_CH3_iStartMsg(vc, cas_pkt, sizeof(*cas_pkt), &rmw_req);
    MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
    MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");

    /* release the send request returned by iStartMsg, if any */
    if (rmw_req != NULL) {
        MPIR_Request_free(rmw_req);
    }

    rma_op->single_req = curr_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_CAS_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (curr_req != NULL) {
        /* error case: the response request was never handed over to
         * rma_op, so drop both of the references set above; otherwise
         * the request would be leaked.  This mirrors the error handling
         * in issue_get_acc_op(). */
        MPIR_Request_free(curr_req);
        MPIR_Request_free(curr_req);
    }
    rma_op->single_req = NULL;
    rma_op->reqs_size = 0;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
941 
942 
/* issue_fop_op() sends the fetch-and-op packet stored in rma_op->pkt.fop to
 * the target rank of the operation.
 *
 * A response request is created that records the result buffer and result
 * datatype; its handle is placed in the packet so that the response from the
 * target can be matched to it.  For MPIDI_CH3_PKT_FOP_IMMED only the packet
 * header is sent; otherwise one element of the origin datatype is sent from
 * the origin buffer via issue_from_origin_buffer().
 *
 * Parameters:
 *   rma_op     - operation to issue; on success rma_op->single_req is set to
 *                the response request and rma_op->reqs_size to 1
 *   win_ptr    - window the operation belongs to (supplies the communicator
 *                and the source window handle)
 *   target_ptr - not referenced by this routine
 *   pkt_flags  - flags OR-ed into the packet flags before sending
 *
 * Returns MPI_SUCCESS or an MPI error code.  On failure the response request
 * (if it was created) is released, rma_op->single_req is NULL and
 * rma_op->reqs_size is 0. */
static int issue_fop_op(MPIDI_RMA_Op_t * rma_op,
                        MPIR_Win * win_ptr, MPIDI_RMA_Target_t * target_ptr,
                        int pkt_flags)
{
    MPIDI_VC_t *vc = NULL;
    MPIR_Comm *comm_ptr = win_ptr->comm_ptr;
    MPIDI_CH3_Pkt_fop_t *fop_pkt = &rma_op->pkt.fop;
    MPIR_Request *resp_req = NULL;
    MPIR_Request *curr_req = NULL;
    int mpi_errno = MPI_SUCCESS;
    MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_FOP_OP);

    MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_FOP_OP);

    rma_op->reqs_size = 1;

    /* Create a request for the FOP response.  Store the result buf and
     * datatype in it, and pass the request's handle in the FOP packet. When
     * the response comes from the target, it will contain the request
     * handle. */
    resp_req = MPIR_Request_create(MPIR_REQUEST_KIND__UNDEFINED);
    MPIR_ERR_CHKANDJUMP(resp_req == NULL, mpi_errno, MPI_ERR_OTHER, "**nomemreq");

    /* The request starts with two references; both are dropped in fn_fail
     * if the operation cannot be issued. */
    MPIR_Object_set_ref(resp_req, 2);

    resp_req->dev.user_buf = rma_op->result_addr;
    resp_req->dev.datatype = rma_op->result_datatype;
    resp_req->dev.target_win_handle = MPI_WIN_NULL;
    resp_req->dev.source_win_handle = win_ptr->handle;

    fop_pkt->request_handle = resp_req->handle;

    fop_pkt->pkt_flags |= pkt_flags;

    MPIDI_Comm_get_vc_set_active(comm_ptr, rma_op->target_rank, &vc);

    if (rma_op->pkt.type == MPIDI_CH3_PKT_FOP_IMMED) {
        /* All origin data is in packet header, issue the header. */
        MPID_THREAD_CS_ENTER(POBJ, vc->pobj_mutex);
        mpi_errno = MPIDI_CH3_iStartMsg(vc, fop_pkt, sizeof(*fop_pkt), &curr_req);
        MPID_THREAD_CS_EXIT(POBJ, vc->pobj_mutex);
        MPIR_ERR_CHKANDJUMP(mpi_errno, mpi_errno, MPI_ERR_OTHER, "**ch3|rmamsg");
    }
    else {
        /* Origin data does not fit in the header: send exactly one element
         * of the origin datatype from the origin buffer. */
        MPI_Aint origin_dtp_size;
        MPIR_Datatype_get_size_macro(rma_op->origin_datatype, origin_dtp_size);
        mpi_errno = issue_from_origin_buffer(rma_op, vc, NULL, 0,       /*ext_hdr_ptr, ext_hdr_sz */
                                             0, 1 * origin_dtp_size, &curr_req);
        MPIR_ERR_CHECK(mpi_errno);
    }

    /* release the send request returned by the issuing call, if any */
    if (curr_req != NULL) {
        MPIR_Request_free(curr_req);
    }

    rma_op->single_req = resp_req;

  fn_exit:
    MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_FOP_OP);
    return mpi_errno;
    /* --BEGIN ERROR HANDLING-- */
  fn_fail:
    if (resp_req != NULL) {
        /* error case: the response request was never handed over to
         * rma_op, so drop both of the references set above; otherwise
         * the request would be leaked.  This mirrors the error handling
         * in issue_get_acc_op(). */
        MPIR_Request_free(resp_req);
        MPIR_Request_free(resp_req);
    }
    rma_op->single_req = NULL;
    rma_op->reqs_size = 0;
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
1009 
1010 
1011 /* issue_rma_op() is called by ch3u_rma_progress.c, it triggers
1012    proper issuing functions according to packet type. */
issue_rma_op(MPIDI_RMA_Op_t * op_ptr,MPIR_Win * win_ptr,MPIDI_RMA_Target_t * target_ptr,int pkt_flags)1013 static inline int issue_rma_op(MPIDI_RMA_Op_t * op_ptr, MPIR_Win * win_ptr,
1014                                MPIDI_RMA_Target_t * target_ptr, int pkt_flags)
1015 {
1016     int mpi_errno = MPI_SUCCESS;
1017     MPIR_FUNC_VERBOSE_STATE_DECL(MPID_STATE_ISSUE_RMA_OP);
1018 
1019     MPIR_FUNC_VERBOSE_RMA_ENTER(MPID_STATE_ISSUE_RMA_OP);
1020 
1021     switch (op_ptr->pkt.type) {
1022     case (MPIDI_CH3_PKT_PUT):
1023     case (MPIDI_CH3_PKT_PUT_IMMED):
1024         mpi_errno = issue_put_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1025         break;
1026     case (MPIDI_CH3_PKT_ACCUMULATE):
1027     case (MPIDI_CH3_PKT_ACCUMULATE_IMMED):
1028         mpi_errno = issue_acc_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1029         break;
1030     case (MPIDI_CH3_PKT_GET_ACCUM):
1031     case (MPIDI_CH3_PKT_GET_ACCUM_IMMED):
1032         mpi_errno = issue_get_acc_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1033         break;
1034     case (MPIDI_CH3_PKT_GET):
1035         mpi_errno = issue_get_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1036         break;
1037     case (MPIDI_CH3_PKT_CAS_IMMED):
1038         mpi_errno = issue_cas_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1039         break;
1040     case (MPIDI_CH3_PKT_FOP):
1041     case (MPIDI_CH3_PKT_FOP_IMMED):
1042         mpi_errno = issue_fop_op(op_ptr, win_ptr, target_ptr, pkt_flags);
1043         break;
1044     default:
1045         MPIR_ERR_SETANDJUMP(mpi_errno, MPI_ERR_OTHER, "**winInvalidOp");
1046     }
1047 
1048     MPIR_ERR_CHECK(mpi_errno);
1049 
1050   fn_exit:
1051     MPIR_FUNC_VERBOSE_RMA_EXIT(MPID_STATE_ISSUE_RMA_OP);
1052     return mpi_errno;
1053     /* --BEGIN ERROR HANDLING-- */
1054   fn_fail:
1055     goto fn_exit;
1056     /* --END ERROR HANDLING-- */
1057 }
1058 
1059 #endif /* MPID_RMA_ISSUE_H_INCLUDED */
1060