1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3 * (C) 2014 by Argonne National Laboratory.
4 * See COPYRIGHT in top-level directory.
5 */
6
7 #include "adio.h"
8 #include "adio_extern.h"
9 #include "mpiu_greq.h"
10 #include "mpioimpl.h"
11
12 #ifdef USE_DBG_LOGGING
13 #define RDCOLL_DEBUG 1
14 #endif
15 #ifdef AGGREGATION_PROFILE
16 #include "mpe.h"
17 #endif
18
19 #ifdef HAVE_MPI_GREQUEST_EXTENSIONS
20
21 /* ADIOI_GEN_IreadStridedColl */
22 struct ADIOI_GEN_IreadStridedColl_vars {
23 /* requests */
24 MPI_Request req_offset[2]; /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL */
25 MPI_Request req_ind_io; /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO */
26
27 /* parameters */
28 ADIO_File fd;
29 void *buf;
30 int count;
31 MPI_Datatype datatype;
32 int file_ptr_type;
33 ADIO_Offset offset;
34
35 /* stack variables */
36 ADIOI_Access *my_req;
37 /* array of nprocs structures, one for each other process in
38 whose file domain this process's request lies */
39
40 ADIOI_Access *others_req;
41 /* array of nprocs structures, one for each other process
42 whose request lies in this process's file domain. */
43
44 int nprocs;
45 int nprocs_for_coll;
46 int myrank;
47 int contig_access_count;
48 int interleave_count;
49 int buftype_is_contig;
50 int *count_my_req_per_proc;
51 int count_my_req_procs;
52 int count_others_req_procs;
53 ADIO_Offset start_offset;
54 ADIO_Offset end_offset;
55 ADIO_Offset orig_fp;
56 ADIO_Offset fd_size;
57 ADIO_Offset min_st_offset;
58 ADIO_Offset *offset_list;
59 ADIO_Offset *st_offsets;
60 ADIO_Offset *fd_start;
61 ADIO_Offset *fd_end;
62 ADIO_Offset *end_offsets;
63 ADIO_Offset *len_list;
64 int *buf_idx;
65 };
66
67 /* ADIOI_Iread_and_exch */
68 struct ADIOI_Iread_and_exch_vars {
69 /* requests */
70 MPI_Request req1; /* ADIOI_IRC_STATE_IREAD_AND_EXCH */
71 MPI_Request req2; /* ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN */
72
73 /* parameters */
74 ADIO_File fd;
75 void *buf;
76 MPI_Datatype datatype;
77 int nprocs;
78 int myrank;
79 ADIOI_Access *others_req;
80 ADIO_Offset *offset_list;
81 ADIO_Offset *len_list;
82 int contig_access_count;
83 ADIO_Offset min_st_offset;
84 ADIO_Offset fd_size;
85 ADIO_Offset *fd_start;
86 ADIO_Offset *fd_end;
87 int *buf_idx;
88
89 /* stack variables */
90 int m;
91 int ntimes;
92 int max_ntimes;
93 int buftype_is_contig;
94 ADIO_Offset st_loc;
95 ADIO_Offset end_loc;
96 ADIO_Offset off;
97 ADIO_Offset done;
98 char *read_buf;
99 int *curr_offlen_ptr;
100 int *count;
101 int *send_size;
102 int *recv_size;
103 int *partial_send;
104 int *recd_from_proc;
105 int *start_pos;
106 /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
107 ADIO_Offset size;
108 ADIO_Offset real_size;
109 ADIO_Offset for_curr_iter;
110 ADIO_Offset for_next_iter;
111 ADIOI_Flatlist_node *flat_buf;
112 MPI_Aint buftype_extent;
113 int coll_bufsize;
114
115 /* next function to be called */
116 void (*next_fn)(ADIOI_NBC_Request *, int *);
117 };
118
119 /* ADIOI_R_Iexchange_data */
120 struct ADIOI_R_Iexchange_data_vars {
121 /* requests */
122 MPI_Request req1; /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA */
123 MPI_Request *req2; /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV & FILL */
124
125 /* parameters */
126 ADIO_File fd;
127 void *buf;
128 ADIOI_Flatlist_node *flat_buf;
129 ADIO_Offset *offset_list;
130 ADIO_Offset *len_list;
131 int *send_size;
132 int *recv_size;
133 int *count;
134 int *start_pos;
135 int *partial_send;
136 int *recd_from_proc;
137 int nprocs;
138 int myrank;
139 int buftype_is_contig;
140 int contig_access_count;
141 ADIO_Offset min_st_offset;
142 ADIO_Offset fd_size;
143 ADIO_Offset *fd_start;
144 ADIO_Offset *fd_end;
145 ADIOI_Access *others_req;
146 int iter;
147 MPI_Aint buftype_extent;
148 int *buf_idx;
149
150 /* stack variables */
151 int nprocs_recv;
152 int nprocs_send;
153 char **recv_buf;
154
155 /* next function to be called */
156 void (*next_fn)(ADIOI_NBC_Request *, int *);
157 };
158
159
160 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
161 *flat_buf, char **recv_buf, ADIO_Offset
162 *offset_list, ADIO_Offset *len_list,
163 unsigned *recv_size,
164 MPI_Request *requests, MPI_Status *statuses,
165 int *recd_from_proc, int nprocs,
166 int contig_access_count,
167 ADIO_Offset min_st_offset,
168 ADIO_Offset fd_size, ADIO_Offset *fd_start,
169 ADIO_Offset *fd_end,
170 MPI_Aint buftype_extent);
171
172 /* prototypes of functions used for nonblocking collective reads only. */
173 static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *, int *);
174 static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *, int *);
175 static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *, int *);
176 static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *, int *);
177 static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *, int *);
178
179 static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *, int *);
180 static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *, int *);
181 static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *, int *);
182 static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *, int *);
183 static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *, int *);
184 static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *, int *);
185 static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *, int *);
186
187 static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *, int *);
188 static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *, int *);
189 static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *, int *);
190 static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *, int *);
191
192 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
193 static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status);
194 static int ADIOI_GEN_irc_free_fn(void *extra_state);
195 static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status);
196 static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
197 double timeout, MPI_Status *status);
198
199
200 /* Nonblocking version of ADIOI_GEN_ReadStridedColl() */
ADIOI_GEN_IreadStridedColl(ADIO_File fd,void * buf,int count,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,MPI_Request * request,int * error_code)201 void ADIOI_GEN_IreadStridedColl(ADIO_File fd, void *buf, int count,
202 MPI_Datatype datatype, int file_ptr_type,
203 ADIO_Offset offset, MPI_Request *request,
204 int *error_code)
205 {
206 /* Uses a generalized version of the extended two-phase method described
207 in "An Extended Two-Phase Method for Accessing Sections of
208 Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
209 Scientific Programming, (5)4:301--317, Winter 1996.
210 http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
211
212 ADIOI_NBC_Request *nbc_req = NULL;
213 ADIOI_GEN_IreadStridedColl_vars *vars = NULL;
214 int nprocs, myrank;
215 #ifdef RDCOLL_DEBUG
216 int i;
217 #endif
218
219 /* FIXME: need an implementation of ADIOI_IOIstridedColl
220 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
221 ADIOI_IOIstridedColl(fd, buf, count, ADIOI_READ, datatype,
222 file_ptr_type, offset, request, error_code);
223 return;
224 }
225 */
226
227 /* top-level struct keeping the status of function progress */
228 nbc_req = (ADIOI_NBC_Request *)ADIOI_Calloc(1, sizeof(ADIOI_NBC_Request));
229 nbc_req->rdwr = ADIOI_READ;
230
231 /* create a generalized request */
232 if (ADIOI_GEN_greq_class == 0) {
233 MPIX_Grequest_class_create(ADIOI_GEN_irc_query_fn,
234 ADIOI_GEN_irc_free_fn, MPIU_Greq_cancel_fn,
235 ADIOI_GEN_irc_poll_fn, ADIOI_GEN_irc_wait_fn,
236 &ADIOI_GEN_greq_class);
237 }
238 MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, nbc_req, request);
239 memcpy(&nbc_req->req, request, sizeof(MPI_Request));
240
241 /* create a struct for parameters and variables */
242 vars = (ADIOI_GEN_IreadStridedColl_vars *)ADIOI_Calloc(
243 1, sizeof(ADIOI_GEN_IreadStridedColl_vars));
244 nbc_req->data.rd.rsc_vars = vars;
245
246 /* save the parameters */
247 vars->fd = fd;
248 vars->buf = buf;
249 vars->count = count;
250 vars->datatype = datatype;
251 vars->file_ptr_type = file_ptr_type;
252 vars->offset = offset;
253
254 MPI_Comm_size(fd->comm, &nprocs);
255 MPI_Comm_rank(fd->comm, &myrank);
256 vars->nprocs = nprocs;
257 vars->myrank = myrank;
258
259 /* number of aggregators, cb_nodes, is stored in the hints */
260 vars->nprocs_for_coll = fd->hints->cb_nodes;
261 vars->orig_fp = fd->fp_ind;
262
263 /* only check for interleaving if cb_read isn't disabled */
264 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
265 /* For this process's request, calculate the list of offsets and
266 lengths in the file and determine the start and end offsets. */
267
268 /* Note: end_offset points to the last byte-offset that will be accessed.
269 e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
270
271 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
272 &vars->offset_list, &vars->len_list,
273 &vars->start_offset, &vars->end_offset,
274 &vars->contig_access_count);
275
276 #ifdef RDCOLL_DEBUG
277 for (i = 0; i < vars->contig_access_count; i++) {
278 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
279 myrank, vars->offset_list[i], vars->len_list[i]);
280 }
281 #endif
282
283 /* each process communicates its start and end offsets to other
284 processes. The result is an array each of start and end offsets
285 stored in order of process rank. */
286
287 vars->st_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
288 vars->end_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
289
290 *error_code = MPI_Iallgather(&vars->start_offset, 1, ADIO_OFFSET,
291 vars->st_offsets, 1, ADIO_OFFSET,
292 fd->comm, &vars->req_offset[0]);
293 if (*error_code != MPI_SUCCESS) return;
294 *error_code = MPI_Iallgather(&vars->end_offset, 1, ADIO_OFFSET,
295 vars->end_offsets, 1, ADIO_OFFSET,
296 fd->comm, &vars->req_offset[1]);
297
298 nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL;
299 return;
300 }
301
302 ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
303 }
304
/*
 * Stage run after the start/end offsets of all ranks are available.
 *
 * Counts how many ranks start before the end of their predecessor's access
 * range, stores the count in vars->interleave_count and moves on to the
 * _indio stage.  This is a rudimentary check for interleaving, but should
 * suffice for the moment.
 */
static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *nbc_req,
                                             int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    const ADIO_Offset *starts = vars->st_offsets;
    const ADIO_Offset *ends = vars->end_offsets;
    const int nranks = vars->nprocs;
    int overlaps = 0;
    int rank;

    /* are the accesses of different processes interleaved? */
    for (rank = 1; rank < nranks; rank++) {
        if (starts[rank] < ends[rank - 1] && starts[rank] <= ends[rank]) {
            overlaps++;
        }
    }

    vars->interleave_count = overlaps;

    ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
}
326
/*
 * Stage that decides between independent I/O and aggregation.
 *
 * Independent I/O is chosen when collective buffering is disabled, or when
 * it is set to "auto" and no interleaving was detected.  In that case the
 * read is issued directly (blocking on Linux without aio-lite, nonblocking
 * otherwise).  In every other case the file domains and this process's
 * requests are computed and ADIOI_Icalc_others_req() is started, with
 * ADIOI_GEN_IreadStridedColl_read as its continuation.
 */
static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *nbc_req,
                                             int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIOI_Icalc_others_req_vars *cor_vars;
    ADIO_File fd = vars->fd;

    ADIOI_Datatype_iscontig(vars->datatype, &vars->buftype_is_contig);

    if (fd->hints->cb_read == ADIOI_HINT_DISABLE ||
        (fd->hints->cb_read == ADIOI_HINT_AUTO && !vars->interleave_count)) {
        /* ---- no aggregation: every process reads for itself ---- */
        void *buf = vars->buf;
        int count = vars->count;
        int file_ptr_type = vars->file_ptr_type;
        MPI_Datatype datatype = vars->datatype;
        ADIO_Offset offset = vars->offset;
        ADIO_Offset off;
        int filetype_is_contig;
#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        MPI_Status status;
#endif

        /* these lists exist only if the interleaving check was performed */
        if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
            ADIOI_Free(vars->offset_list);
            ADIOI_Free(vars->len_list);
            ADIOI_Free(vars->st_offsets);
            ADIOI_Free(vars->end_offsets);
        }

        /* restore the individual file pointer saved on entry */
        fd->fp_ind = vars->orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        /* NOTE: This is currently a workaround to avoid weird errors, e.g.,
         * stack fault, occurred on Linux. When the host OS is Linux and
         * aio-lite is not used, a blocking ADIO function is used here.
         * See https://trac.mpich.org/projects/mpich/ticket/2201. */
        if (!(vars->buftype_is_contig && filetype_is_contig)) {
            ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
                             offset, &status, error_code);
        }
        else if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
            off = fd->disp + (fd->etype_size) * offset;
            ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
                            off, &status, error_code);
        }
        else {
            ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                            0, &status, error_code);
        }

        /* the read has already finished, so complete the request now */
        ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
#else
        if (!(vars->buftype_is_contig && filetype_is_contig)) {
            ADIO_IreadStrided(fd, buf, count, datatype, file_ptr_type,
                              offset, &vars->req_ind_io, error_code);
        }
        else if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
            off = fd->disp + (fd->etype_size) * offset;
            ADIO_IreadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
                             off, &vars->req_ind_io, error_code);
        }
        else {
            ADIO_IreadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                             0, &vars->req_ind_io, error_code);
        }

        /* progress continues once req_ind_io has finished */
        nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO;
#endif
        return;
    }

    /* ---- aggregation ---- */

    /* ADIOI_Calc_file_domains() determines which processes handle I/O to
     * which regions.  nprocs_for_coll tells it how many processes perform
     * I/O, which is also the number of regions ("file domains", FDs) the
     * byte range is divided into.
     *
     * On return fd_start, fd_end, fd_size and min_st_offset are filled in:
     * fd_start/fd_end hold the first/last byte of every file domain and
     * min_st_offset the lowest byte that will be accessed.
     *
     * fd_start[] and fd_end[] are indexed by aggregator number; that number
     * still has to be mapped to a rank in the communicator later. */
    ADIOI_Calc_file_domains(vars->st_offsets, vars->end_offsets,
                            vars->nprocs, vars->nprocs_for_coll,
                            &vars->min_st_offset,
                            &vars->fd_start, &vars->fd_end,
                            fd->hints->min_fdomain_size, &vars->fd_size,
                            fd->hints->striping_unit);

    /* Locate the pieces of this process's request within the file domains
     * (they may belong to this process or to others).  Filled in:
     *   count_my_req_procs    - number of processes (including this one)
     *                           in whose file domain this process has
     *                           requests
     *   count_my_req_per_proc - number of requests per process, indexed by
     *                           rank
     *   my_req[]              - per-rank description of the requests to be
     *                           performed by that process (including self)
     *   buf_idx[]             - locations into which data can be moved
     *                           directly; valid for contiguous buffers only
     */
    ADIOI_Calc_my_req(fd, vars->offset_list, vars->len_list,
                      vars->contig_access_count, vars->min_st_offset,
                      vars->fd_start, vars->fd_end, vars->fd_size,
                      vars->nprocs, &vars->count_my_req_procs,
                      &vars->count_my_req_per_proc, &vars->my_req,
                      &vars->buf_idx);

    /* Distribute the information computed above with a collective
     * communication.  Filled in:
     *   count_others_req_procs - number of processes (including this one)
     *                            that have requests in this process's file
     *                            domain
     *   others_req[]           - the requests of those processes
     */
    cor_vars = (ADIOI_Icalc_others_req_vars *)
        ADIOI_Calloc(1, sizeof(ADIOI_Icalc_others_req_vars));
    nbc_req->cor_vars = cor_vars;

    cor_vars->fd = vars->fd;
    cor_vars->nprocs = vars->nprocs;
    cor_vars->myrank = vars->myrank;
    cor_vars->my_req = vars->my_req;
    cor_vars->count_my_req_procs = vars->count_my_req_procs;
    cor_vars->count_my_req_per_proc = vars->count_my_req_per_proc;
    cor_vars->others_req_ptr = &vars->others_req;
    cor_vars->count_others_req_procs_ptr = &vars->count_others_req_procs;
    cor_vars->next_fn = ADIOI_GEN_IreadStridedColl_read;

    ADIOI_Icalc_others_req(nbc_req, error_code);
}
468
/*
 * Stage entered once others_req is known (continuation of
 * ADIOI_Icalc_others_req()).
 *
 * Releases my_req[] and count_my_req_per_proc, which are no longer needed,
 * then sets up the state for ADIOI_Iread_and_exch() and starts it.  The
 * read/exchange loop continues with ADIOI_GEN_IreadStridedColl_free when it
 * is done.
 */
static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIOI_Iread_and_exch_vars *rae_vars;
    ADIOI_Access *my_req = vars->my_req;
    const int nranks = vars->nprocs;
    int rank;

    /* drop the description of this process's own requests */
    ADIOI_Free(vars->count_my_req_per_proc);
    for (rank = 0; rank < nranks; rank++) {
        if (!my_req[rank].count)
            continue;           /* nothing was allocated for this rank */
        ADIOI_Free(my_req[rank].offsets);
        ADIOI_Free(my_req[rank].lens);
    }
    ADIOI_Free(my_req);

    /* Read data in sizes of no more than ADIOI_Coll_bufsize, communicate,
     * and fill the user buffer: prepare the state for that loop. */
    rae_vars = (ADIOI_Iread_and_exch_vars *)
        ADIOI_Calloc(1, sizeof(ADIOI_Iread_and_exch_vars));
    nbc_req->data.rd.rae_vars = rae_vars;

    rae_vars->fd = vars->fd;
    rae_vars->buf = vars->buf;
    rae_vars->datatype = vars->datatype;
    rae_vars->nprocs = vars->nprocs;
    rae_vars->myrank = vars->myrank;
    rae_vars->others_req = vars->others_req;
    rae_vars->offset_list = vars->offset_list;
    rae_vars->len_list = vars->len_list;
    rae_vars->contig_access_count = vars->contig_access_count;
    rae_vars->min_st_offset = vars->min_st_offset;
    rae_vars->fd_size = vars->fd_size;
    rae_vars->fd_start = vars->fd_start;
    rae_vars->fd_end = vars->fd_end;
    rae_vars->buf_idx = vars->buf_idx;
    rae_vars->next_fn = ADIOI_GEN_IreadStridedColl_free;

    ADIOI_Iread_and_exch(nbc_req, error_code);
}
514
/*
 * Stage run after the read/exchange loop: releases everything that was
 * allocated for collective I/O, invalidates fd->fp_sys_posn and continues
 * with the _fini stage.
 */
static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIOI_Access *others_req = vars->others_req;
    const int nranks = vars->nprocs;
    int rank;

    /* the flattened representation is only relevant for a noncontiguous
     * buffer datatype */
    if (!vars->buftype_is_contig) {
        ADIOI_Delete_flattened(vars->datatype);
    }

    /* per-rank request descriptions received from the other processes */
    for (rank = 0; rank < nranks; rank++) {
        if (!others_req[rank].count)
            continue;           /* nothing was allocated for this rank */
        ADIOI_Free(others_req[rank].offsets);
        ADIOI_Free(others_req[rank].lens);
        ADIOI_Free(others_req[rank].mem_ptrs);
    }
    ADIOI_Free(others_req);

    /* remaining arrays owned by this operation */
    ADIOI_Free(vars->buf_idx);
    ADIOI_Free(vars->offset_list);
    ADIOI_Free(vars->len_list);
    ADIOI_Free(vars->st_offsets);
    ADIOI_Free(vars->end_offsets);
    ADIOI_Free(vars->fd_start);
    ADIOI_Free(vars->fd_end);

    vars->fd->fp_sys_posn = -1;     /* set it to null. */

    ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
}
549
/*
 * Final stage: records the number of bytes, frees the per-operation state
 * and marks the generalized request as complete.
 *
 * error_code receives the return value of MPI_Grequest_complete().
 */
static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    MPI_Count type_size;

    /* This is a temporary way of filling in status.  The right way is to
     * keep track of how much data was actually read and placed in buf
     * during collective I/O. */
    MPI_Type_size_x(vars->datatype, &type_size);
    nbc_req->nbytes = type_size * vars->count;

    /* the parameter/variable struct is not needed any more */
    if (nbc_req->data.rd.rsc_vars) {
        ADIOI_Free(nbc_req->data.rd.rsc_vars);
        nbc_req->data.rd.rsc_vars = NULL;
    }

    /* make the request complete */
    *error_code = MPI_Grequest_complete(nbc_req->req);
    nbc_req->data.rd.state = ADIOI_IRC_STATE_COMPLETE;
}
572
573
/*
 * Entry point of the read-and-exchange phase.
 *
 * Data is read in pieces of no more than coll_bufsize (an info parameter),
 * sent to the processes that asked for it, and placed in the user buffer
 * on the receiving side.  The point is to limit the extra memory needed
 * for collective I/O: reading everything at once would be simpler, but
 * would need temporary space larger than the user buffer, which is often
 * unacceptable (e.g. another 8 Mbytes for an 8 Mbyte local array).
 *
 * This function determines the byte range this process has to read, derives
 * the number of iterations from it, starts an MPI_Iallreduce for the
 * maximum iteration count over all processes and allocates the per-rank
 * bookkeeping arrays.  *error_code ends up holding the return value of
 * MPI_Iallreduce.
 */
static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIO_File fd = vars->fd;
    ADIOI_Access *others_req = vars->others_req;
    MPI_Datatype datatype = vars->datatype;
    const int nranks = vars->nprocs;
    ADIO_Offset st_loc = -1, end_loc = -1;
    MPI_Aint lb;
    int coll_bufsize;
    int rank, k;

    *error_code = MPI_SUCCESS;  /* changed below if error */
    /* only I/O errors are currently reported */

    /* coll_bufsize comes from the hints object; the number of reads of that
     * size also gives the number of communication phases */
    coll_bufsize = fd->hints->cb_buffer_size;
    vars->coll_bufsize = coll_bufsize;

    /* seed st_loc/end_loc with the first offset of the first rank that has
     * any request in this process's file domain */
    for (rank = 0; rank < nranks; rank++) {
        if (others_req[rank].count) {
            st_loc = end_loc = others_req[rank].offsets[0];
            break;
        }
    }

    /* then widen the range so that it covers every requested byte */
    for (rank = 0; rank < nranks; rank++) {
        for (k = 0; k < others_req[rank].count; k++) {
            st_loc = ADIOI_MIN(st_loc, others_req[rank].offsets[k]);
            end_loc = ADIOI_MAX(end_loc, (others_req[rank].offsets[k]
                                          + others_req[rank].lens[k] - 1));
        }
    }

    vars->st_loc = st_loc;
    vars->end_loc = end_loc;

    /* ntimes is the number of I/O operations this process needs to satisfy
     * all requests it received; more than one is needed because only
     * coll_bufsize bytes are used for internal buffering. */
    if ((st_loc == -1) && (end_loc == -1)) {
        /* this process does no I/O. */
        vars->ntimes = 0;
    }
    else {
        /* ntimes = ceiling_div(end_loc - st_loc + 1, coll_bufsize) */
        vars->ntimes = (int) ((end_loc - st_loc + coll_bufsize) / coll_bufsize);
    }

    /* every process has to take part in max_ntimes exchange phases */
    *error_code = MPI_Iallreduce(&vars->ntimes, &vars->max_ntimes, 1, MPI_INT,
                                 MPI_MAX, fd->comm, &vars->req1);

    vars->read_buf = fd->io_buf;        /* Allocated at open time */

    /* index of the current off-len pair per rank; calloc zeroes it */
    vars->curr_offlen_ptr = (int *) ADIOI_Calloc(nranks, sizeof(int));

    /* number of off-len pairs per rank satisfied in one iteration */
    vars->count = (int *) ADIOI_Malloc(nranks * sizeof(int));

    /* if only part of the last off-len pair is sent to a rank in some
     * iteration, the length sent is kept here; calloc zeroes it */
    vars->partial_send = (int *) ADIOI_Calloc(nranks, sizeof(int));

    /* total size of data to be sent to each rank in one iteration */
    vars->send_size = (int *) ADIOI_Malloc(nranks * sizeof(int));

    /* total size of data to be received from each rank in one iteration;
     * of size nprocs so that MPI_Alltoall can be used later */
    vars->recv_size = (int *) ADIOI_Malloc(nranks * sizeof(int));

    /* amount of data received so far from each rank; used in
     * ADIOI_Fill_user_buffer, starts at 0 */
    vars->recd_from_proc = (int *) ADIOI_Calloc(nranks, sizeof(int));

    /* value of curr_offlen_ptr[i] at the start of the current iteration */
    vars->start_pos = (int *) ADIOI_Malloc(nranks * sizeof(int));

    ADIOI_Datatype_iscontig(datatype, &vars->buftype_is_contig);
    if (!vars->buftype_is_contig) {
        vars->flat_buf = ADIOI_Flatten_and_find(datatype);
    }
    MPI_Type_get_extent(datatype, &lb, &vars->buftype_extent);

    vars->done = 0;
    vars->off = st_loc;
    vars->for_curr_iter = 0;
    vars->for_next_iter = 0;

    /* progress continues once the MPI_Iallreduce (req1) has finished */
    nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH;
}
687
/*
 * Head of the first loop (iterations 0 .. ntimes-1, counted in vars->m).
 *
 * For the current iteration this stage
 *   1. works out which off-len pairs of every rank are covered by the
 *      window that is about to be read, and how many bytes go to each rank;
 *   2. prepares the state for ADIOI_R_Iexchange_data(), with
 *      ADIOI_Iread_and_exch_l1_end as continuation;
 *   3. reads the window if at least one rank gets data from it, and then
 *      starts the exchange (immediately after a blocking read, or from the
 *      progress engine after a nonblocking one).
 * When vars->m has reached vars->ntimes control passes to the _reset stage.
 *
 * Since MPI guarantees that displacements in filetypes are in monotonically
 * nondecreasing order, a pointer (curr_offlen_ptr) to the current off-len
 * pair of every rank in others_req is enough, and scanning resumes from
 * there.  There is still a problem with filetypes such as the following
 * (1, 2, 3 are not process numbers, just three chunks of data specified by
 * a filetype):
 *
 *    1  -------!--
 *    2    -----!----
 *    3       --!-----
 *
 * where ! marks where the current read size limit cuts through the
 * filetype.  This is resolved by reading up to !, but filling the
 * communication buffer only for 1.  The portion left over for 2 is kept
 * for the next iteration, i.e. 2 and 3 are satisfied in the next iteration.
 * That keeps filling the user buffer on the other side simple, because at
 * most one off-len pair with incomplete data is sent, and the individual
 * offsets and lengths need not be sent along with the data, as the data
 * travels in a fixed order.
 *
 * Terminology used below:
 *   off       - start offset in the file of the data actually read in this
 *               iteration
 *   size      - size of the data read corresponding to off
 *   real_off  - off minus whatever was retained in memory from the previous
 *               iteration for cases like 2 and 3 above
 *   real_size - size plus the extra corresponding to real_off
 *   req_off   - file offset of one contiguous request, minus what was
 *               satisfied in the previous iteration
 *   req_len   - length corresponding to req_off
 */
static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *nbc_req,
                                          int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIOI_R_Iexchange_data_vars *red_vars;
    ADIOI_Access *others_req;
    ADIO_File fd;
    char *read_buf;
    int *curr_offlen_ptr, *count, *send_size, *partial_send, *start_pos;
    ADIO_Offset real_off, real_size, size;
    ADIO_Offset req_off, window_end, for_next_iter;
    int nprocs, req_len, need_read;
    int p, k;

    /* loop exit condition */
    if (vars->m >= vars->ntimes) {
        ADIOI_Iread_and_exch_reset(nbc_req, error_code);
        return;
    }

    fd = vars->fd;
    nprocs = vars->nprocs;
    others_req = vars->others_req;
    read_buf = vars->read_buf;
    curr_offlen_ptr = vars->curr_offlen_ptr;
    count = vars->count;
    send_size = vars->send_size;
    partial_send = vars->partial_send;
    start_pos = vars->start_pos;

    /* read a window of coll_bufsize bytes, or less at the end */
    size = ADIOI_MIN((unsigned)vars->coll_bufsize,
                     vars->end_loc - vars->st_loc + 1 - vars->done);
    real_off = vars->off - vars->for_curr_iter;
    real_size = size + vars->for_curr_iter;
    window_end = real_off + real_size;  /* first byte beyond the window */

    vars->size = size;
    vars->real_size = real_size;

    for (p = 0; p < nprocs; p++) {
        count[p] = 0;
        send_size[p] = 0;
    }
    for_next_iter = 0;

    /* go through all others_req and check which are satisfied by this read */
    for (p = 0; p < nprocs; p++) {
        ADIOI_Access *peer = &others_req[p];

#ifdef RDCOLL_DEBUG
        DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n",
                    vars->myrank, p, peer->count);
#endif
        if (!peer->count)
            continue;

        start_pos[p] = curr_offlen_ptr[p];

        for (k = curr_offlen_ptr[p]; k < peer->count; k++) {
            if (partial_send[p]) {
                /* this request may have been partially satisfied in the
                 * previous iteration; shrink the off-len pair accordingly */
                req_off = peer->offsets[k] + partial_send[p];
                req_len = peer->lens[k] - partial_send[p];
                partial_send[p] = 0;
                peer->offsets[k] = req_off;
                peer->lens[k] = req_len;
            }
            else {
                req_off = peer->offsets[k];
                req_len = peer->lens[k];
            }

            /* the request starts beyond the window: done with this rank */
            if (req_off >= window_end)
                break;

            count[p]++;
            ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf) + req_off - real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf + req_off - real_off));
            MPI_Get_address(read_buf + req_off - real_off,
                            &(peer->mem_ptrs[k]));
            ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
            send_size[p] += (int) (ADIOI_MIN(window_end - req_off,
                                             (ADIO_Offset)(unsigned)req_len));

            if (window_end - req_off < (ADIO_Offset)(unsigned)req_len) {
                /* only the head of this pair fits into the window */
                partial_send[p] = (int) (window_end - req_off);

                if ((k + 1 < peer->count) &&
                    (peer->offsets[k + 1] < window_end)) {
                    /* this is the case illustrated in the figure above;
                     * max because it must cover requests from different
                     * processes */
                    for_next_iter = ADIOI_MAX(for_next_iter,
                                              window_end - peer->offsets[k + 1]);
                }
                break;
            }
        }
        curr_offlen_ptr[p] = k;
    }
    vars->for_next_iter = for_next_iter;

    /* a read is needed only if some rank gets data out of this window */
    need_read = 0;
    for (p = 0; p < nprocs; p++) {
        if (count[p]) {
            need_read = 1;
            break;
        }
    }

    /* state for ADIOI_R_Iexchange_data() */
    red_vars = (ADIOI_R_Iexchange_data_vars *)
        ADIOI_Calloc(1, sizeof(ADIOI_R_Iexchange_data_vars));
    nbc_req->data.rd.red_vars = red_vars;

    red_vars->fd = vars->fd;
    red_vars->buf = vars->buf;
    red_vars->flat_buf = vars->flat_buf;
    red_vars->offset_list = vars->offset_list;
    red_vars->len_list = vars->len_list;
    red_vars->send_size = vars->send_size;
    red_vars->recv_size = vars->recv_size;
    red_vars->count = vars->count;
    red_vars->start_pos = vars->start_pos;
    red_vars->partial_send = vars->partial_send;
    red_vars->recd_from_proc = vars->recd_from_proc;
    red_vars->nprocs = vars->nprocs;
    red_vars->myrank = vars->myrank;
    red_vars->buftype_is_contig = vars->buftype_is_contig;
    red_vars->contig_access_count = vars->contig_access_count;
    red_vars->min_st_offset = vars->min_st_offset;
    red_vars->fd_size = vars->fd_size;
    red_vars->fd_start = vars->fd_start;
    red_vars->fd_end = vars->fd_end;
    red_vars->others_req = vars->others_req;
    red_vars->iter = vars->m;
    red_vars->buftype_extent = vars->buftype_extent;
    red_vars->buf_idx = vars->buf_idx;
    red_vars->next_fn = ADIOI_Iread_and_exch_l1_end;

    if (need_read) {
#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        MPI_Status status;
#endif
        ADIOI_Assert(size == (int)size);
#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        /* blocking read; the exchange below follows immediately */
        ADIO_ReadContig(fd, read_buf + vars->for_curr_iter, (int) size,
                        MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
                        &status, error_code);
#else
        /* nonblocking read; progress continues once req2 has finished */
        ADIO_IreadContig(fd, read_buf + vars->for_curr_iter, (int) size,
                         MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
                         &vars->req2, error_code);

        nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN;
        return;
#endif
    }

    ADIOI_R_Iexchange_data(nbc_req, error_code);
}
875
/*
 * Tail of the first loop (continuation of ADIOI_R_Iexchange_data()).
 *
 * If the iteration left for_next_iter bytes at the end of the window that
 * belong to the next iteration, fd->io_buf is replaced by a new buffer of
 * for_next_iter + coll_bufsize bytes with those bytes at its start.  The
 * file position and the byte counter are then advanced by the size just
 * read, m is incremented and control returns to the head of the loop.
 */
static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIO_File fd = vars->fd;
    char *read_buf = vars->read_buf;
    ADIO_Offset size = vars->size;
    ADIO_Offset real_size = vars->real_size;
    ADIO_Offset for_next_iter = vars->for_next_iter;

    /* what was "next" becomes "current" */
    vars->for_curr_iter = for_next_iter;

    if (for_next_iter) {
        char *tmp_buf = (char *) ADIOI_Malloc(for_next_iter);

        ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIU_Upint)(read_buf+real_size-for_next_iter));
        ADIOI_Assert((for_next_iter+vars->coll_bufsize) == (size_t)(for_next_iter+vars->coll_bufsize));

        /* save the tail of the window before the buffer is released */
        memcpy(tmp_buf, read_buf + real_size - for_next_iter, for_next_iter);

        /* new buffer: carried-over bytes followed by room for one window */
        ADIOI_Free(fd->io_buf);
        fd->io_buf = (char *) ADIOI_Malloc(for_next_iter + vars->coll_bufsize);
        memcpy(fd->io_buf, tmp_buf, for_next_iter);
        vars->read_buf = fd->io_buf;

        ADIOI_Free(tmp_buf);
    }

    vars->off += size;
    vars->done += size;

    /* increment m and go back to the beginning of m loop */
    vars->m++;
    ADIOI_Iread_and_exch_l1_begin(nbc_req, error_code);
}
908
/*
 * Transition between the two loops of the read-and-exchange phase: zero the
 * first nprocs entries of count[] and send_size[], start the loop counter m
 * at ntimes, and enter ADIOI_Iread_and_exch_l2_begin().
 */
static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *nbc_req,
                                       int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    const int nprocs = vars->nprocs;
    int *send_size = vars->send_size;
    int *count = vars->count;
    int rank;

    for (rank = 0; rank < nprocs; rank++) {
        send_size[rank] = 0;
        count[rank] = 0;
    }

    /* the second loop runs m from ntimes upward */
    vars->m = vars->ntimes;
    ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
}
923
/*
 * Head of the second loop of the read-and-exchange phase.
 *
 * Once m has reached max_ntimes the loop is over and
 * ADIOI_Iread_and_exch_fini() is called.  Otherwise a zero-initialized
 * ADIOI_R_Iexchange_data_vars is allocated, stored in
 * nbc_req->data.rd.red_vars, filled from the current loop state, and
 * ADIOI_R_Iexchange_data() is started with ADIOI_Iread_and_exch_l2_end()
 * registered as its continuation.
 */
static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *nbc_req,
                                          int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIOI_R_Iexchange_data_vars *xd;

    /* loop exit condition */
    if (vars->m >= vars->max_ntimes) {
        ADIOI_Iread_and_exch_fini(nbc_req, error_code);
        return;
    }

    /* parameter block handed to ADIOI_R_Iexchange_data() */
    xd = (ADIOI_R_Iexchange_data_vars *) ADIOI_Calloc(1, sizeof(*xd));
    nbc_req->data.rd.red_vars = xd;

    /* file handle, user buffer and its description */
    xd->fd = vars->fd;
    xd->buf = vars->buf;
    xd->flat_buf = vars->flat_buf;
    xd->buftype_is_contig = vars->buftype_is_contig;
    xd->buftype_extent = vars->buftype_extent;
    xd->buf_idx = vars->buf_idx;

    /* access lists */
    xd->offset_list = vars->offset_list;
    xd->len_list = vars->len_list;
    xd->contig_access_count = vars->contig_access_count;
    xd->others_req = vars->others_req;

    /* exchange bookkeeping arrays */
    xd->send_size = vars->send_size;
    xd->recv_size = vars->recv_size;
    xd->count = vars->count;
    xd->start_pos = vars->start_pos;
    xd->partial_send = vars->partial_send;
    xd->recd_from_proc = vars->recd_from_proc;

    /* process and file-domain information */
    xd->nprocs = vars->nprocs;
    xd->myrank = vars->myrank;
    xd->min_st_offset = vars->min_st_offset;
    xd->fd_size = vars->fd_size;
    xd->fd_start = vars->fd_start;
    xd->fd_end = vars->fd_end;

    /* current iteration and where to continue afterwards */
    xd->iter = vars->m;
    xd->next_fn = ADIOI_Iread_and_exch_l2_end;

    ADIOI_R_Iexchange_data(nbc_req, error_code);
}
967
/*
 * Tail of the second loop: advance the loop counter m by one and return to
 * ADIOI_Iread_and_exch_l2_begin(), which decides whether to iterate again.
 */
static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_Iread_and_exch_vars *loop_state = nbc_req->data.rd.rae_vars;

    loop_state->m += 1;

    ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
}
976
/*
 * Tear down the read-and-exchange phase: free its work arrays and the
 * ADIOI_Iread_and_exch_vars structure itself, clear the pointer held in the
 * request, then call the continuation that was stored in vars->next_fn.
 */
static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    /* fetch the continuation now; vars is about to be freed */
    void (*next_fn)(ADIOI_NBC_Request *, int *) = vars->next_fn;

    /* work arrays owned by this phase */
    ADIOI_Free(vars->curr_offlen_ptr);
    ADIOI_Free(vars->count);
    ADIOI_Free(vars->partial_send);
    ADIOI_Free(vars->send_size);
    ADIOI_Free(vars->recv_size);
    ADIOI_Free(vars->recd_from_proc);
    ADIOI_Free(vars->start_pos);

    /* the parameter/variable structure itself */
    ADIOI_Free(vars);
    nbc_req->data.rd.rae_vars = NULL;

    /* hand control to the next stage */
    next_fn(nbc_req, error_code);
}
999
1000
/*
 * First stage of the nonblocking data exchange.
 *
 * Starts an MPI_Ialltoall on fd->comm that sends one int from send_size[] to
 * every process and receives one int from each into recv_size[], so that each
 * process learns how much it will receive from whom.  The MPI return code is
 * stored in *error_code, the request in vars->req1, and the state is set to
 * ADIOI_IRC_STATE_R_IEXCHANGE_DATA.
 */
static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_R_Iexchange_data_vars *xd = nbc_req->data.rd.red_vars;
    int rc;

    rc = MPI_Ialltoall(xd->send_size, 1, MPI_INT,
                       xd->recv_size, 1, MPI_INT,
                       xd->fd->comm, &xd->req1);
    *error_code = rc;

    nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA;
}
1012
/*
 * Second stage of the nonblocking data exchange: post all receives and
 * sends for this iteration.
 *
 * The number of processes with a nonzero recv_size / send_size is stored in
 * vars->nprocs_recv / vars->nprocs_send.  vars->req2 is allocated with one
 * request per receive followed by one per send (plus one spare slot).
 * Receives go straight into the user buffer at buf_idx[] when the buffer
 * type is contiguous, otherwise into freshly allocated per-process buffers
 * kept in vars->recv_buf.  Each send uses a temporary hindexed datatype built
 * from others_req[].lens / mem_ptrs and is sent from MPI_BOTTOM.
 *
 * If any receive was posted, the state becomes
 * ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV and the function returns; otherwise
 * ADIOI_R_Iexchange_data_fill() is called directly.
 */
static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
    ADIO_File fd = vars->fd;
    ADIOI_Access *others_req = vars->others_req;
    int *send_size = vars->send_size;
    int *recv_size = vars->recv_size;
    int *count = vars->count;
    int *start_pos = vars->start_pos;
    int *partial_send = vars->partial_send;
    int *buf_idx = vars->buf_idx;
    const int nprocs = vars->nprocs;
    const int myrank = vars->myrank;
    const int iter = vars->iter;

    int rank, slot, tag;
    int last = 0, saved_len = 0;
    int n_recv = 0, n_send = 0;
    char **recv_buf = NULL;
    MPI_Request *reqs;
    MPI_Datatype send_type;

    /* how many peers do we receive from / send to in this iteration? */
    for (rank = 0; rank < nprocs; rank++) {
        if (recv_size[rank] != 0)
            n_recv++;
        if (send_size[rank] != 0)
            n_send++;
    }
    vars->nprocs_recv = n_recv;
    vars->nprocs_send = n_send;

    /* receives occupy the first n_recv slots, sends follow;
       the extra slot avoids a 0-size malloc */
    reqs = (MPI_Request *)
        ADIOI_Malloc((n_send + n_recv + 1) * sizeof(MPI_Request));
    vars->req2 = reqs;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5032, 0, NULL);
#endif

    if (vars->buftype_is_contig) {
        /* contiguous user buffer: receive in place at buf_idx[rank] */
        slot = 0;
        for (rank = 0; rank < nprocs; rank++) {
            if (recv_size[rank] == 0)
                continue;

            tag = myrank + rank + 100 * iter;
            MPI_Irecv(((char *) vars->buf) + buf_idx[rank], recv_size[rank],
                      MPI_BYTE, rank, tag, fd->comm, &reqs[slot]);
            slot++;
            buf_idx[rank] += recv_size[rank];
        }
    }
    else {
        /* noncontiguous user buffer: stage each peer's data in recv_buf */
        recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
        vars->recv_buf = recv_buf;
        for (rank = 0; rank < nprocs; rank++) {
            if (recv_size[rank] != 0)
                recv_buf[rank] = (char *) ADIOI_Malloc(recv_size[rank]);
        }

        slot = 0;
        for (rank = 0; rank < nprocs; rank++) {
            if (recv_size[rank] == 0)
                continue;

            tag = myrank + rank + 100 * iter;
            MPI_Irecv(recv_buf[rank], recv_size[rank], MPI_BYTE, rank,
                      tag, fd->comm, &reqs[slot]);
            slot++;
#ifdef RDCOLL_DEBUG
            DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
                        myrank, recv_size[rank], tag);
#endif
        }
    }

    /* build one derived datatype per destination and send the data */
    slot = 0;
    for (rank = 0; rank < nprocs; rank++) {
        if (send_size[rank] == 0)
            continue;

        /* a partial send temporarily shortens the last off-len pair */
        if (partial_send[rank]) {
            last = start_pos[rank] + count[rank] - 1;
            saved_len = others_req[rank].lens[last];
            others_req[rank].lens[last] = partial_send[rank];
        }

        ADIOI_Type_create_hindexed_x(count[rank],
                                     &(others_req[rank].lens[start_pos[rank]]),
                                     &(others_req[rank].mem_ptrs[start_pos[rank]]),
                                     MPI_BYTE, &send_type);
        /* displacements are absolute, hence MPI_BOTTOM in the send */
        MPI_Type_commit(&send_type);
        tag = myrank + rank + 100 * iter;
        MPI_Isend(MPI_BOTTOM, 1, send_type, rank, tag,
                  fd->comm, reqs + n_recv + slot);
        MPI_Type_free(&send_type);

        /* restore the length that was shortened above */
        if (partial_send[rank])
            others_req[rank].lens[last] = saved_len;

        slot++;
    }

    /* nothing to wait for: go straight to the fill stage */
    if (n_recv == 0) {
        ADIOI_R_Iexchange_data_fill(nbc_req, error_code);
        return;
    }

    /* otherwise the poll function waits on the receives */
    nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV;
}
1117
/*
 * Third stage of the nonblocking data exchange.
 *
 * When data was received (nprocs_recv != 0) and the user buffer type is not
 * contiguous, ADIOI_Fill_user_buffer() is called to do the copies from the
 * receive buffers.  In every case the state then becomes
 * ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL.
 */
static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_R_Iexchange_data_vars *xd = nbc_req->data.rd.red_vars;

    /* contiguous buffers were received in place; nothing to copy then */
    if (xd->nprocs_recv && !xd->buftype_is_contig) {
        ADIOI_Fill_user_buffer(xd->fd, xd->buf, xd->flat_buf,
                               xd->recv_buf, xd->offset_list, xd->len_list,
                               (unsigned*)xd->recv_size,
                               xd->req2, NULL, xd->recd_from_proc,
                               xd->nprocs, xd->contig_access_count,
                               xd->min_st_offset, xd->fd_size, xd->fd_start,
                               xd->fd_end, xd->buftype_extent);
    }

    nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL;
}
1137
/*
 * Last stage of the nonblocking data exchange: free the request array, the
 * receive buffers (only allocated for a noncontiguous buffer type) and the
 * ADIOI_R_Iexchange_data_vars structure, clear the pointer held in the
 * request, then call the continuation stored in vars->next_fn.
 */
static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_R_Iexchange_data_vars *xd = nbc_req->data.rd.red_vars;
    /* fetch the continuation now; xd is freed below */
    void (*next_fn)(ADIOI_NBC_Request *, int *) = xd->next_fn;
    int rank;

    ADIOI_Free(xd->req2);

    if (!xd->buftype_is_contig) {
        /* a buffer exists only for processes we received from */
        for (rank = 0; rank < xd->nprocs; rank++) {
            if (xd->recv_size[rank] == 0)
                continue;
            ADIOI_Free(xd->recv_buf[rank]);
        }
        ADIOI_Free(xd->recv_buf);
    }
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5033, 0, NULL);
#endif

    /* release the parameter/variable structure */
    ADIOI_Free(xd);
    nbc_req->data.rd.red_vars = NULL;

    /* hand control to the next stage */
    next_fn(nbc_req, error_code);
}
1164
1165
/*
 * Query callback: fill in the status of the request.
 *
 * extra_state is treated as an ADIOI_NBC_Request *.  The status reports
 * nbc_req->nbytes elements of MPI_BYTE, is marked as not cancelled, and has
 * MPI_SOURCE and MPI_TAG set to MPI_UNDEFINED.  Always returns MPI_SUCCESS.
 */
static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status)
{
    ADIOI_NBC_Request *nbc_req = (ADIOI_NBC_Request *) extra_state;

    MPI_Status_set_elements_x(status, MPI_BYTE, nbc_req->nbytes);

    /* the request cannot be cancelled, so the cancelled flag is always 0 */
    MPI_Status_set_cancelled(status, 0);

    /* neither source nor tag carries a meaning for this request */
    status->MPI_SOURCE = MPI_UNDEFINED;
    status->MPI_TAG = MPI_UNDEFINED;

    /* this callback itself never fails */
    return MPI_SUCCESS;
}
1185
ADIOI_GEN_irc_free_fn(void * extra_state)1186 static int ADIOI_GEN_irc_free_fn(void *extra_state)
1187 {
1188 ADIOI_NBC_Request *nbc_req;
1189
1190 nbc_req = (ADIOI_NBC_Request *)extra_state;
1191 ADIOI_Free(nbc_req);
1192
1193 return MPI_SUCCESS;
1194 }
1195
/*
 * Poll callback: make one step of progress on a nonblocking collective read.
 *
 * extra_state is treated as an ADIOI_NBC_Request *.  Depending on
 * nbc_req->data.rd.state, the MPI request(s) belonging to that state are
 * tested; when the test succeeds and reports completion, the function that
 * continues the operation from that state is called.  States not listed
 * here are left untouched.  The status argument is not used.
 *
 * Returns MPI_SUCCESS, or an MPI_ERR_IO error code created with
 * MPIO_Err_create_code() if a test or a continuation reported an error.
 */
static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status)
{
    ADIOI_NBC_Request *nbc_req = (ADIOI_NBC_Request *) extra_state;
    int errcode = MPI_SUCCESS;
    int flag;

    switch (nbc_req->data.rd.state) {
        case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL: {
            ADIOI_GEN_IreadStridedColl_vars *rsc_vars =
                nbc_req->data.rd.rsc_vars;

            errcode = MPI_Testall(2, rsc_vars->req_offset, &flag,
                                  MPI_STATUSES_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_GEN_IreadStridedColl_inter(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO: {
            ADIOI_GEN_IreadStridedColl_vars *rsc_vars =
                nbc_req->data.rd.rsc_vars;

            errcode = MPI_Test(&rsc_vars->req_ind_io, &flag,
                               MPI_STATUS_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            /* call the last function */
            ADIOI_GEN_IreadStridedColl_fini(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_ICALC_OTHERS_REQ: {
            ADIOI_Icalc_others_req_vars *cor_vars = nbc_req->cor_vars;

            errcode = MPI_Test(&cor_vars->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_Icalc_others_req_main(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN: {
            ADIOI_Icalc_others_req_vars *cor_vars = nbc_req->cor_vars;

            /* with no outstanding requests, finish immediately */
            if (cor_vars->num_req2) {
                errcode = MPI_Testall(cor_vars->num_req2, cor_vars->req2,
                                      &flag, MPI_STATUSES_IGNORE);
                if (errcode != MPI_SUCCESS || !flag)
                    break;
            }
            ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_IREAD_AND_EXCH: {
            ADIOI_Iread_and_exch_vars *rae_vars = nbc_req->data.rd.rae_vars;

            errcode = MPI_Test(&rae_vars->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            /* start the first loop at iteration 0 */
            rae_vars->m = 0;
            ADIOI_Iread_and_exch_l1_begin(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN: {
            ADIOI_Iread_and_exch_vars *rae_vars = nbc_req->data.rd.rae_vars;

            errcode = MPI_Test(&rae_vars->req2, &flag, MPI_STATUS_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_R_Iexchange_data(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA: {
            ADIOI_R_Iexchange_data_vars *red_vars = nbc_req->data.rd.red_vars;

            errcode = MPI_Test(&red_vars->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_R_Iexchange_data_recv(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV: {
            ADIOI_R_Iexchange_data_vars *red_vars = nbc_req->data.rd.red_vars;

            /* the first nprocs_recv entries of req2 are the receives */
            errcode = MPI_Testall(red_vars->nprocs_recv, red_vars->req2,
                                  &flag, MPI_STATUSES_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_R_Iexchange_data_fill(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL: {
            ADIOI_R_Iexchange_data_vars *red_vars = nbc_req->data.rd.red_vars;

            /* the sends follow the receives in req2 */
            errcode = MPI_Testall(red_vars->nprocs_send,
                                  red_vars->req2 + red_vars->nprocs_recv,
                                  &flag, MPI_STATUSES_IGNORE);
            if (errcode != MPI_SUCCESS || !flag)
                break;
            ADIOI_R_Iexchange_data_fini(nbc_req, &errcode);
            break;
        }

        default:
            break;
    }

    /* --BEGIN ERROR HANDLING-- */
    if (errcode != MPI_SUCCESS) {
        errcode = MPIO_Err_create_code(MPI_SUCCESS,
                                       MPIR_ERR_RECOVERABLE,
                                       "ADIOI_GEN_irc_poll_fn", __LINE__,
                                       MPI_ERR_IO, "**mpi_grequest_complete",
                                       0);
    }
    /* --END ERROR HANDLING-- */

    return errcode;
}
1308
1309 /* wait for multiple requests to complete */
ADIOI_GEN_irc_wait_fn(int count,void ** array_of_states,double timeout,MPI_Status * status)1310 static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
1311 double timeout, MPI_Status *status)
1312 {
1313 int i, errcode = MPI_SUCCESS;
1314 double starttime;
1315 ADIOI_NBC_Request **nbc_reqlist;
1316
1317 nbc_reqlist = (ADIOI_NBC_Request **)array_of_states;
1318
1319 starttime = MPI_Wtime();
1320 for (i = 0; i < count ; i++) {
1321 while (nbc_reqlist[i]->data.rd.state != ADIOI_IRC_STATE_COMPLETE) {
1322 errcode = ADIOI_GEN_irc_poll_fn(nbc_reqlist[i], MPI_STATUS_IGNORE);
1323 /* --BEGIN ERROR HANDLING-- */
1324 if (errcode != MPI_SUCCESS) {
1325 errcode = MPIO_Err_create_code(MPI_SUCCESS,
1326 MPIR_ERR_RECOVERABLE,
1327 "ADIOI_GEN_irc_wait_fn",
1328 __LINE__, MPI_ERR_IO,
1329 "**mpi_grequest_complete", 0);
1330 }
1331 /* --END ERROR HANDLING-- */
1332
1333 if ((timeout > 0) && (timeout < (MPI_Wtime() - starttime)))
1334 goto fn_exit;
1335
1336 /* If the progress engine is blocked, we have to yield for another
1337 * thread to be able to unblock the progress engine. */
1338 MPIR_Ext_cs_yield();
1339 }
1340 }
1341
1342 fn_exit:
1343 return errcode;
1344 }
1345
1346 #endif /* HAVE_MPI_GREQUEST_EXTENSIONS */
1347