1 /*
2 * Copyright (C) by Argonne National Laboratory
3 * See COPYRIGHT in top-level directory
4 */
5
6 #include "mpidimpl.h"
7 #include "ofi_impl.h"
8 #include "ofi_noinline.h"
9
create_chunk(void * pack_buffer,MPI_Aint unpack_size,MPI_Aint unpack_offset,MPIDI_OFI_win_request_t * winreq)10 static MPIDI_OFI_pack_chunk *create_chunk(void *pack_buffer, MPI_Aint unpack_size,
11 MPI_Aint unpack_offset, MPIDI_OFI_win_request_t * winreq)
12 {
13 MPIDI_OFI_pack_chunk *chunk = MPL_malloc(sizeof(MPIDI_OFI_chunk_request), MPL_MEM_RMA);
14 if (chunk == NULL)
15 return NULL;
16
17 chunk->pack_buffer = pack_buffer;
18 chunk->unpack_size = unpack_size;
19 chunk->unpack_offset = unpack_offset;
20
21 /* prepend to chunk list */
22 chunk->next = winreq->chunks;
23 winreq->chunks = chunk;
24
25 return chunk;
26 }
27
/*
 * Drain the chunk list attached to winreq.
 *
 * For every chunk: if it carries a positive unpack_size, its pack buffer is
 * unpacked into the get-origin buffer described by winreq->noncontig.get.origin
 * at the chunk's recorded offset; then the pack buffer is handed back to
 * MPIDI_OFI_global.pack_buf_pool and the chunk descriptor is freed.
 * On return winreq->chunks is NULL.
 */
void MPIDI_OFI_complete_chunks(MPIDI_OFI_win_request_t * winreq)
{
    MPIDI_OFI_pack_chunk *cur;
    MPIDI_OFI_pack_chunk *following;

    for (cur = winreq->chunks; cur != NULL; cur = following) {
        /* grab the successor now; cur is freed at the bottom of the body */
        following = cur->next;

        if (cur->unpack_size > 0) {
            MPI_Aint unpacked;
            MPIR_Typerep_unpack(cur->pack_buffer,
                                cur->unpack_size, winreq->noncontig.get.origin.addr,
                                winreq->noncontig.get.origin.count,
                                winreq->noncontig.get.origin.datatype, cur->unpack_offset,
                                &unpacked);
            MPIR_Assert(cur->unpack_size == unpacked);
        }

        MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.pack_buf_pool, cur->pack_buffer);
        MPL_free(cur);
    }

    winreq->chunks = NULL;
}
51
/*
 * Issue a PUT or GET directly between the origin and target layouts, without
 * an intermediate pack buffer.
 *
 * Both datatypes are expanded into iovec windows of at most
 * MPIR_CVAR_CH4_OFI_RMA_IOVEC_MAX entries. The loop walks the two iovec
 * streams in lock step and issues one fi_writemsg/fi_readmsg per overlapping
 * contiguous piece (the smaller of the current origin and target segments).
 *
 * origin_addr/origin_count/origin_datatype - local buffer description
 * target_rank      - not used in this function
 * target_count/target_datatype             - remote layout description
 * target_mr        - supplies the remote base address (addr) and key (mr_key)
 * win              - window whose endpoint and syncQ are used
 * addr             - address-vector entry of the target
 * rma_type         - MPIDI_OFI_PUT selects fi_writemsg; anything else is
 *                    treated as a GET and uses fi_readmsg
 * sigreq           - when non-NULL, a request is created (or, with work
 *                    queues, an existing one is referenced) and FI_COMPLETION
 *                    is requested for each message
 *
 * Returns MPI_SUCCESS or an MPI error code. The temporary iovec arrays are
 * released on every exit path.
 */
int MPIDI_OFI_nopack_putget(const void *origin_addr, int origin_count,
                            MPI_Datatype origin_datatype, int target_rank,
                            int target_count, MPI_Datatype target_datatype,
                            MPIDI_OFI_target_mr_t target_mr, MPIR_Win * win,
                            MPIDI_av_entry_t * addr, int rma_type, MPIR_Request ** sigreq)
{
    int mpi_errno = MPI_SUCCESS;
    uint64_t flags;
    struct fi_msg_rma msg;
    struct fi_rma_iov riov;
    struct iovec iov;
    size_t target_bytes, origin_bytes;
    /* NULL-initialized up front so fn_exit can free them unconditionally */
    struct iovec *target_iov = NULL;
    struct iovec *origin_iov = NULL;
    MPI_Aint total_target_iov_len;
    MPI_Aint target_len;
    MPI_Aint target_iov_offset = 0;
    MPI_Aint total_origin_iov_len;
    MPI_Aint origin_len;
    MPI_Aint origin_iov_offset = 0;
    MPIDI_OFI_win_request_t *req;

    MPIR_Datatype_get_size_macro(origin_datatype, origin_bytes);
    origin_bytes *= origin_count;
    MPIR_Datatype_get_size_macro(target_datatype, target_bytes);
    target_bytes *= target_count;

    /* allocate target iovecs; done before the request is enqueued so that an
     * allocation failure leaves the window queues untouched */
    MPIR_Typerep_iov_len(target_count, target_datatype, target_bytes, &total_target_iov_len);
    target_len = MPL_MIN(total_target_iov_len, MPIR_CVAR_CH4_OFI_RMA_IOVEC_MAX);
    target_iov = MPL_malloc(sizeof(struct iovec) * target_len, MPL_MEM_RMA);
    /* a zero-length request may legitimately yield NULL, so only a non-empty
     * array that failed to allocate is an error */
    MPIR_ERR_CHKANDSTMT(target_len > 0 && target_iov == NULL, mpi_errno, MPI_ERR_NO_MEM,
                        goto fn_fail, "**nomem");

    /* allocate origin iovecs */
    MPIR_Typerep_iov_len(origin_count, origin_datatype, origin_bytes, &total_origin_iov_len);
    origin_len = MPL_MIN(total_origin_iov_len, MPIR_CVAR_CH4_OFI_RMA_IOVEC_MAX);
    origin_iov = MPL_malloc(sizeof(struct iovec) * origin_len, MPL_MEM_RMA);
    MPIR_ERR_CHKANDSTMT(origin_len > 0 && origin_iov == NULL, mpi_errno, MPI_ERR_NO_MEM,
                        goto fn_fail, "**nomem");

    /* allocate request */
    req = MPIDI_OFI_win_request_create();
    MPIR_ERR_CHKANDSTMT((req) == NULL, mpi_errno, MPIX_ERR_NOREQ, goto fn_fail, "**nomemreq");
    req->next = MPIDI_OFI_WIN(win).syncQ;
    MPIDI_OFI_WIN(win).syncQ = req;
    req->sigreq = sigreq;
    req->chunks = NULL;
    /* record the operation type now so the queued request is fully described
     * even if no message ends up being issued */
    if (rma_type == MPIDI_OFI_PUT) {
        req->rma_type = MPIDI_OFI_PUT;
        req->noncontig.put.origin.datatype = MPI_DATATYPE_NULL;
        req->noncontig.put.target.datatype = MPI_DATATYPE_NULL;
    } else {
        req->rma_type = MPIDI_OFI_GET;
        req->noncontig.get.origin.datatype = MPI_DATATYPE_NULL;
        req->noncontig.get.target.datatype = MPI_DATATYPE_NULL;
    }

    if (sigreq) {
#ifdef MPIDI_CH4_USE_WORK_QUEUES
        if (*sigreq) {
            MPIR_Request_add_ref(*sigreq);
        } else
#endif
        {
            MPIDI_OFI_REQUEST_CREATE(*sigreq, MPIR_REQUEST_KIND__RMA, 0);
        }
        flags = FI_COMPLETION | FI_DELIVERY_COMPLETE;
    } else {
        flags = FI_DELIVERY_COMPLETE;
    }

    /* i / j count fully consumed origin / target segments; MPI_Aint matches
     * the type of the totals and offsets they are compared against */
    MPI_Aint i = 0, j = 0;
    size_t msg_len;
    while (i < total_origin_iov_len && j < total_target_iov_len) {
        /* position inside the currently loaded iovec window */
        MPI_Aint origin_cur = i % origin_len;
        MPI_Aint target_cur = j % target_len;
        /* refill a window once every segment loaded so far has been consumed */
        if (i == origin_iov_offset)
            MPIDI_OFI_load_iov(origin_addr, origin_count, origin_datatype, origin_len,
                               &origin_iov_offset, origin_iov);
        if (j == target_iov_offset)
            MPIDI_OFI_load_iov((const void *) target_mr.addr, target_count, target_datatype,
                               target_len, &target_iov_offset, target_iov);

        msg_len = MPL_MIN(origin_iov[origin_cur].iov_len, target_iov[target_cur].iov_len);

        msg.desc = NULL;
        msg.addr = MPIDI_OFI_av_to_phys(addr, 0, 0);
        msg.context = NULL;
        msg.data = 0;
        msg.msg_iov = &iov;
        msg.iov_count = 1;
        msg.rma_iov = &riov;
        msg.rma_iov_count = 1;
        iov.iov_base = origin_iov[origin_cur].iov_base;
        iov.iov_len = msg_len;
        riov.addr = (uintptr_t) target_iov[target_cur].iov_base;
        riov.len = msg_len;
        riov.key = target_mr.mr_key;
        MPIDI_OFI_INIT_CHUNK_CONTEXT(win, sigreq);
        if (rma_type == MPIDI_OFI_PUT) {
            MPIDI_OFI_CALL_RETRY(fi_writemsg(MPIDI_OFI_WIN(win).ep, &msg, flags), 0, rdma_write,
                                 FALSE);
        } else {        /* MPIDI_OFI_GET */
            MPIDI_OFI_CALL_RETRY(fi_readmsg(MPIDI_OFI_WIN(win).ep, &msg, flags), 0, rdma_write,
                                 FALSE);
        }

        /* advance whichever side was fully consumed; trim the other in place */
        if (msg_len < origin_iov[origin_cur].iov_len) {
            origin_iov[origin_cur].iov_base = (char *) origin_iov[origin_cur].iov_base + msg_len;
            origin_iov[origin_cur].iov_len -= msg_len;
        } else {
            i++;
        }

        if (msg_len < target_iov[target_cur].iov_len) {
            target_iov[target_cur].iov_base = (char *) target_iov[target_cur].iov_base + msg_len;
            target_iov[target_cur].iov_len -= msg_len;
        } else {
            j++;
        }
    }
    MPIR_Assert(i == total_origin_iov_len);
    MPIR_Assert(j == total_target_iov_len);

  fn_exit:
    /* shared by the success and failure paths so the arrays never leak */
    MPL_free(origin_iov);
    MPL_free(target_iov);
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
179
/*
 * Issue as many packed PUT fragments for req as pack buffers are available.
 *
 * Each iteration takes one cell from MPIDI_OFI_global.pack_buf_pool, packs the
 * next piece of origin data into it and writes it to the current target iovec
 * segment with fi_writemsg. The fragment size is the smaller of
 * MPIDI_OFI_DEFAULT_SHORT_SEND_SIZE and the remaining length of that segment.
 * The loop stops when all origin bytes have been issued or when the pool has
 * no cell left.
 *
 * If bytes remain, the position in the target iovec stream is saved in
 * req->noncontig.put.target.iov_cur and req is left on the window's
 * deferredQ. Otherwise req is removed from deferredQ, pushed onto syncQ
 * (which rewrites req->next) and the target iovec array is freed.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static int issue_packed_put(MPIR_Win * win, MPIDI_OFI_win_request_t * req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request **sigreq = req->sigreq;
    MPI_Aint actual_pack_bytes;
    struct fi_msg_rma msg;
    struct iovec iov;
    struct fi_rma_iov riov;
    uint64_t flags;
    void *pack_buffer;

    if (sigreq)
        flags = FI_COMPLETION | FI_DELIVERY_COMPLETE;
    else
        flags = FI_DELIVERY_COMPLETE;

    int j = req->noncontig.put.target.iov_cur;
    size_t msg_len;
    while (req->noncontig.put.origin.pack_offset < req->noncontig.put.origin.total_bytes) {
        MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.pack_buf_pool, &pack_buffer);
        /* pool exhausted: stop here and leave the rest for a later call */
        if (pack_buffer == NULL)
            break;

        /* position inside the currently loaded target iovec window */
        MPI_Aint target_cur = j % req->noncontig.put.target.iov_len;
        /* refill the window once every loaded segment has been consumed */
        if (j == req->noncontig.put.target.iov_offset)
            MPIDI_OFI_load_iov(req->noncontig.put.target.base,
                               req->noncontig.put.target.count,
                               req->noncontig.put.target.datatype,
                               req->noncontig.put.target.iov_len,
                               &req->noncontig.put.target.iov_offset,
                               req->noncontig.put.target.iov);

        msg_len =
            MPL_MIN(MPIDI_OFI_DEFAULT_SHORT_SEND_SIZE,
                    req->noncontig.put.target.iov[target_cur].iov_len);
        /* load the pack buffer */
        MPIR_Typerep_pack(req->noncontig.put.origin.addr, req->noncontig.put.origin.count,
                          req->noncontig.put.origin.datatype,
                          req->noncontig.put.origin.pack_offset, pack_buffer,
                          msg_len, &actual_pack_bytes);
        MPIR_Assert(msg_len == actual_pack_bytes);

        /* unpack_size 0: nothing to unpack for a put, the chunk only tracks
         * the pack buffer so it can be returned to the pool later */
        MPIDI_OFI_pack_chunk *chunk = create_chunk(pack_buffer, 0, 0, req);
        if (chunk == NULL) {
            /* the cell is not tracked by any chunk yet; give it back to the
             * pool here or it would be lost */
            MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.pack_buf_pool, pack_buffer);
        }
        MPIR_ERR_CHKANDSTMT(chunk == NULL, mpi_errno, MPI_ERR_NO_MEM, goto fn_fail, "**nomem");

        msg.desc = NULL;
        msg.addr = MPIDI_OFI_av_to_phys(req->noncontig.put.target.addr, 0, 0);
        msg.context = NULL;
        msg.data = 0;
        msg.msg_iov = &iov;
        msg.iov_count = 1;
        msg.rma_iov = &riov;
        msg.rma_iov_count = 1;
        iov.iov_base = pack_buffer;
        iov.iov_len = msg_len;
        riov.addr = (uintptr_t) req->noncontig.put.target.iov[target_cur].iov_base;
        riov.len = msg_len;
        riov.key = req->noncontig.put.target.key;
        MPIDI_OFI_INIT_CHUNK_CONTEXT(win, sigreq);
        MPIDI_OFI_CALL_RETRY(fi_writemsg(MPIDI_OFI_WIN(win).ep, &msg, flags), 0, rdma_write, FALSE);
        req->noncontig.put.origin.pack_offset += msg_len;

        /* trim a partially consumed target segment, or move on to the next */
        if (msg_len < req->noncontig.put.target.iov[target_cur].iov_len) {
            req->noncontig.put.target.iov[target_cur].iov_base =
                (char *) req->noncontig.put.target.iov[target_cur].iov_base + msg_len;
            req->noncontig.put.target.iov[target_cur].iov_len -= msg_len;
        } else {
            j++;
        }
    }

    /* not finished. update our place in the target iov array for later. */
    if (req->noncontig.put.origin.pack_offset < req->noncontig.put.origin.total_bytes) {
        req->noncontig.put.target.iov_cur = j;
    } else {
        /* finished issuing. move from deferredQ to syncQ. */
        DL_DELETE(MPIDI_OFI_WIN(win).deferredQ, req);
        req->next = MPIDI_OFI_WIN(win).syncQ;
        MPIDI_OFI_WIN(win).syncQ = req;
        MPL_free(req->noncontig.put.target.iov);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
267
/*
 * Issue as many packed GET fragments for req as pack buffers are available.
 *
 * Each iteration takes one cell from MPIDI_OFI_global.pack_buf_pool and reads
 * the current target iovec segment into it with fi_readmsg. The fragment size
 * is the smaller of MPIDI_OFI_DEFAULT_SHORT_SEND_SIZE and the remaining length
 * of that segment. A chunk recording the fragment size and the origin pack
 * offset is attached to req so MPIDI_OFI_complete_chunks can unpack the data
 * into the origin buffer later. The loop stops when all bytes have been
 * issued or when the pool has no cell left.
 *
 * If bytes remain, the position in the target iovec stream is saved in
 * req->noncontig.get.target.iov_cur and req is left on the window's
 * deferredQ. Otherwise req is removed from deferredQ, pushed onto syncQ
 * (which rewrites req->next) and the target iovec array is freed.
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
static int issue_packed_get(MPIR_Win * win, MPIDI_OFI_win_request_t * req)
{
    int mpi_errno = MPI_SUCCESS;
    MPIR_Request **sigreq = req->sigreq;
    struct fi_msg_rma msg;
    struct iovec iov;
    struct fi_rma_iov riov;
    uint64_t flags;
    void *pack_buffer;

    if (sigreq)
        flags = FI_COMPLETION | FI_DELIVERY_COMPLETE;
    else
        flags = FI_DELIVERY_COMPLETE;

    int j = req->noncontig.get.target.iov_cur;
    size_t msg_len;
    while (req->noncontig.get.origin.pack_offset < req->noncontig.get.origin.total_bytes) {
        MPIDU_genq_private_pool_alloc_cell(MPIDI_OFI_global.pack_buf_pool, &pack_buffer);
        /* pool exhausted: stop here and leave the rest for a later call */
        if (pack_buffer == NULL)
            break;

        /* position inside the currently loaded target iovec window */
        MPI_Aint target_cur = j % req->noncontig.get.target.iov_len;
        /* refill the window once every loaded segment has been consumed */
        if (j == req->noncontig.get.target.iov_offset)
            MPIDI_OFI_load_iov(req->noncontig.get.target.base,
                               req->noncontig.get.target.count,
                               req->noncontig.get.target.datatype,
                               req->noncontig.get.target.iov_len,
                               &req->noncontig.get.target.iov_offset,
                               req->noncontig.get.target.iov);

        msg_len =
            MPL_MIN(MPIDI_OFI_DEFAULT_SHORT_SEND_SIZE,
                    req->noncontig.get.target.iov[target_cur].iov_len);

        MPIDI_OFI_pack_chunk *chunk =
            create_chunk(pack_buffer, msg_len, req->noncontig.get.origin.pack_offset, req);
        if (chunk == NULL) {
            /* the cell is not tracked by any chunk yet; give it back to the
             * pool here or it would be lost */
            MPIDU_genq_private_pool_free_cell(MPIDI_OFI_global.pack_buf_pool, pack_buffer);
        }
        MPIR_ERR_CHKANDSTMT(chunk == NULL, mpi_errno, MPI_ERR_NO_MEM, goto fn_fail, "**nomem");

        msg.desc = NULL;
        msg.addr = MPIDI_OFI_av_to_phys(req->noncontig.get.target.addr, 0, 0);
        msg.context = NULL;
        msg.data = 0;
        msg.msg_iov = &iov;
        msg.iov_count = 1;
        msg.rma_iov = &riov;
        msg.rma_iov_count = 1;
        iov.iov_base = pack_buffer;
        iov.iov_len = msg_len;
        riov.addr = (uintptr_t) req->noncontig.get.target.iov[target_cur].iov_base;
        riov.len = msg_len;
        riov.key = req->noncontig.get.target.key;
        MPIDI_OFI_INIT_CHUNK_CONTEXT(win, sigreq);
        MPIDI_OFI_CALL_RETRY(fi_readmsg(MPIDI_OFI_WIN(win).ep, &msg, flags), 0, rdma_write, FALSE);
        req->noncontig.get.origin.pack_offset += msg_len;

        /* trim a partially consumed target segment, or move on to the next */
        if (msg_len < req->noncontig.get.target.iov[target_cur].iov_len) {
            req->noncontig.get.target.iov[target_cur].iov_base =
                (char *) req->noncontig.get.target.iov[target_cur].iov_base + msg_len;
            req->noncontig.get.target.iov[target_cur].iov_len -= msg_len;
        } else {
            j++;
        }
    }

    /* not finished. update our place in the target iov array for later. */
    if (req->noncontig.get.origin.pack_offset < req->noncontig.get.origin.total_bytes) {
        req->noncontig.get.target.iov_cur = j;
    } else {
        /* finished issuing. move from deferredQ to syncQ. */
        DL_DELETE(MPIDI_OFI_WIN(win).deferredQ, req);
        req->next = MPIDI_OFI_WIN(win).syncQ;
        MPIDI_OFI_WIN(win).syncQ = req;
        MPL_free(req->noncontig.get.target.iov);
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
349
/*
 * Start a PUT whose origin data is staged through pack buffers.
 *
 * A window request describing the origin buffer and the target layout is
 * built, appended to the window's deferredQ, and handed to issue_packed_put,
 * which issues as much as it can right away.
 *
 * origin_addr/origin_count/origin_datatype - local buffer to pack from; a
 *                    reference is taken on a non-builtin datatype
 * target_rank      - not used in this function
 * target_count/target_datatype             - remote layout; a reference is
 *                    taken on a non-builtin datatype
 * target_mr        - supplies the remote base address (addr) and key (mr_key)
 * win              - window the operation belongs to
 * addr             - address-vector entry of the target
 * sigreq           - stored in the request and used by issue_packed_put
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
int MPIDI_OFI_pack_put(const void *origin_addr, int origin_count,
                       MPI_Datatype origin_datatype, int target_rank,
                       int target_count, MPI_Datatype target_datatype,
                       MPIDI_OFI_target_mr_t target_mr, MPIR_Win * win,
                       MPIDI_av_entry_t * addr, MPIR_Request ** sigreq)
{
    int mpi_errno = MPI_SUCCESS;
    size_t target_bytes, origin_bytes;
    struct iovec *target_iov;
    MPI_Aint total_target_iov_len;
    MPI_Aint target_len;
    MPIDI_OFI_win_request_t *req;

    MPIR_Datatype_get_size_macro(origin_datatype, origin_bytes);
    origin_bytes *= origin_count;
    MPIR_Datatype_get_size_macro(target_datatype, target_bytes);
    target_bytes *= target_count;

    /* allocate target iovecs first, so a failed allocation is reported before
     * any request exists or is queued */
    MPIR_Typerep_iov_len(target_count, target_datatype, target_bytes, &total_target_iov_len);
    target_len = MPL_MIN(total_target_iov_len, MPIR_CVAR_CH4_OFI_RMA_IOVEC_MAX);
    target_iov = MPL_malloc(sizeof(struct iovec) * target_len, MPL_MEM_RMA);
    /* a zero-length request may legitimately yield NULL, so only a non-empty
     * array that failed to allocate is an error */
    MPIR_ERR_CHKANDSTMT(target_len > 0 && target_iov == NULL, mpi_errno, MPI_ERR_NO_MEM,
                        goto fn_fail, "**nomem");

    /* allocate request */
    req = MPIDI_OFI_win_request_create();
    if (req == NULL) {
        /* nothing owns the iovec array yet */
        MPL_free(target_iov);
    }
    MPIR_ERR_CHKANDSTMT((req) == NULL, mpi_errno, MPIX_ERR_NOREQ, goto fn_fail, "**nomemreq");
    req->sigreq = sigreq;

    /* put on deferred list */
    DL_APPEND(MPIDI_OFI_WIN(win).deferredQ, req);
    req->rma_type = MPIDI_OFI_PUT;
    req->chunks = NULL;

    /* origin */
    req->noncontig.put.origin.addr = origin_addr;
    req->noncontig.put.origin.count = origin_count;
    req->noncontig.put.origin.datatype = origin_datatype;
    MPIR_Datatype_add_ref_if_not_builtin(origin_datatype);
    req->noncontig.put.origin.pack_offset = 0;
    req->noncontig.put.origin.total_bytes = origin_bytes;

    /* target */
    req->noncontig.put.target.base = (void *) target_mr.addr;
    req->noncontig.put.target.count = target_count;
    req->noncontig.put.target.datatype = target_datatype;
    MPIR_Datatype_add_ref_if_not_builtin(target_datatype);
    /* the request now owns target_iov; issue_packed_put frees it when done */
    req->noncontig.put.target.iov = target_iov;
    req->noncontig.put.target.iov_len = target_len;
    req->noncontig.put.target.iov_offset = 0;
    req->noncontig.put.target.iov_cur = 0;
    req->noncontig.put.target.addr = addr;
    req->noncontig.put.target.key = target_mr.mr_key;

    mpi_errno = issue_packed_put(win, req);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
409
/*
 * Start a GET whose incoming data is staged through pack buffers.
 *
 * A window request describing the origin buffer and the target layout is
 * built, appended to the window's deferredQ, and handed to issue_packed_get,
 * which issues as much as it can right away.
 *
 * origin_addr/origin_count/origin_datatype - local buffer that the fetched
 *                    data is later unpacked into; a reference is taken on a
 *                    non-builtin datatype
 * target_rank      - not used in this function
 * target_count/target_datatype             - remote layout; a reference is
 *                    taken on a non-builtin datatype
 * target_mr        - supplies the remote base address (addr) and key (mr_key)
 * win              - window the operation belongs to
 * addr             - address-vector entry of the target
 * sigreq           - stored in the request and used by issue_packed_get
 *
 * Returns MPI_SUCCESS or an MPI error code.
 */
int MPIDI_OFI_pack_get(void *origin_addr, int origin_count,
                       MPI_Datatype origin_datatype, int target_rank,
                       int target_count, MPI_Datatype target_datatype,
                       MPIDI_OFI_target_mr_t target_mr, MPIR_Win * win,
                       MPIDI_av_entry_t * addr, MPIR_Request ** sigreq)
{
    int mpi_errno = MPI_SUCCESS;
    size_t target_bytes, origin_bytes;
    struct iovec *target_iov;
    MPI_Aint total_target_iov_len;
    MPI_Aint target_len;
    MPIDI_OFI_win_request_t *req;

    MPIR_Datatype_get_size_macro(origin_datatype, origin_bytes);
    origin_bytes *= origin_count;
    MPIR_Datatype_get_size_macro(target_datatype, target_bytes);
    target_bytes *= target_count;

    /* allocate target iovecs first, so a failed allocation is reported before
     * any request exists or is queued */
    MPIR_Typerep_iov_len(target_count, target_datatype, target_bytes, &total_target_iov_len);
    target_len = MPL_MIN(total_target_iov_len, MPIR_CVAR_CH4_OFI_RMA_IOVEC_MAX);
    target_iov = MPL_malloc(sizeof(struct iovec) * target_len, MPL_MEM_RMA);
    /* a zero-length request may legitimately yield NULL, so only a non-empty
     * array that failed to allocate is an error */
    MPIR_ERR_CHKANDSTMT(target_len > 0 && target_iov == NULL, mpi_errno, MPI_ERR_NO_MEM,
                        goto fn_fail, "**nomem");

    /* allocate request */
    req = MPIDI_OFI_win_request_create();
    if (req == NULL) {
        /* nothing owns the iovec array yet */
        MPL_free(target_iov);
    }
    MPIR_ERR_CHKANDSTMT((req) == NULL, mpi_errno, MPIX_ERR_NOREQ, goto fn_fail, "**nomemreq");
    req->sigreq = sigreq;

    /* put on deferred list */
    DL_APPEND(MPIDI_OFI_WIN(win).deferredQ, req);
    req->rma_type = MPIDI_OFI_GET;
    req->chunks = NULL;

    /* origin */
    req->noncontig.get.origin.addr = origin_addr;
    req->noncontig.get.origin.count = origin_count;
    req->noncontig.get.origin.datatype = origin_datatype;
    MPIR_Datatype_add_ref_if_not_builtin(origin_datatype);
    req->noncontig.get.origin.pack_offset = 0;
    req->noncontig.get.origin.total_bytes = origin_bytes;

    /* target */
    req->noncontig.get.target.base = (void *) target_mr.addr;
    req->noncontig.get.target.count = target_count;
    req->noncontig.get.target.datatype = target_datatype;
    MPIR_Datatype_add_ref_if_not_builtin(target_datatype);
    /* the request now owns target_iov; issue_packed_get frees it when done */
    req->noncontig.get.target.iov = target_iov;
    req->noncontig.get.target.iov_len = target_len;
    req->noncontig.get.target.iov_offset = 0;
    req->noncontig.get.target.iov_cur = 0;
    req->noncontig.get.target.addr = addr;
    req->noncontig.get.target.key = target_mr.mr_key;

    mpi_errno = issue_packed_get(win, req);

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
469
/*
 * Resume every packed PUT/GET still waiting on the window's deferredQ.
 *
 * For each request the chunks of its previous round are completed first
 * (returning their pack buffers to the pool), then issue_packed_put or
 * issue_packed_get is called according to req->rma_type to issue further
 * fragments.
 *
 * Returns MPI_SUCCESS, or the first error reported by an issue routine.
 */
int MPIDI_OFI_issue_deferred_rma(MPIR_Win * win)
{
    int mpi_errno = MPI_SUCCESS;
    MPIDI_OFI_win_request_t *req = MPIDI_OFI_WIN(win).deferredQ;

    while (req) {
        /* A request that finishes issuing is unlinked from deferredQ and
         * pushed onto syncQ, which overwrites req->next. Remember the
         * deferredQ successor now so the walk never strays into syncQ. */
        MPIDI_OFI_win_request_t *next = req->next;

        /* free temporary buffers */
        MPIDI_OFI_complete_chunks(req);

        switch (req->rma_type) {
            case MPIDI_OFI_PUT:
                mpi_errno = issue_packed_put(win, req);
                MPIR_ERR_CHECK(mpi_errno);
                break;
            case MPIDI_OFI_GET:
                mpi_errno = issue_packed_get(win, req);
                MPIR_ERR_CHECK(mpi_errno);
                break;
            default:
                MPIR_Assert(0);
        }
        req = next;
    }

  fn_exit:
    return mpi_errno;
  fn_fail:
    goto fn_exit;
}
499