1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *  (C) 2014 by Argonne National Laboratory.
4  *      See COPYRIGHT in top-level directory.
5  */
6 
7 #include "adio.h"
8 #include "adio_extern.h"
9 #include "mpiu_greq.h"
10 #include "mpioimpl.h"
11 
12 #ifdef USE_DBG_LOGGING
13   #define RDCOLL_DEBUG 1
14 #endif
15 #ifdef AGGREGATION_PROFILE
16 #include "mpe.h"
17 #endif
18 
19 #ifdef HAVE_MPI_GREQUEST_EXTENSIONS
20 
21 /* ADIOI_GEN_IreadStridedColl */
22 struct ADIOI_GEN_IreadStridedColl_vars {
23     /* requests */
24     MPI_Request req_offset[2];  /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL */
25     MPI_Request req_ind_io;     /* ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO */
26 
27     /* parameters */
28     ADIO_File fd;
29     void *buf;
30     int count;
31     MPI_Datatype datatype;
32     int file_ptr_type;
33     ADIO_Offset offset;
34 
35     /* stack variables */
36     ADIOI_Access *my_req;
37     /* array of nprocs structures, one for each other process in
38        whose file domain this process's request lies */
39 
40     ADIOI_Access *others_req;
41     /* array of nprocs structures, one for each other process
42        whose request lies in this process's file domain. */
43 
44     int nprocs;
45     int nprocs_for_coll;
46     int myrank;
47     int contig_access_count;
48     int interleave_count;
49     int buftype_is_contig;
50     int *count_my_req_per_proc;
51     int count_my_req_procs;
52     int count_others_req_procs;
53     ADIO_Offset start_offset;
54     ADIO_Offset end_offset;
55     ADIO_Offset orig_fp;
56     ADIO_Offset fd_size;
57     ADIO_Offset min_st_offset;
58     ADIO_Offset *offset_list;
59     ADIO_Offset *st_offsets;
60     ADIO_Offset *fd_start;
61     ADIO_Offset *fd_end;
62     ADIO_Offset *end_offsets;
63     ADIO_Offset *len_list;
64     int *buf_idx;
65 };
66 
67 /* ADIOI_Iread_and_exch */
68 struct ADIOI_Iread_and_exch_vars {
69     /* requests */
70     MPI_Request req1;   /* ADIOI_IRC_STATE_IREAD_AND_EXCH */
71     MPI_Request req2;   /* ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN */
72 
73     /* parameters */
74     ADIO_File fd;
75     void *buf;
76     MPI_Datatype datatype;
77     int nprocs;
78     int myrank;
79     ADIOI_Access *others_req;
80     ADIO_Offset *offset_list;
81     ADIO_Offset *len_list;
82     int contig_access_count;
83     ADIO_Offset min_st_offset;
84     ADIO_Offset fd_size;
85     ADIO_Offset *fd_start;
86     ADIO_Offset *fd_end;
87     int *buf_idx;
88 
89     /* stack variables */
90     int m;
91     int ntimes;
92     int max_ntimes;
93     int buftype_is_contig;
94     ADIO_Offset st_loc;
95     ADIO_Offset end_loc;
96     ADIO_Offset off;
97     ADIO_Offset done;
98     char *read_buf;
99     int *curr_offlen_ptr;
100     int *count;
101     int *send_size;
102     int *recv_size;
103     int *partial_send;
104     int *recd_from_proc;
105     int *start_pos;
106     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
107     ADIO_Offset size;
108     ADIO_Offset real_size;
109     ADIO_Offset for_curr_iter;
110     ADIO_Offset for_next_iter;
111     ADIOI_Flatlist_node *flat_buf;
112     MPI_Aint buftype_extent;
113     int coll_bufsize;
114 
115     /* next function to be called */
116     void (*next_fn)(ADIOI_NBC_Request *, int *);
117 };
118 
119 /* ADIOI_R_Iexchange_data */
120 struct ADIOI_R_Iexchange_data_vars {
121     /* requests */
122     MPI_Request req1;   /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA */
123     MPI_Request *req2;  /* ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV & FILL */
124 
125     /* parameters */
126     ADIO_File fd;
127     void *buf;
128     ADIOI_Flatlist_node *flat_buf;
129     ADIO_Offset *offset_list;
130     ADIO_Offset *len_list;
131     int *send_size;
132     int *recv_size;
133     int *count;
134     int *start_pos;
135     int *partial_send;
136     int *recd_from_proc;
137     int nprocs;
138     int myrank;
139     int buftype_is_contig;
140     int contig_access_count;
141     ADIO_Offset min_st_offset;
142     ADIO_Offset fd_size;
143     ADIO_Offset *fd_start;
144     ADIO_Offset *fd_end;
145     ADIOI_Access *others_req;
146     int iter;
147     MPI_Aint buftype_extent;
148     int *buf_idx;
149 
150     /* stack variables */
151     int nprocs_recv;
152     int nprocs_send;
153     char **recv_buf;
154 
155     /* next function to be called */
156     void (*next_fn)(ADIOI_NBC_Request *, int *);
157 };
158 
159 
160 void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
161                    *flat_buf, char **recv_buf, ADIO_Offset
162                    *offset_list, ADIO_Offset *len_list,
163                    unsigned *recv_size,
164                    MPI_Request *requests, MPI_Status *statuses,
165                    int *recd_from_proc, int nprocs,
166                    int contig_access_count,
167                    ADIO_Offset min_st_offset,
168                    ADIO_Offset fd_size, ADIO_Offset *fd_start,
169                    ADIO_Offset *fd_end,
170                    MPI_Aint buftype_extent);
171 
172 /* prototypes of functions used for nonblocking collective reads only. */
173 static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *, int *);
174 static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *, int *);
175 static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *, int *);
176 static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *, int *);
177 static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *, int *);
178 
179 static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *, int *);
180 static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *, int *);
181 static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *, int *);
182 static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *, int *);
183 static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *, int *);
184 static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *, int *);
185 static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *, int *);
186 
187 static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *, int *);
188 static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *, int *);
189 static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *, int *);
190 static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *, int *);
191 
192 static MPIX_Grequest_class ADIOI_GEN_greq_class = 0;
193 static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status);
194 static int ADIOI_GEN_irc_free_fn(void *extra_state);
195 static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status);
196 static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
197                                  double timeout, MPI_Status *status);
198 
199 
200 /* Nonblocking version of ADIOI_GEN_ReadStridedColl() */
ADIOI_GEN_IreadStridedColl(ADIO_File fd,void * buf,int count,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,MPI_Request * request,int * error_code)201 void ADIOI_GEN_IreadStridedColl(ADIO_File fd, void *buf, int count,
202                    MPI_Datatype datatype, int file_ptr_type,
203                    ADIO_Offset offset, MPI_Request *request,
204                    int *error_code)
205 {
206     /* Uses a generalized version of the extended two-phase method described
207        in "An Extended Two-Phase Method for Accessing Sections of
208        Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
209        Scientific Programming, (5)4:301--317, Winter 1996.
210        http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
211 
212     ADIOI_NBC_Request *nbc_req = NULL;
213     ADIOI_GEN_IreadStridedColl_vars *vars = NULL;
214     int nprocs, myrank;
215 #ifdef RDCOLL_DEBUG
216     int i;
217 #endif
218 
219     /* FIXME: need an implementation of ADIOI_IOIstridedColl
220     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
221         ADIOI_IOIstridedColl(fd, buf, count, ADIOI_READ, datatype,
222                              file_ptr_type, offset, request, error_code);
223         return;
224     }
225     */
226 
227     /* top-level struct keeping the status of function progress */
228     nbc_req = (ADIOI_NBC_Request *)ADIOI_Calloc(1, sizeof(ADIOI_NBC_Request));
229     nbc_req->rdwr = ADIOI_READ;
230 
231     /* create a generalized request */
232     if (ADIOI_GEN_greq_class == 0) {
233         MPIX_Grequest_class_create(ADIOI_GEN_irc_query_fn,
234                 ADIOI_GEN_irc_free_fn, MPIU_Greq_cancel_fn,
235                 ADIOI_GEN_irc_poll_fn, ADIOI_GEN_irc_wait_fn,
236                 &ADIOI_GEN_greq_class);
237     }
238     MPIX_Grequest_class_allocate(ADIOI_GEN_greq_class, nbc_req, request);
239     memcpy(&nbc_req->req, request, sizeof(MPI_Request));
240 
241     /* create a struct for parameters and variables */
242     vars = (ADIOI_GEN_IreadStridedColl_vars *)ADIOI_Calloc(
243             1, sizeof(ADIOI_GEN_IreadStridedColl_vars));
244     nbc_req->data.rd.rsc_vars = vars;
245 
246     /* save the parameters */
247     vars->fd = fd;
248     vars->buf = buf;
249     vars->count = count;
250     vars->datatype = datatype;
251     vars->file_ptr_type = file_ptr_type;
252     vars->offset = offset;
253 
254     MPI_Comm_size(fd->comm, &nprocs);
255     MPI_Comm_rank(fd->comm, &myrank);
256     vars->nprocs = nprocs;
257     vars->myrank = myrank;
258 
259     /* number of aggregators, cb_nodes, is stored in the hints */
260     vars->nprocs_for_coll = fd->hints->cb_nodes;
261     vars->orig_fp = fd->fp_ind;
262 
263     /* only check for interleaving if cb_read isn't disabled */
264     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
265         /* For this process's request, calculate the list of offsets and
266            lengths in the file and determine the start and end offsets. */
267 
268         /* Note: end_offset points to the last byte-offset that will be accessed.
269            e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
270 
271         ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
272                               &vars->offset_list, &vars->len_list,
273                               &vars->start_offset, &vars->end_offset,
274                               &vars->contig_access_count);
275 
276 #ifdef RDCOLL_DEBUG
277         for (i = 0; i < vars->contig_access_count; i++) {
278             DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n",
279                         myrank, vars->offset_list[i], vars->len_list[i]);
280         }
281 #endif
282 
283         /* each process communicates its start and end offsets to other
284            processes. The result is an array each of start and end offsets
285            stored in order of process rank. */
286 
287         vars->st_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
288         vars->end_offsets = (ADIO_Offset *)ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
289 
290         *error_code = MPI_Iallgather(&vars->start_offset, 1, ADIO_OFFSET,
291                                      vars->st_offsets, 1, ADIO_OFFSET,
292                                      fd->comm, &vars->req_offset[0]);
293         if (*error_code != MPI_SUCCESS) return;
294         *error_code = MPI_Iallgather(&vars->end_offset, 1, ADIO_OFFSET,
295                                      vars->end_offsets, 1, ADIO_OFFSET,
296                                      fd->comm, &vars->req_offset[1]);
297 
298         nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL;
299         return;
300     }
301 
302     ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
303 }
304 
/*
 * Counts how many ranks have accesses that interleave with those of the
 * preceding rank, stores the count in vars->interleave_count and then
 * continues with ADIOI_GEN_IreadStridedColl_indio().
 *
 * nbc_req:    request whose rsc_vars hold the gathered start/end offsets.
 * error_code: passed on unchanged to the next step.
 */
static void ADIOI_GEN_IreadStridedColl_inter(ADIOI_NBC_Request *nbc_req,
                                             int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIO_Offset *starts = vars->st_offsets;
    ADIO_Offset *ends = vars->end_offsets;
    int n_ranks = vars->nprocs;
    int interleaved = 0;
    int rank;

    /* A rank counts as interleaved when it starts before the previous
       rank's end offset and its start does not lie beyond its own end
       offset.  This is a rudimentary check for interleaving, but should
       suffice for the moment. */
    for (rank = 1; rank < n_ranks; rank++) {
        if (starts[rank] >= ends[rank - 1]) {
            continue;
        }
        if (starts[rank] > ends[rank]) {
            continue;
        }
        interleaved++;
    }

    vars->interleave_count = interleaved;

    ADIOI_GEN_IreadStridedColl_indio(nbc_req, error_code);
}
326 
/*
 * Decides between independent I/O and aggregation.
 *
 * Independent I/O is used when cb_read is disabled, or when cb_read is
 * "auto" and no interleaving was counted.  In that case the read is issued
 * directly (blocking on Linux without aio-lite, nonblocking otherwise).
 * In all other cases the file domains and this process's requests are
 * computed and ADIOI_Icalc_others_req() is started, with
 * ADIOI_GEN_IreadStridedColl_read() registered as the following step.
 *
 * nbc_req:    request being progressed.
 * error_code: receives the result of the I/O or communication started here.
 */
static void ADIOI_GEN_IreadStridedColl_indio(ADIOI_NBC_Request *nbc_req,
                                             int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIO_File fd = vars->fd;
    ADIOI_Icalc_others_req_vars *cor_vars;
    int skip_aggregation;

    ADIOI_Datatype_iscontig(vars->datatype, &vars->buftype_is_contig);

    skip_aggregation =
        (fd->hints->cb_read == ADIOI_HINT_DISABLE) ||
        (!vars->interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO));

    if (skip_aggregation) {
        int filetype_is_contig;
        int both_contig;
        ADIO_Offset abs_off;
#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        MPI_Status status;
#endif

        /* The offset lists were only built when cb_read is not disabled;
           they are not needed for independent I/O. */
        if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
            ADIOI_Free(vars->offset_list);
            ADIOI_Free(vars->len_list);
            ADIOI_Free(vars->st_offsets);
            ADIOI_Free(vars->end_offsets);
        }

        /* restore the individual file pointer saved at the start */
        fd->fp_ind = vars->orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
        both_contig = vars->buftype_is_contig && filetype_is_contig;

#if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
        /* NOTE: This is currently a workaround to avoid weird errors, e.g.,
         * stack fault, occurred on Linux. When the host OS is Linux and
         * aio-lite is not used, a blocking ADIO function is used here.
         * See https://trac.mpich.org/projects/mpich/ticket/2201. */
        if (!both_contig) {
            ADIO_ReadStrided(fd, vars->buf, vars->count, vars->datatype,
                             vars->file_ptr_type, vars->offset,
                             &status, error_code);
        }
        else if (vars->file_ptr_type == ADIO_EXPLICIT_OFFSET) {
            abs_off = fd->disp + (fd->etype_size) * vars->offset;
            ADIO_ReadContig(fd, vars->buf, vars->count, vars->datatype,
                            ADIO_EXPLICIT_OFFSET, abs_off,
                            &status, error_code);
        }
        else {
            ADIO_ReadContig(fd, vars->buf, vars->count, vars->datatype,
                            ADIO_INDIVIDUAL, 0, &status, error_code);
        }

        /* the blocking read is done, so the request can be completed */
        ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
#else
        if (!both_contig) {
            ADIO_IreadStrided(fd, vars->buf, vars->count, vars->datatype,
                              vars->file_ptr_type, vars->offset,
                              &vars->req_ind_io, error_code);
        }
        else if (vars->file_ptr_type == ADIO_EXPLICIT_OFFSET) {
            abs_off = fd->disp + (fd->etype_size) * vars->offset;
            ADIO_IreadContig(fd, vars->buf, vars->count, vars->datatype,
                             ADIO_EXPLICIT_OFFSET, abs_off,
                             &vars->req_ind_io, error_code);
        }
        else {
            ADIO_IreadContig(fd, vars->buf, vars->count, vars->datatype,
                             ADIO_INDIVIDUAL, 0,
                             &vars->req_ind_io, error_code);
        }

        nbc_req->data.rd.state = ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO;
#endif
        return;
    }

    /* Aggregation of I/O.  ADIOI_Calc_file_domains() determines which
     * processes handle I/O to which regions.  nprocs_for_coll tells it how
     * many processes perform I/O, which is also the number of regions into
     * which the range of bytes is divided.  These regions are called
     * "file domains", or FDs.
     *
     * On return fd_start, fd_end, fd_size and min_st_offset are filled in:
     * fd_start holds the starting byte location of each file domain,
     * fd_end the ending byte location, and min_st_offset the minimum byte
     * location that will be accessed.
     *
     * fd_start[] and fd_end[] are indexed by aggregator number; this has
     * to be mapped to an actual rank in the communicator later.
     */
    ADIOI_Calc_file_domains(vars->st_offsets, vars->end_offsets,
                vars->nprocs, vars->nprocs_for_coll,
                &vars->min_st_offset,
                &vars->fd_start, &vars->fd_end,
                fd->hints->min_fdomain_size, &vars->fd_size,
                fd->hints->striping_unit);

    /* Locate the portions of this process's access requests in terms of
     * the file domains; they may lie on this process or on others.
     * Filled in by the call:
     * count_my_req_procs - number of processes (including this one) in
     *     whose file domain this process has requests
     * count_my_req_per_proc - count of requests for each process, indexed
     *     by rank
     * my_req[] - descriptions of the requests to be performed by each
     *     process (including self), indexed by rank
     * buf_idx[] - locations into which data can be moved directly; only
     *     valid for the contiguous buffer case
     */
    ADIOI_Calc_my_req(fd, vars->offset_list, vars->len_list,
              vars->contig_access_count, vars->min_st_offset,
              vars->fd_start, vars->fd_end, vars->fd_size,
              vars->nprocs, &vars->count_my_req_procs,
              &vars->count_my_req_per_proc, &vars->my_req,
              &vars->buf_idx);

    /* Distribute the data calculated above with a collective
     * communication.  Filled in as a result:
     * count_others_req_procs - number of processes (including this one)
     *     which have requests in this process's file domain
     * count_others_req_per_proc[] - number of separate contiguous
     *     requests from proc i that lie in this process's file domain
     */
    cor_vars = (ADIOI_Icalc_others_req_vars *)ADIOI_Calloc(
            1, sizeof(ADIOI_Icalc_others_req_vars));
    nbc_req->cor_vars = cor_vars;

    cor_vars->fd = vars->fd;
    cor_vars->count_my_req_procs = vars->count_my_req_procs;
    cor_vars->count_my_req_per_proc = vars->count_my_req_per_proc;
    cor_vars->my_req = vars->my_req;
    cor_vars->nprocs = vars->nprocs;
    cor_vars->myrank = vars->myrank;
    cor_vars->count_others_req_procs_ptr = &vars->count_others_req_procs;
    cor_vars->others_req_ptr = &vars->others_req;
    cor_vars->next_fn = ADIOI_GEN_IreadStridedColl_read;

    ADIOI_Icalc_others_req(nbc_req, error_code);
}
468 
/*
 * Releases my_req[] and count_my_req_per_proc, which are no longer needed,
 * then prepares the state for ADIOI_Iread_and_exch() and starts it.
 * ADIOI_GEN_IreadStridedColl_free() is registered as the following step.
 *
 * nbc_req:    request being progressed.
 * error_code: passed on to ADIOI_Iread_and_exch().
 */
static void ADIOI_GEN_IreadStridedColl_read(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIOI_Access *my_req = vars->my_req;
    ADIOI_Iread_and_exch_vars *rae_vars;
    int n_ranks = vars->nprocs;
    int rank;

    /* my_req[] and count_my_req_per_proc have served their purpose */
    ADIOI_Free(vars->count_my_req_per_proc);
    for (rank = 0; rank < n_ranks; rank++) {
        if (!my_req[rank].count) {
            continue;   /* nothing was allocated for this entry */
        }
        ADIOI_Free(my_req[rank].offsets);
        ADIOI_Free(my_req[rank].lens);
    }
    ADIOI_Free(my_req);

    /* Read data in pieces of no more than ADIOI_Coll_bufsize,
     * communicate, and fill the user buffer. */
    rae_vars = (ADIOI_Iread_and_exch_vars *)ADIOI_Calloc(
            1, sizeof(ADIOI_Iread_and_exch_vars));
    nbc_req->data.rd.rae_vars = rae_vars;

    rae_vars->fd = vars->fd;
    rae_vars->buf = vars->buf;
    rae_vars->datatype = vars->datatype;
    rae_vars->nprocs = vars->nprocs;
    rae_vars->myrank = vars->myrank;
    rae_vars->others_req = vars->others_req;
    rae_vars->offset_list = vars->offset_list;
    rae_vars->len_list = vars->len_list;
    rae_vars->contig_access_count = vars->contig_access_count;
    rae_vars->min_st_offset = vars->min_st_offset;
    rae_vars->fd_size = vars->fd_size;
    rae_vars->fd_start = vars->fd_start;
    rae_vars->fd_end = vars->fd_end;
    rae_vars->buf_idx = vars->buf_idx;
    rae_vars->next_fn = ADIOI_GEN_IreadStridedColl_free;

    ADIOI_Iread_and_exch(nbc_req, error_code);
}
514 
/*
 * Frees the memory that was allocated for collective I/O, marks the system
 * file position as unknown and continues with
 * ADIOI_GEN_IreadStridedColl_fini().
 *
 * nbc_req:    request being progressed.
 * error_code: passed on unchanged to the next step.
 */
static void ADIOI_GEN_IreadStridedColl_free(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    ADIOI_Access *others_req = vars->others_req;
    int n_ranks = vars->nprocs;
    int rank;

    /* a flattened representation exists only for noncontiguous types */
    if (!vars->buftype_is_contig) {
        ADIOI_Delete_flattened(vars->datatype);
    }

    /* per-process request descriptions */
    for (rank = 0; rank < n_ranks; rank++) {
        if (!others_req[rank].count) {
            continue;   /* nothing was allocated for this entry */
        }
        ADIOI_Free(others_req[rank].offsets);
        ADIOI_Free(others_req[rank].lens);
        ADIOI_Free(others_req[rank].mem_ptrs);
    }
    ADIOI_Free(others_req);

    /* remaining arrays owned by this operation */
    ADIOI_Free(vars->buf_idx);
    ADIOI_Free(vars->offset_list);
    ADIOI_Free(vars->len_list);
    ADIOI_Free(vars->st_offsets);
    ADIOI_Free(vars->end_offsets);
    ADIOI_Free(vars->fd_start);
    ADIOI_Free(vars->fd_end);

    vars->fd->fp_sys_posn = -1;     /* set it to null. */

    ADIOI_GEN_IreadStridedColl_fini(nbc_req, error_code);
}
549 
/*
 * Final step: records the number of bytes for the status, releases the
 * vars struct and completes the generalized request.
 *
 * nbc_req:    request being finished.
 * error_code: receives the return value of MPI_Grequest_complete().
 */
static void ADIOI_GEN_IreadStridedColl_fini(ADIOI_NBC_Request *nbc_req,
                                            int *error_code)
{
    ADIOI_GEN_IreadStridedColl_vars *vars = nbc_req->data.rd.rsc_vars;
    MPI_Count type_size;

    /* This is a temporary way of filling in status: it reports the full
       size of the request.  The right way is to keep track of how much
       data was actually read and placed in buf during collective I/O. */
    MPI_Type_size_x(vars->datatype, &type_size);
    nbc_req->nbytes = type_size * vars->count;

    /* the arguments and variables are not needed any more */
    if (nbc_req->data.rd.rsc_vars) {
        ADIOI_Free(nbc_req->data.rd.rsc_vars);
        nbc_req->data.rd.rsc_vars = NULL;
    }

    /* mark the request as complete */
    *error_code = MPI_Grequest_complete(nbc_req->req);
    nbc_req->data.rd.state = ADIOI_IRC_STATE_COMPLETE;
}
572 
573 
/*
 * First step of the read-and-exchange phase.
 *
 * Data is read in pieces of no more than coll_bufsize (an info
 * parameter), sent to the appropriate processes and placed in the user
 * buffer.  The idea is to reduce the amount of extra memory required for
 * collective I/O.  Reading everything at once would be much easier, but
 * it would require temporary space larger than the user buffer, which is
 * often unacceptable.  For example, to read a distributed array from a
 * file, where each local array is 8Mbytes, requiring at least another
 * 8Mbytes of temp space is unacceptable.
 *
 * This function determines the byte range covered by the requests in
 * others_req, derives the number of I/O rounds (ntimes) from it, starts an
 * MPI_Iallreduce that computes the maximum over all processes, allocates
 * the per-process bookkeeping arrays and sets the state to
 * ADIOI_IRC_STATE_IREAD_AND_EXCH.
 *
 * nbc_req:    request being progressed.
 * error_code: set to MPI_SUCCESS first, then to the return value of
 *             MPI_Iallreduce().  Only I/O errors are currently reported.
 */
static void ADIOI_Iread_and_exch(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIO_File fd = vars->fd;
    ADIOI_Access *others_req = vars->others_req;
    int n_ranks = vars->nprocs;
    ADIO_Offset st_loc = -1, end_loc = -1;
    MPI_Aint lb;
    int coll_bufsize;
    int rank, k;

    *error_code = MPI_SUCCESS;  /* changed below if error */

    /* The number of reads of size coll_bufsize done by each process, and
       the maximum among all processes, also give the number of
       communication phases.  coll_bufsize comes from the hints object. */
    coll_bufsize = fd->hints->cb_buffer_size;
    vars->coll_bufsize = coll_bufsize;

    /* seed st_loc and end_loc from the first process that has requests */
    for (rank = 0; rank < n_ranks; rank++) {
        if (others_req[rank].count) {
            st_loc = others_req[rank].offsets[0];
            end_loc = others_req[rank].offsets[0];
            break;
        }
    }

    /* then scan all requests for the real minimum and maximum */
    for (rank = 0; rank < n_ranks; rank++) {
        for (k = 0; k < others_req[rank].count; k++) {
            if (others_req[rank].offsets[k] < st_loc) {
                st_loc = others_req[rank].offsets[k];
            }
            if (end_loc < (others_req[rank].offsets[k]
                           + others_req[rank].lens[k] - 1)) {
                end_loc = (others_req[rank].offsets[k]
                           + others_req[rank].lens[k] - 1);
            }
        }
    }

    vars->st_loc = st_loc;
    vars->end_loc = end_loc;

    /* ntimes is the number of I/O operations this process must perform to
     * complete all the requests it has received.  Several operations are
     * needed because only coll_bufsize bytes of memory are used for
     * internal buffering. */
    if ((st_loc == -1) && (end_loc == -1)) {
        /* this process does no I/O. */
        vars->ntimes = 0;
    }
    else {
        /* ntimes = ceiling_div(end_loc - st_loc + 1, coll_bufsize) */
        vars->ntimes = (int)((end_loc - st_loc + coll_bufsize) / coll_bufsize);
    }

    *error_code = MPI_Iallreduce(&vars->ntimes, &vars->max_ntimes, 1, MPI_INT,
                                 MPI_MAX, fd->comm, &vars->req1);

    vars->read_buf = fd->io_buf;  /* Allocated at open time */

    /* current off-len pair of each process; calloc initializes to 0 */
    vars->curr_offlen_ptr = (int *)ADIOI_Calloc(n_ranks, sizeof(int));

    /* how many off-len pairs per process are satisfied in an iteration */
    vars->count = (int *)ADIOI_Malloc(n_ranks * sizeof(int));

    /* if only a portion of the last off-len pair is sent to a process in
       a particular iteration, the length sent is stored here; calloc
       initializes to 0 */
    vars->partial_send = (int *)ADIOI_Calloc(n_ranks, sizeof(int));

    /* total size of data to be sent to each process in an iteration */
    vars->send_size = (int *)ADIOI_Malloc(n_ranks * sizeof(int));

    /* total size of data to be received from each process in an
       iteration; of size nprocs so that MPI_Alltoall can be used later */
    vars->recv_size = (int *)ADIOI_Malloc(n_ranks * sizeof(int));

    /* amount of data received so far from each process; used in
       ADIOI_Fill_user_buffer and initialized to 0 here */
    vars->recd_from_proc = (int *)ADIOI_Calloc(n_ranks, sizeof(int));

    /* starting value of curr_offlen_ptr[i] in the current iteration */
    vars->start_pos = (int *)ADIOI_Malloc(n_ranks * sizeof(int));

    /* a flattened description is only needed for noncontiguous types */
    ADIOI_Datatype_iscontig(vars->datatype, &vars->buftype_is_contig);
    if (!vars->buftype_is_contig) {
        vars->flat_buf = ADIOI_Flatten_and_find(vars->datatype);
    }
    MPI_Type_get_extent(vars->datatype, &lb, &vars->buftype_extent);

    vars->done = 0;
    vars->off = st_loc;
    vars->for_next_iter = 0;
    vars->for_curr_iter = 0;

    /* set the state to wait until MPI_Iallreduce finishes. */
    nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH;
}
687 
ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request * nbc_req,int * error_code)688 static void ADIOI_Iread_and_exch_l1_begin(ADIOI_NBC_Request *nbc_req,
689                                           int *error_code)
690 {
691     ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
692     ADIO_File fd;
693     int nprocs;
694     ADIOI_Access *others_req;
695 
696     int i, j;
697     ADIO_Offset real_off, req_off;
698     char *read_buf;
699     int *curr_offlen_ptr, *count, *send_size;
700     int *partial_send, *start_pos;
701     ADIO_Offset size, real_size, for_next_iter;
702     int req_len, flag;
703 
704     ADIOI_R_Iexchange_data_vars *red_vars = NULL;
705 
706     /* loop exit condition */
707     if (vars->m >= vars->ntimes) {
708         ADIOI_Iread_and_exch_reset(nbc_req, error_code);
709         return;
710     }
711 
712     fd = vars->fd;
713     nprocs = vars->nprocs;
714     others_req = vars->others_req;
715 
716     read_buf = vars->read_buf;
717     curr_offlen_ptr = vars->curr_offlen_ptr;
718     count = vars->count;
719     send_size = vars->send_size;
720     partial_send = vars->partial_send;
721     start_pos = vars->start_pos;
722 
723     /* read buf of size coll_bufsize (or less) */
724     /* go through all others_req and check if any are satisfied
725        by the current read */
726 
727     /* since MPI guarantees that displacements in filetypes are in
728        monotonically nondecreasing order, I can maintain a pointer
729        (curr_offlen_ptr) to
730        current off-len pair for each process in others_req and scan
731        further only from there. There is still a problem of filetypes
732        such as:  (1, 2, 3 are not process nos. They are just numbers for
733        three chunks of data, specified by a filetype.)
734 
735        1  -------!--
736        2    -----!----
737        3       --!-----
738 
739        where ! indicates where the current read_size limitation cuts
740        through the filetype.  I resolve this by reading up to !, but
741        filling the communication buffer only for 1. I copy the portion
742        left over for 2 into a tmp_buf for use in the next
743        iteration. i.e., 2 and 3 will be satisfied in the next
744        iteration. This simplifies filling in the user's buf at the
745        other end, as only one off-len pair with incomplete data
746        will be sent. I also don't need to send the individual
747        offsets and lens along with the data, as the data is being
748        sent in a particular order. */
749 
750     /* off = start offset in the file for the data actually read in
751              this iteration
752        size = size of data read corresponding to off
753        real_off = off minus whatever data was retained in memory from
754              previous iteration for cases like 2, 3 illustrated above
755        real_size = size plus the extra corresponding to real_off
756        req_off = off in file for a particular contiguous request
757                  minus what was satisfied in previous iteration
758        req_size = size corresponding to req_off */
759 
760     size = ADIOI_MIN((unsigned)vars->coll_bufsize,
761                      vars->end_loc - vars->st_loc + 1 - vars->done);
762     real_off = vars->off - vars->for_curr_iter;
763     real_size = size + vars->for_curr_iter;
764 
765     vars->size = size;
766     vars->real_size = real_size;
767 
768     for (i = 0; i < nprocs; i++) count[i] = send_size[i] = 0;
769     for_next_iter = 0;
770 
771     for (i = 0; i < nprocs; i++) {
772 #ifdef RDCOLL_DEBUG
773         DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n",
774                     vars->myrank, i, others_req[i].count);
775 #endif
776         if (others_req[i].count) {
777             start_pos[i] = curr_offlen_ptr[i];
778             for (j = curr_offlen_ptr[i]; j < others_req[i].count; j++) {
779                 if (partial_send[i]) {
780                     /* this request may have been partially
781                        satisfied in the previous iteration. */
782                     req_off = others_req[i].offsets[j] + partial_send[i];
783                     req_len = others_req[i].lens[j] - partial_send[i];
784                     partial_send[i] = 0;
785                     /* modify the off-len pair to reflect this change */
786                     others_req[i].offsets[j] = req_off;
787                     others_req[i].lens[j] = req_len;
788                 }
789                 else {
790                     req_off = others_req[i].offsets[j];
791                     req_len = others_req[i].lens[j];
792                 }
793                 if (req_off < real_off + real_size) {
794                     count[i]++;
795                     ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf) + req_off - real_off) == (ADIO_Offset)(MPIU_Upint)(read_buf + req_off - real_off));
796                     MPI_Get_address(read_buf + req_off - real_off,
797                                 &(others_req[i].mem_ptrs[j]));
798                     ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
799                     send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
800                                                     (ADIO_Offset)(unsigned)req_len));
801 
802                     if (real_off + real_size - req_off < (ADIO_Offset)(unsigned)req_len) {
803                         partial_send[i] = (int)(real_off + real_size - req_off);
804                         if ((j+1 < others_req[i].count) &&
805                             (others_req[i].offsets[j+1] < real_off + real_size)) {
806                             /* this is the case illustrated in the
807                                figure above. */
808                             for_next_iter = ADIOI_MAX(for_next_iter,
809                                     real_off + real_size - others_req[i].offsets[j+1]);
810                             /* max because it must cover requests
811                                from different processes */
812                         }
813                         break;
814                     }
815                 }
816                 else break;
817             }
818             curr_offlen_ptr[i] = j;
819         }
820     }
821     vars->for_next_iter = for_next_iter;
822 
823     flag = 0;
824     for (i = 0; i < nprocs; i++)
825         if (count[i]) flag = 1;
826 
827     /* create a struct for ADIOI_R_Iexchange_data() */
828     red_vars = (ADIOI_R_Iexchange_data_vars *)ADIOI_Calloc(
829             1, sizeof(ADIOI_R_Iexchange_data_vars));
830     nbc_req->data.rd.red_vars = red_vars;
831     red_vars->fd = vars->fd;
832     red_vars->buf = vars->buf;
833     red_vars->flat_buf = vars->flat_buf;
834     red_vars->offset_list = vars->offset_list;
835     red_vars->len_list = vars->len_list;
836     red_vars->send_size = vars->send_size;
837     red_vars->recv_size = vars->recv_size;
838     red_vars->count = vars->count;
839     red_vars->start_pos = vars->start_pos;
840     red_vars->partial_send = vars->partial_send;
841     red_vars->recd_from_proc = vars->recd_from_proc;
842     red_vars->nprocs = vars->nprocs;
843     red_vars->myrank = vars->myrank;
844     red_vars->buftype_is_contig = vars->buftype_is_contig;
845     red_vars->contig_access_count = vars->contig_access_count;
846     red_vars->min_st_offset = vars->min_st_offset;
847     red_vars->fd_size = vars->fd_size;
848     red_vars->fd_start = vars->fd_start;
849     red_vars->fd_end = vars->fd_end;
850     red_vars->others_req = vars->others_req;
851     red_vars->iter = vars->m;
852     red_vars->buftype_extent = vars->buftype_extent;
853     red_vars->buf_idx = vars->buf_idx;
854     red_vars->next_fn = ADIOI_Iread_and_exch_l1_end;
855 
856     if (flag) {
857         ADIOI_Assert(size == (int)size);
858 #if defined(ROMIO_RUN_ON_LINUX) && !defined(HAVE_AIO_LITE_H)
859         MPI_Status status;
860         ADIO_ReadContig(fd, read_buf+vars->for_curr_iter, (int)size,
861                         MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
862                         &status, error_code);
863 #else
864         ADIO_IreadContig(fd, read_buf+vars->for_curr_iter, (int)size,
865                          MPI_BYTE, ADIO_EXPLICIT_OFFSET, vars->off,
866                          &vars->req2, error_code);
867 
868         nbc_req->data.rd.state = ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN;
869         return;
870 #endif
871     }
872 
873     ADIOI_R_Iexchange_data(nbc_req, error_code);
874 }
875 
/*
 * Continuation that closes one iteration of the first (read + exchange)
 * loop; it is reached through the next_fn pointer of the data-exchange
 * step.
 *
 * If for_next_iter bytes at the tail of the read buffer still belong to
 * requests that will only be served in the following iteration, they are
 * moved to the front of a newly allocated fd->io_buf of
 * for_next_iter + coll_bufsize bytes.  Afterwards off and done are
 * advanced by the amount read, m is incremented, and control goes back
 * to ADIOI_Iread_and_exch_l1_begin.
 *
 * nbc_req    - request whose data.rd.rae_vars holds the loop state
 * error_code - passed through unchanged to the next step
 */
static void ADIOI_Iread_and_exch_l1_end(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    ADIO_Offset nread = vars->size;
    ADIO_Offset carry = vars->for_next_iter;

    /* the leftover of this pass becomes the head of the next one */
    vars->for_curr_iter = carry;

    if (carry) {
        ADIO_File fd = vars->fd;
        ADIO_Offset real_size = vars->real_size;
        char *read_buf = vars->read_buf;
        char *saved = (char *) ADIOI_Malloc(carry);

        /* pointer arithmetic and the new buffer size must not overflow */
        ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)read_buf) + real_size - carry) == (ADIO_Offset)(MPIU_Upint)(read_buf + real_size - carry));
        ADIOI_Assert((carry + vars->coll_bufsize) == (size_t)(carry + vars->coll_bufsize));

        /* park the tail, replace the I/O buffer, then put the tail at
           the start of the replacement */
        memcpy(saved, read_buf + real_size - carry, carry);
        ADIOI_Free(fd->io_buf);
        fd->io_buf = (char *) ADIOI_Malloc(carry + vars->coll_bufsize);
        memcpy(fd->io_buf, saved, carry);
        vars->read_buf = fd->io_buf;
        ADIOI_Free(saved);
    }

    vars->off += nread;
    vars->done += nread;

    /* next iteration of the m loop */
    vars->m++;
    ADIOI_Iread_and_exch_l1_begin(nbc_req, error_code);
}
908 
/*
 * Clears the per-process count[] and send_size[] arrays (nprocs entries
 * each), positions the loop counter m at ntimes and enters the second
 * loop through ADIOI_Iread_and_exch_l2_begin.
 *
 * nbc_req    - request whose data.rd.rae_vars holds the loop state
 * error_code - passed through unchanged to the next step
 */
static void ADIOI_Iread_and_exch_reset(ADIOI_NBC_Request *nbc_req,
                                       int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    int *send_size = vars->send_size;
    int *count = vars->count;
    int nprocs = vars->nprocs;
    int p;

    for (p = 0; p < nprocs; p++) {
        send_size[p] = 0;
        count[p] = 0;
    }

    vars->m = vars->ntimes;
    ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
}
923 
/*
 * Head of the second loop, which runs while m < max_ntimes.
 *
 * Each iteration performs a data exchange only (no file read is issued
 * here): a zero-initialized ADIOI_R_Iexchange_data_vars is filled from
 * the loop state, attached to the request, and
 * ADIOI_R_Iexchange_data is started with ADIOI_Iread_and_exch_l2_end as
 * its continuation.  Once m reaches max_ntimes the loop is left through
 * ADIOI_Iread_and_exch_fini.
 *
 * NOTE(review): presumably these are the rounds in which this process
 * has nothing left to read but must still take part in the collective
 * exchange - confirm against the caller.
 */
static void ADIOI_Iread_and_exch_l2_begin(ADIOI_NBC_Request *nbc_req,
                                          int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;

    if (vars->m < vars->max_ntimes) {
        /* parameter block consumed by ADIOI_R_Iexchange_data() */
        ADIOI_R_Iexchange_data_vars *red_vars =
            (ADIOI_R_Iexchange_data_vars *)ADIOI_Calloc(
                1, sizeof(ADIOI_R_Iexchange_data_vars));

        nbc_req->data.rd.red_vars = red_vars;

        /* file and user buffer description */
        red_vars->fd = vars->fd;
        red_vars->buf = vars->buf;
        red_vars->flat_buf = vars->flat_buf;
        red_vars->buftype_is_contig = vars->buftype_is_contig;
        red_vars->buftype_extent = vars->buftype_extent;
        red_vars->buf_idx = vars->buf_idx;

        /* access lists and file domains */
        red_vars->offset_list = vars->offset_list;
        red_vars->len_list = vars->len_list;
        red_vars->contig_access_count = vars->contig_access_count;
        red_vars->min_st_offset = vars->min_st_offset;
        red_vars->fd_size = vars->fd_size;
        red_vars->fd_start = vars->fd_start;
        red_vars->fd_end = vars->fd_end;
        red_vars->others_req = vars->others_req;

        /* per-process bookkeeping arrays shared with this loop */
        red_vars->send_size = vars->send_size;
        red_vars->recv_size = vars->recv_size;
        red_vars->count = vars->count;
        red_vars->start_pos = vars->start_pos;
        red_vars->partial_send = vars->partial_send;
        red_vars->recd_from_proc = vars->recd_from_proc;

        /* communicator geometry and iteration number */
        red_vars->nprocs = vars->nprocs;
        red_vars->myrank = vars->myrank;
        red_vars->iter = vars->m;

        /* where to continue when the exchange is done */
        red_vars->next_fn = ADIOI_Iread_and_exch_l2_end;

        ADIOI_R_Iexchange_data(nbc_req, error_code);
    }
    else {
        /* loop exit condition reached */
        ADIOI_Iread_and_exch_fini(nbc_req, error_code);
    }
}
967 
/*
 * Continuation that closes one iteration of the second loop: bumps the
 * loop counter m and re-enters ADIOI_Iread_and_exch_l2_begin, which
 * decides whether another iteration is needed.
 */
static void ADIOI_Iread_and_exch_l2_end(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_Iread_and_exch_vars *loop_state = nbc_req->data.rd.rae_vars;

    loop_state->m += 1;
    ADIOI_Iread_and_exch_l2_begin(nbc_req, error_code);
}
976 
/*
 * Tears down the read-and-exchange stage: releases the seven per-process
 * work arrays, frees the ADIOI_Iread_and_exch_vars struct itself, clears
 * the request's pointer to it, and finally calls the continuation that
 * was stored in the struct's next_fn field.
 *
 * nbc_req    - request whose data.rd.rae_vars is being destroyed
 * error_code - passed through unchanged to the continuation
 */
static void ADIOI_Iread_and_exch_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_Iread_and_exch_vars *vars = nbc_req->data.rd.rae_vars;
    /* fetch the continuation before the struct holding it goes away */
    void (*continuation)(ADIOI_NBC_Request *, int *) = vars->next_fn;

    ADIOI_Free(vars->curr_offlen_ptr);
    ADIOI_Free(vars->count);
    ADIOI_Free(vars->partial_send);
    ADIOI_Free(vars->send_size);
    ADIOI_Free(vars->recv_size);
    ADIOI_Free(vars->recd_from_proc);
    ADIOI_Free(vars->start_pos);

    /* the struct for parameters and variables is no longer needed */
    ADIOI_Free(vars);
    nbc_req->data.rd.rae_vars = NULL;

    /* hand control to the next stage */
    continuation(nbc_req, error_code);
}
999 
1000 
/*
 * First step of the nonblocking data exchange.
 *
 * Starts an MPI_Ialltoall over fd->comm that delivers one int of
 * send_size[] to every process and collects one int per process into
 * recv_size[], so each process learns how much it will receive from
 * whom.  The request is stored in vars->req1 and the state machine is
 * moved to ADIOI_IRC_STATE_R_IEXCHANGE_DATA.
 *
 * error_code - receives the return value of MPI_Ialltoall
 */
static void ADIOI_R_Iexchange_data(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_R_Iexchange_data_vars *xv = nbc_req->data.rd.red_vars;
    int rc;

    rc = MPI_Ialltoall(xv->send_size, 1, MPI_INT,
                       xv->recv_size, 1, MPI_INT,
                       xv->fd->comm, &xv->req1);
    *error_code = rc;

    nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA;
}
1012 
/*
 * Second step of the nonblocking data exchange; entered once recv_size[]
 * has been filled in.
 *
 * - counts the processes to receive from / send to and stores the
 *   counts in vars->nprocs_recv / vars->nprocs_send;
 * - allocates vars->req2: receive requests occupy the first nprocs_recv
 *   slots, send requests the slots after them;
 * - posts one MPI_Irecv per process with a nonzero recv_size, either
 *   straight into the user buffer at buf_idx[] (contiguous buftype) or
 *   into a freshly allocated recv_buf[] entry;
 * - posts one MPI_Isend per process with a nonzero send_size, using an
 *   hindexed datatype built from others_req[].lens / mem_ptrs.
 *
 * If any receive was posted the state becomes
 * ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV and the function returns;
 * otherwise ADIOI_R_Iexchange_data_fill is called directly.
 */
static void ADIOI_R_Iexchange_data_recv(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
    ADIO_File fd = vars->fd;
    ADIOI_Access *others_req = vars->others_req;
    int *send_size = vars->send_size;
    int *recv_size = vars->recv_size;
    int *count = vars->count;
    int *start_pos = vars->start_pos;
    int *partial_send = vars->partial_send;
    int *buf_idx = vars->buf_idx;
    int nprocs = vars->nprocs;
    int myrank = vars->myrank;
    int iter = vars->iter;

    int p, slot;
    int last = 0, saved_len = 0;
    int n_recv = 0, n_send = 0;
    char **recv_buf = NULL;
    MPI_Datatype send_type;

    /* how many peers do we exchange data with in each direction? */
    for (p = 0; p < nprocs; p++) {
        if (recv_size[p]) n_recv++;
        if (send_size[p]) n_send++;
    }
    vars->nprocs_recv = n_recv;
    vars->nprocs_send = n_send;

    /* one request per receive followed by one per send;
       +1 to avoid a 0-size malloc */
    vars->req2 = (MPI_Request *)
        ADIOI_Malloc((n_send + n_recv + 1) * sizeof(MPI_Request));

#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5032, 0, NULL);
#endif

    /* post the receives */
    slot = 0;
    if (!vars->buftype_is_contig) {
        /* noncontiguous user buffer: receive into per-process staging
           buffers that are unpacked later */
        recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
        vars->recv_buf = recv_buf;
        for (p = 0; p < nprocs; p++) {
            if (recv_size[p])
                recv_buf[p] = (char *)ADIOI_Malloc(recv_size[p]);
        }

        for (p = 0; p < nprocs; p++) {
            if (!recv_size[p]) continue;

            MPI_Irecv(recv_buf[p], recv_size[p], MPI_BYTE, p,
                      myrank + p + 100 * iter, fd->comm,
                      vars->req2 + slot);
            slot++;
#ifdef RDCOLL_DEBUG
            DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
                        myrank, recv_size[p], myrank + p + 100 * iter);
#endif
        }
    }
    else {
        /* contiguous user buffer: data lands directly at the position
           recorded in buf_idx, which is then advanced */
        for (p = 0; p < nprocs; p++) {
            if (!recv_size[p]) continue;

            MPI_Irecv(((char *)vars->buf) + buf_idx[p], recv_size[p],
                      MPI_BYTE, p, myrank + p + 100 * iter, fd->comm,
                      vars->req2 + slot);
            slot++;
            buf_idx[p] += recv_size[p];
        }
    }

    /* build a derived datatype per destination and post the sends */
    slot = 0;
    for (p = 0; p < nprocs; p++) {
        if (!send_size[p]) continue;

        /* a partially satisfied last off-len pair is shortened for the
           duration of the type creation and restored afterwards */
        if (partial_send[p]) {
            last = start_pos[p] + count[p] - 1;
            saved_len = others_req[p].lens[last];
            others_req[p].lens[last] = partial_send[p];
        }

        ADIOI_Type_create_hindexed_x(count[p],
                &(others_req[p].lens[start_pos[p]]),
                &(others_req[p].mem_ptrs[start_pos[p]]),
                MPI_BYTE, &send_type);
        /* displacements are absolute addresses, hence MPI_BOTTOM */
        MPI_Type_commit(&send_type);
        MPI_Isend(MPI_BOTTOM, 1, send_type, p, myrank + p + 100 * iter,
                  fd->comm, vars->req2 + n_recv + slot);
        MPI_Type_free(&send_type);

        if (partial_send[p]) others_req[p].lens[last] = saved_len;
        slot++;
    }

    if (n_recv) {
        /* the poll function waits on the receives from here */
        nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV;
    }
    else {
        ADIOI_R_Iexchange_data_fill(nbc_req, error_code);
    }
}
1117 
/*
 * Third step of the nonblocking data exchange.
 *
 * When data was received and the user buffer type is noncontiguous, the
 * staged data in recv_buf is copied into the user buffer by
 * ADIOI_Fill_user_buffer.  In every case the state is then set to
 * ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL.
 *
 * error_code is not written by this function.
 */
static void ADIOI_R_Iexchange_data_fill(ADIOI_NBC_Request *nbc_req,
                                        int *error_code)
{
    ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;

    /* contiguous buffers were filled in place by the receives; only the
       noncontiguous case needs the copies from the recv buffers */
    if (vars->nprocs_recv && !vars->buftype_is_contig) {
        ADIOI_Fill_user_buffer(vars->fd, vars->buf, vars->flat_buf,
                vars->recv_buf, vars->offset_list, vars->len_list,
                (unsigned*)vars->recv_size,
                vars->req2, NULL, vars->recd_from_proc,
                vars->nprocs, vars->contig_access_count,
                vars->min_st_offset, vars->fd_size, vars->fd_start,
                vars->fd_end, vars->buftype_extent);
    }

    nbc_req->data.rd.state = ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL;
}
1137 
/*
 * Last step of the nonblocking data exchange.
 *
 * Frees the request array req2, the per-process staging buffers (only
 * allocated for a noncontiguous buftype, and only for processes with a
 * nonzero recv_size) and the ADIOI_R_Iexchange_data_vars struct, clears
 * the request's pointer to it, and calls the continuation stored in the
 * struct's next_fn field.
 *
 * error_code - passed through unchanged to the continuation
 */
static void ADIOI_R_Iexchange_data_fini(ADIOI_NBC_Request *nbc_req, int *error_code)
{
    ADIOI_R_Iexchange_data_vars *vars = nbc_req->data.rd.red_vars;
    void (*continuation)(ADIOI_NBC_Request *, int *);
    int p;

    ADIOI_Free(vars->req2);

    if (!vars->buftype_is_contig) {
        for (p = 0; p < vars->nprocs; p++) {
            if (vars->recv_size[p])
                ADIOI_Free(vars->recv_buf[p]);
        }
        ADIOI_Free(vars->recv_buf);
    }
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5033, 0, NULL);
#endif

    /* remember where to go before the struct is released */
    continuation = vars->next_fn;

    ADIOI_Free(vars);
    nbc_req->data.rd.red_vars = NULL;

    continuation(nbc_req, error_code);
}
1164 
1165 
/*
 * Query callback of the generalized request: fills in the status that
 * is reported for the request.
 *
 * extra_state - the ADIOI_NBC_Request of the operation
 * status      - status object to fill in
 *
 * Always returns MPI_SUCCESS.
 */
static int ADIOI_GEN_irc_query_fn(void *extra_state, MPI_Status *status)
{
    ADIOI_NBC_Request *nbc_req = (ADIOI_NBC_Request *)extra_state;

    /* element count: the request's nbytes, expressed in MPI_BYTE */
    MPI_Status_set_elements_x(status, MPI_BYTE, nbc_req->nbytes);

    /* the operation is reported as not cancelled (flag = 0) */
    MPI_Status_set_cancelled(status, 0);

    /* neither source nor tag carries meaning for this generalized
       request */
    status->MPI_SOURCE = MPI_UNDEFINED;
    status->MPI_TAG = MPI_UNDEFINED;

    return MPI_SUCCESS;
}
1185 
ADIOI_GEN_irc_free_fn(void * extra_state)1186 static int ADIOI_GEN_irc_free_fn(void *extra_state)
1187 {
1188     ADIOI_NBC_Request *nbc_req;
1189 
1190     nbc_req = (ADIOI_NBC_Request *)extra_state;
1191     ADIOI_Free(nbc_req);
1192 
1193     return MPI_SUCCESS;
1194 }
1195 
/*
 * Poll callback of the generalized request: advances the state machine
 * of one nonblocking collective read by at most one test.
 *
 * Depending on nbc_req->data.rd.state, the MPI request(s) belonging to
 * that state are tested; when they have completed without error the
 * function for the following stage is called, which may in turn start
 * new operations and change the state.  States not listed in the switch
 * are left untouched.
 *
 * extra_state - the ADIOI_NBC_Request of the operation
 * status      - not used by this function
 *
 * Returns MPI_SUCCESS, or an error code created with
 * MPIO_Err_create_code if a test or a stage function reported an error.
 */
static int ADIOI_GEN_irc_poll_fn(void *extra_state, MPI_Status *status)
{
    ADIOI_NBC_Request *nbc_req = (ADIOI_NBC_Request *)extra_state;
    int errcode = MPI_SUCCESS;
    int flag;

    switch (nbc_req->data.rd.state) {
        case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL: {
            ADIOI_GEN_IreadStridedColl_vars *rsc = nbc_req->data.rd.rsc_vars;

            /* both offset requests must have completed */
            errcode = MPI_Testall(2, rsc->req_offset, &flag,
                                  MPI_STATUSES_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_GEN_IreadStridedColl_inter(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_GEN_IREADSTRIDEDCOLL_INDIO: {
            ADIOI_GEN_IreadStridedColl_vars *rsc = nbc_req->data.rd.rsc_vars;

            errcode = MPI_Test(&rsc->req_ind_io, &flag, MPI_STATUS_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                /* independent I/O done: go to the final function */
                ADIOI_GEN_IreadStridedColl_fini(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_ICALC_OTHERS_REQ: {
            ADIOI_Icalc_others_req_vars *cor = nbc_req->cor_vars;

            errcode = MPI_Test(&cor->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_Icalc_others_req_main(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_ICALC_OTHERS_REQ_MAIN: {
            ADIOI_Icalc_others_req_vars *cor = nbc_req->cor_vars;

            /* with no outstanding requests there is nothing to test */
            if (cor->num_req2) {
                errcode = MPI_Testall(cor->num_req2, cor->req2,
                                      &flag, MPI_STATUSES_IGNORE);
                if (errcode != MPI_SUCCESS || !flag) {
                    break;
                }
            }
            ADIOI_Icalc_others_req_fini(nbc_req, &errcode);
            break;
        }

        case ADIOI_IRC_STATE_IREAD_AND_EXCH: {
            ADIOI_Iread_and_exch_vars *rae = nbc_req->data.rd.rae_vars;

            errcode = MPI_Test(&rae->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                /* start the first loop at iteration 0 */
                rae->m = 0;
                ADIOI_Iread_and_exch_l1_begin(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_IREAD_AND_EXCH_L1_BEGIN: {
            ADIOI_Iread_and_exch_vars *rae = nbc_req->data.rd.rae_vars;

            errcode = MPI_Test(&rae->req2, &flag, MPI_STATUS_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_R_Iexchange_data(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA: {
            ADIOI_R_Iexchange_data_vars *red = nbc_req->data.rd.red_vars;

            errcode = MPI_Test(&red->req1, &flag, MPI_STATUS_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_R_Iexchange_data_recv(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_RECV: {
            ADIOI_R_Iexchange_data_vars *red = nbc_req->data.rd.red_vars;

            /* the receives are the first nprocs_recv entries of req2 */
            errcode = MPI_Testall(red->nprocs_recv, red->req2, &flag,
                                  MPI_STATUSES_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_R_Iexchange_data_fill(nbc_req, &errcode);
            }
            break;
        }

        case ADIOI_IRC_STATE_R_IEXCHANGE_DATA_FILL: {
            ADIOI_R_Iexchange_data_vars *red = nbc_req->data.rd.red_vars;

            /* the sends follow the receives in req2 */
            errcode = MPI_Testall(red->nprocs_send,
                                  red->req2 + red->nprocs_recv,
                                  &flag, MPI_STATUSES_IGNORE);
            if (errcode == MPI_SUCCESS && flag) {
                ADIOI_R_Iexchange_data_fini(nbc_req, &errcode);
            }
            break;
        }

        default:
            break;
    }

    /* --BEGIN ERROR HANDLING-- */
    if (errcode != MPI_SUCCESS) {
        errcode = MPIO_Err_create_code(MPI_SUCCESS,
                MPIR_ERR_RECOVERABLE,
                "ADIOI_GEN_irc_poll_fn", __LINE__,
                MPI_ERR_IO, "**mpi_grequest_complete",
                0);
    }
    /* --END ERROR HANDLING-- */

    return errcode;
}
1308 
1309 /* wait for multiple requests to complete */
/*
 * Wait callback of the generalized request.
 *
 * Polls each request in turn with ADIOI_GEN_irc_poll_fn until its state
 * is ADIOI_IRC_STATE_COMPLETE.
 *
 * count           - number of entries in array_of_states
 * array_of_states - array of ADIOI_NBC_Request pointers
 * timeout         - if > 0, upper bound (in MPI_Wtime units) on the time
 *                   spent here; checked after every poll
 * status          - not used by this function
 *
 * Returns MPI_SUCCESS, or an error code created with
 * MPIO_Err_create_code as soon as a poll fails.  The function returns
 * immediately on the first failed poll: continuing to poll would let a
 * later successful poll overwrite the error, and would spin forever if
 * the failed request can no longer advance.  As in the timeout case,
 * requests may still be incomplete when the function returns early.
 */
static int ADIOI_GEN_irc_wait_fn(int count, void **array_of_states,
                                 double timeout, MPI_Status *status)
{
    int i, errcode = MPI_SUCCESS;
    double starttime;
    ADIOI_NBC_Request **nbc_reqlist;

    nbc_reqlist = (ADIOI_NBC_Request **)array_of_states;

    starttime = MPI_Wtime();
    for (i = 0; i < count ; i++) {
        while (nbc_reqlist[i]->data.rd.state != ADIOI_IRC_STATE_COMPLETE) {
            errcode = ADIOI_GEN_irc_poll_fn(nbc_reqlist[i], MPI_STATUS_IGNORE);
            /* --BEGIN ERROR HANDLING-- */
            if (errcode != MPI_SUCCESS) {
                errcode = MPIO_Err_create_code(MPI_SUCCESS,
                        MPIR_ERR_RECOVERABLE,
                        "ADIOI_GEN_irc_wait_fn",
                        __LINE__, MPI_ERR_IO,
                        "**mpi_grequest_complete", 0);
                /* report the failure instead of polling a request that
                 * has already failed */
                goto fn_exit;
            }
            /* --END ERROR HANDLING-- */

            if ((timeout > 0) && (timeout < (MPI_Wtime() - starttime)))
                goto fn_exit;

            /* If the progress engine is blocked, we have to yield for another
             * thread to be able to unblock the progress engine. */
            MPIR_Ext_cs_yield();
        }
    }

  fn_exit:
    return errcode;
}
1345 
1346 #endif /* HAVE_MPI_GREQUEST_EXTENSIONS */
1347