1 /* ---------------------------------------------------------------- */
2 /* (C)Copyright IBM Corp.  2007, 2008                               */
3 /* ---------------------------------------------------------------- */
4 /**
5  * \file ad_bgl_rdcoll.c
6  * \brief ???
7  */
8 
9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
10 /*
11  *
12  *   Copyright (C) 1997 University of Chicago.
13  *   See COPYRIGHT notice in top-level directory.
14  */
15 
16 #include "adio.h"
17 #include "adio_extern.h"
18 #include "ad_bgl.h"
19 #include "ad_bgl_pset.h"
20 #include "ad_bgl_aggrs.h"
21 
22 #ifdef PROFILE
23 #include "mpe.h"
24 #endif
25 
26 #ifdef USE_DBG_LOGGING
27   #define RDCOLL_DEBUG 1
28 #endif
29 #ifdef AGGREGATION_PROFILE
30 #include "mpe.h"
31 #endif
32 
33 /* prototypes of functions used for collective reads only. */
34 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
35 				datatype, int nprocs,
36 				int myrank, ADIOI_Access
37 				*others_req, ADIO_Offset *offset_list,
38 				ADIO_Offset *len_list, int contig_access_count,
39 				ADIO_Offset
40 				min_st_offset, ADIO_Offset fd_size,
41 				ADIO_Offset *fd_start, ADIO_Offset *fd_end,
42 				int *buf_idx, int *error_code);
43 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
44 				  *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
45 				  *len_list, int *send_size, int *recv_size,
46 				  int *count, int *start_pos,
47 				  int *partial_send,
48 				  int *recd_from_proc, int nprocs,
49 				  int myrank, int
50 				  buftype_is_contig, int contig_access_count,
51 				  ADIO_Offset min_st_offset,
52 				  ADIO_Offset fd_size,
53 				  ADIO_Offset *fd_start, ADIO_Offset *fd_end,
54 				  ADIOI_Access *others_req,
55 				  int iter,
56 				  MPI_Aint buftype_extent, int *buf_idx);
57 static void ADIOI_R_Exchange_data_alltoallv(ADIO_File fd, void *buf, ADIOI_Flatlist_node
58                                   *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
59                                   *len_list, int *send_size, int *recv_size,
60                                   int *count, int *start_pos,
61                                   int *partial_send,
62                                   int *recd_from_proc, int nprocs,
63                                   int myrank, int
64                                   buftype_is_contig, int contig_access_count,
65                                   ADIO_Offset min_st_offset,
66                                   ADIO_Offset fd_size,
67                                   ADIO_Offset *fd_start, ADIO_Offset *fd_end,
68                                   ADIOI_Access *others_req,
69                                   int iter,
70                                   MPI_Aint buftype_extent, int *buf_idx);
71 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
72 				   *flat_buf, char **recv_buf, ADIO_Offset
73 				   *offset_list, ADIO_Offset *len_list,
74 				   unsigned *recv_size,
75 				   MPI_Request *requests, MPI_Status *statuses,
76 				   int *recd_from_proc, int nprocs,
77 				   int contig_access_count,
78 				   ADIO_Offset min_st_offset,
79 				   ADIO_Offset fd_size, ADIO_Offset *fd_start,
80 				   ADIO_Offset *fd_end,
81 				   MPI_Aint buftype_extent);
82 
83 extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
84 			    datatype, int file_ptr_type, ADIO_Offset
85 			    offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
86 			    **len_list_ptr, ADIO_Offset *start_offset_ptr,
87 			    ADIO_Offset *end_offset_ptr, int
88 			   *contig_access_count_ptr);
89 
ADIOI_BGL_ReadStridedColl(ADIO_File fd,void * buf,int count,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,ADIO_Status * status,int * error_code)90 void ADIOI_BGL_ReadStridedColl(ADIO_File fd, void *buf, int count,
91 			       MPI_Datatype datatype, int file_ptr_type,
92 			       ADIO_Offset offset, ADIO_Status *status, int
93 			       *error_code)
94 {
95 /* Uses a generalized version of the extended two-phase method described
96    in "An Extended Two-Phase Method for Accessing Sections of
97    Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
98    Scientific Programming, (5)4:301--317, Winter 1996.
99    http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
100 
101     ADIOI_Access *my_req;
102     /* array of nprocs structures, one for each other process in
103        whose file domain this process's request lies */
104 
105     ADIOI_Access *others_req;
106     /* array of nprocs structures, one for each other process
107        whose request lies in this process's file domain. */
108 
109     int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
110     int contig_access_count=0, interleave_count = 0, buftype_is_contig;
111     int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
112     ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
113     ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
114 	*fd_end = NULL, *end_offsets = NULL;
115     ADIO_Offset *bgl_offsets0 = NULL, *bgl_offsets = NULL;
116     int  ii;
117     ADIO_Offset *len_list = NULL;
118     int *buf_idx = NULL;
119 #if BGL_PROFILE
120     BGLMPIO_T_CIO_RESET( 0, r )
121 #endif
122 
123 #ifdef HAVE_STATUS_SET_BYTES
124     int bufsize, size;
125 #endif
126 
127 #if 0
128 /*   From common code - not implemented for bgl. */
129     if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
130         ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
131 			file_ptr_type, offset, status, error_code);
132         return;
133     } */
134 #endif
135 #ifdef PROFILE
136         MPE_Log_event(13, 0, "start computation");
137 #endif
138 
139     MPI_Comm_size(fd->comm, &nprocs);
140     MPI_Comm_rank(fd->comm, &myrank);
141 
142     /* number of aggregators, cb_nodes, is stored in the hints */
143     nprocs_for_coll = fd->hints->cb_nodes;
144     orig_fp = fd->fp_ind;
145 
146 #if BGL_PROFILE
147     BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 0, BGLMPIO_CIO_LCOMP, BGLMPIO_CIO_LAST )
148 #endif
149 
150     /* only check for interleaving if cb_read isn't disabled */
151     if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
152 	/* For this process's request, calculate the list of offsets and
153 	   lengths in the file and determine the start and end offsets. */
154 
155 	/* Note: end_offset points to the last byte-offset that will be accessed.
156 	   e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
157 
158 	ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
159 			      &offset_list, &len_list, &start_offset,
160 			      &end_offset, &contig_access_count);
161 
162 #if BGL_PROFILE
163     BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_GATHER, BGLMPIO_CIO_LCOMP )
164 #endif
165 
166 #ifdef RDCOLL_DEBUG
167     for (i=0; i<contig_access_count; i++) {
168 	      DBG_FPRINTF(stderr, "rank %d  off %lld  len %lld\n",
169 			      myrank, offset_list[i], len_list[i]);
170     }
171 #endif
172 
173 	/* each process communicates its start and end offsets to other
174 	   processes. The result is an array each of start and end offsets
175 	   stored in order of process rank. */
176 
177 	st_offsets   = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
178 	end_offsets  = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
179 
180     if (bglmpio_tunegather) {
181 	    bgl_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
182 	    bgl_offsets  = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
183 	    for (ii=0; ii<nprocs; ii++)  {
184 		bgl_offsets0[ii*2]   = 0;
185 		bgl_offsets0[ii*2+1] = 0;
186 	    }
187 	    bgl_offsets0[myrank*2]   = start_offset;
188 	    bgl_offsets0[myrank*2+1] =   end_offset;
189 
190 	MPI_Allreduce( bgl_offsets0, bgl_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
191 
192 	    for (ii=0; ii<nprocs; ii++)  {
193 		st_offsets [ii] = bgl_offsets[ii*2]  ;
194 		end_offsets[ii] = bgl_offsets[ii*2+1];
195 	    }
196 	    ADIOI_Free( bgl_offsets0 );
197 	    ADIOI_Free( bgl_offsets  );
198     } else {
199         MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
200                       ADIO_OFFSET, fd->comm);
201         MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
202                       ADIO_OFFSET, fd->comm);
203     }
204 
205 #if BGL_PROFILE
206     BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_PATANA, BGLMPIO_CIO_GATHER )
207 #endif
208 
209 	/* are the accesses of different processes interleaved? */
210 	for (i=1; i<nprocs; i++)
211 	    if ((st_offsets[i] < end_offsets[i-1]) &&
212                 (st_offsets[i] <= end_offsets[i]))
213                 interleave_count++;
214 	/* This is a rudimentary check for interleaving, but should suffice
215 	   for the moment. */
216     }
217 
218     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
219 
220     if (fd->hints->cb_read == ADIOI_HINT_DISABLE
221 	|| (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
222     {
223 	/* don't do aggregation */
224 	if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
225 	    ADIOI_Free(offset_list);
226 	    ADIOI_Free(len_list);
227 	    ADIOI_Free(st_offsets);
228 	    ADIOI_Free(end_offsets);
229 	}
230 
231 	fd->fp_ind = orig_fp;
232 	ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
233 
234 	if (buftype_is_contig && filetype_is_contig) {
235 	    if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
236 		off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
237 		ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
238                        off, status, error_code);
239 	    }
240 	    else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
241                        0, status, error_code);
242 	}
243 	else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
244                        offset, status, error_code);
245 
246 	return;
247     }
248 
249 #if BGL_PROFILE
250     BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_FD_PART, BGLMPIO_CIO_PATANA )
251 #endif
252 
253     /* We're going to perform aggregation of I/O.  Here we call
254      * ADIOI_Calc_file_domains() to determine what processes will handle I/O
255      * to what regions.  We pass nprocs_for_coll into this function; it is
256      * used to determine how many processes will perform I/O, which is also
257      * the number of regions into which the range of bytes must be divided.
258      * These regions are called "file domains", or FDs.
259      *
260      * When this function returns, fd_start, fd_end, fd_size, and
261      * min_st_offset will be filled in.  fd_start holds the starting byte
262      * location for each file domain.  fd_end holds the ending byte location.
263      * min_st_offset holds the minimum byte location that will be accessed.
264      *
265      * Both fd_start[] and fd_end[] are indexed by an aggregator number; this
266      * needs to be mapped to an actual rank in the communicator later.
267      *
268      */
269     if (bglmpio_tuneblocking)
270     ADIOI_BGL_GPFS_Calc_file_domains(st_offsets, end_offsets, nprocs,
271 			    nprocs_for_coll, &min_st_offset,
272 			    &fd_start, &fd_end, &fd_size, fd->fs_ptr);
273     else
274     ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
275 			    nprocs_for_coll, &min_st_offset,
276 			    &fd_start, &fd_end,
277 			    fd->hints->min_fdomain_size, &fd_size,
278 			    fd->hints->striping_unit);
279 
280 #if BGL_PROFILE
281     BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_MYREQ, BGLMPIO_CIO_FD_PART )
282 #endif
283 
284     /* calculate where the portions of the access requests of this process
285      * are located in terms of the file domains.  this could be on the same
286      * process or on other processes.  this function fills in:
287      * count_my_req_procs - number of processes (including this one) for which
288      *     this process has requests in their file domain
289      * count_my_req_per_proc - count of requests for each process, indexed
290      *     by rank of the process
291      * my_req[] - array of data structures describing the requests to be
292      *     performed by each process (including self).  indexed by rank.
293      * buf_idx[] - array of locations into which data can be directly moved;
294      *     this is only valid for contiguous buffer case
295      */
296     if (bglmpio_tuneblocking)
297     ADIOI_BGL_Calc_my_req(fd, offset_list, len_list, contig_access_count,
298 		      min_st_offset, fd_start, fd_end, fd_size,
299 		      nprocs, &count_my_req_procs,
300 		      &count_my_req_per_proc, &my_req,
301 		      &buf_idx);
302     else
303     ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
304 		      min_st_offset, fd_start, fd_end, fd_size,
305 		      nprocs, &count_my_req_procs,
306 		      &count_my_req_per_proc, &my_req,
307 		      &buf_idx);
308 
309 #if BGL_PROFILE
310     BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_OTHREQ, BGLMPIO_CIO_MYREQ )
311 #endif
312 
313     /* perform a collective communication in order to distribute the
314      * data calculated above.  fills in the following:
315      * count_others_req_procs - number of processes (including this
316      *     one) which have requests in this process's file domain.
317      * count_others_req_per_proc[] - number of separate contiguous
318      *     requests from proc i lie in this process's file domain.
319      */
320     if (bglmpio_tuneblocking)
321     ADIOI_BGL_Calc_others_req(fd, count_my_req_procs,
322 			  count_my_req_per_proc, my_req,
323 			  nprocs, myrank, &count_others_req_procs,
324 			  &others_req);
325 
326     else
327     ADIOI_Calc_others_req(fd, count_my_req_procs,
328 			  count_my_req_per_proc, my_req,
329 			  nprocs, myrank, &count_others_req_procs,
330 			  &others_req);
331 
332 #if BGL_PROFILE
333     BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_DEXCH, BGLMPIO_CIO_OTHREQ )
334 #endif
335 
336     /* my_req[] and count_my_req_per_proc aren't needed at this point, so
337      * let's free the memory
338      */
339     ADIOI_Free(count_my_req_per_proc);
340     for (i=0; i<nprocs; i++) {
341 	if (my_req[i].count) {
342 	    ADIOI_Free(my_req[i].offsets);
343 	    ADIOI_Free(my_req[i].lens);
344 	}
345     }
346     ADIOI_Free(my_req);
347 
348 
349     /* read data in sizes of no more than ADIOI_Coll_bufsize,
350      * communicate, and fill user buf.
351      */
352     ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
353                         others_req, offset_list,
354 			len_list, contig_access_count, min_st_offset,
355 			fd_size, fd_start, fd_end, buf_idx, error_code);
356 
357 #if BGL_PROFILE
358     BGLMPIO_T_CIO_SET_GET( 0, r, 1, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_DEXCH )
359     BGLMPIO_T_CIO_SET_GET( 0, r, 0, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_MPIO_CRW )
360 
361     BGLMPIO_T_CIO_REPORT( 0, r, fd, myrank )
362 #endif
363 
364     if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
365 
366     /* free all memory allocated for collective I/O */
367     for (i=0; i<nprocs; i++) {
368 	if (others_req[i].count) {
369 	    ADIOI_Free(others_req[i].offsets);
370 	    ADIOI_Free(others_req[i].lens);
371 	    ADIOI_Free(others_req[i].mem_ptrs);
372 	}
373     }
374     ADIOI_Free(others_req);
375 
376     ADIOI_Free(buf_idx);
377     ADIOI_Free(offset_list);
378     ADIOI_Free(len_list);
379     ADIOI_Free(st_offsets);
380     ADIOI_Free(end_offsets);
381     ADIOI_Free(fd_start);
382     ADIOI_Free(fd_end);
383 
384 #ifdef HAVE_STATUS_SET_BYTES
385     MPI_Type_size(datatype, &size);
386     bufsize = size * count;
387     MPIR_Status_set_bytes(status, datatype, bufsize);
388 /* This is a temporary way of filling in status. The right way is to
389    keep track of how much data was actually read and placed in buf
390    during collective I/O. */
391 #endif
392 
393     fd->fp_sys_posn = -1;   /* set it to null. */
394 }
395 
ADIOI_Read_and_exch(ADIO_File fd,void * buf,MPI_Datatype datatype,int nprocs,int myrank,ADIOI_Access * others_req,ADIO_Offset * offset_list,ADIO_Offset * len_list,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,int * buf_idx,int * error_code)396 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
397 			 datatype, int nprocs,
398 			 int myrank, ADIOI_Access
399 			 *others_req, ADIO_Offset *offset_list,
400 			 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
401                          min_st_offset, ADIO_Offset fd_size,
402 			 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
403                          int *buf_idx, int *error_code)
404 {
405 /* Read in sizes of no more than coll_bufsize, an info parameter.
406    Send data to appropriate processes.
407    Place recd. data in user buf.
408    The idea is to reduce the amount of extra memory required for
409    collective I/O. If all data were read all at once, which is much
410    easier, it would require temp space more than the size of user_buf,
411    which is often unacceptable. For example, to read a distributed
412    array from a file, where each local array is 8Mbytes, requiring
413    at least another 8Mbytes of temp space is unacceptable. */
414 
415     int i, j, m, ntimes, max_ntimes, buftype_is_contig;
416     ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
417     char *read_buf = NULL, *tmp_buf;
418     int *curr_offlen_ptr, *count, *send_size, *recv_size;
419     int *partial_send, *recd_from_proc, *start_pos;
420     /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
421     ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
422     int req_len, flag, rank;
423     MPI_Status status;
424     ADIOI_Flatlist_node *flat_buf=NULL;
425     MPI_Aint buftype_extent;
426     int coll_bufsize;
427 #ifdef RDCOLL_DEBUG
428     int iii;
429 #endif
430     *error_code = MPI_SUCCESS;  /* changed below if error */
431     /* only I/O errors are currently reported */
432 
433 /* calculate the number of reads of size coll_bufsize
434    to be done by each process and the max among all processes.
435    That gives the no. of communication phases as well.
436    coll_bufsize is obtained from the hints object. */
437 
438     coll_bufsize = fd->hints->cb_buffer_size;
439 
440     /* grab some initial values for st_loc and end_loc */
441     for (i=0; i < nprocs; i++) {
442 	if (others_req[i].count) {
443 	    st_loc = others_req[i].offsets[0];
444 	    end_loc = others_req[i].offsets[0];
445 	    break;
446 	}
447     }
448 
449     /* now find the real values */
450     for (i=0; i < nprocs; i++)
451 	for (j=0; j<others_req[i].count; j++) {
452 	    st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
453 	    end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
454 					  + others_req[i].lens[j] - 1));
455 	}
456 
457     /* calculate ntimes, the number of times this process must perform I/O
458      * operations in order to complete all the requests it has received.
459      * the need for multiple I/O operations comes from the restriction that
460      * we only use coll_bufsize bytes of memory for internal buffering.
461      */
462     if ((st_loc==-1) && (end_loc==-1)) {
463 	/* this process does no I/O. */
464 	ntimes = 0;
465     }
466     else {
467 	/* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
468 	ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
469     }
470 
471     MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
472 
473     if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
474 
475     curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
476     /* its use is explained below. calloc initializes to 0. */
477 
478     count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
479     /* to store count of how many off-len pairs per proc are satisfied
480        in an iteration. */
481 
482     partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
483     /* if only a portion of the last off-len pair is sent to a process
484        in a particular iteration, the length sent is stored here.
485        calloc initializes to 0. */
486 
487     send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
488     /* total size of data to be sent to each proc. in an iteration */
489 
490     recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
491     /* total size of data to be recd. from each proc. in an iteration.
492        Of size nprocs so that I can use MPI_Alltoall later. */
493 
494     recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
495     /* amount of data recd. so far from each proc. Used in
496        ADIOI_Fill_user_buffer. initialized to 0 here. */
497 
498     start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
499     /* used to store the starting value of curr_offlen_ptr[i] in
500        this iteration */
501 
502     ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
503     if (!buftype_is_contig) {
504 	ADIOI_Flatten_datatype(datatype);
505 	flat_buf = ADIOI_Flatlist;
506         while (flat_buf->type != datatype) flat_buf = flat_buf->next;
507     }
508     MPI_Type_extent(datatype, &buftype_extent);
509 
510     done = 0;
511     off = st_loc;
512     for_curr_iter = for_next_iter = 0;
513 
514     MPI_Comm_rank(fd->comm, &rank);
515 
516 #ifdef PROFILE
517         MPE_Log_event(14, 0, "end computation");
518 #endif
519 
520     for (m=0; m<ntimes; m++) {
521        /* read buf of size coll_bufsize (or less) */
522        /* go through all others_req and check if any are satisfied
523           by the current read */
524 
525        /* since MPI guarantees that displacements in filetypes are in
526           monotonically nondecreasing order, I can maintain a pointer
527 	  (curr_offlen_ptr) to
528           current off-len pair for each process in others_req and scan
529           further only from there. There is still a problem of filetypes
530           such as:  (1, 2, 3 are not process nos. They are just numbers for
531           three chunks of data, specified by a filetype.)
532 
533                    1  -------!--
534                    2    -----!----
535                    3       --!-----
536 
537           where ! indicates where the current read_size limitation cuts
538           through the filetype.  I resolve this by reading up to !, but
539           filling the communication buffer only for 1. I copy the portion
540           left over for 2 into a tmp_buf for use in the next
541 	  iteration. i.e., 2 and 3 will be satisfied in the next
542 	  iteration. This simplifies filling in the user's buf at the
543 	  other end, as only one off-len pair with incomplete data
544 	  will be sent. I also don't need to send the individual
545 	  offsets and lens along with the data, as the data is being
546 	  sent in a particular order. */
547 
548           /* off = start offset in the file for the data actually read in
549                    this iteration
550              size = size of data read corresponding to off
551              real_off = off minus whatever data was retained in memory from
552                   previous iteration for cases like 2, 3 illustrated above
553              real_size = size plus the extra corresponding to real_off
554              req_off = off in file for a particular contiguous request
555                        minus what was satisfied in previous iteration
556              req_size = size corresponding to req_off */
557 
558 #ifdef PROFILE
559         MPE_Log_event(13, 0, "start computation");
560 #endif
561 	size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
562 	real_off = off - for_curr_iter;
563 	real_size = size + for_curr_iter;
564 
565 	for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
566 	for_next_iter = 0;
567 
568 	for (i=0; i<nprocs; i++) {
569 #ifdef RDCOLL_DEBUG
570 	    DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
571 #endif
572 	    if (others_req[i].count) {
573 		start_pos[i] = curr_offlen_ptr[i];
574 		for (j=curr_offlen_ptr[i]; j<others_req[i].count;
575 		     j++) {
576 		    if (partial_send[i]) {
577 			/* this request may have been partially
578 			   satisfied in the previous iteration. */
579 			req_off = others_req[i].offsets[j] +
580 			    partial_send[i];
581                         req_len = others_req[i].lens[j] -
582 			    partial_send[i];
583 			partial_send[i] = 0;
584 			/* modify the off-len pair to reflect this change */
585 			others_req[i].offsets[j] = req_off;
586 			others_req[i].lens[j] = req_len;
587 		    }
588 		    else {
589 			req_off = others_req[i].offsets[j];
590                         req_len = others_req[i].lens[j];
591 		    }
592 		    if (req_off < real_off + real_size) {
593 			count[i]++;
594       ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
595 			MPI_Address(read_buf+req_off-real_off,
596                                &(others_req[i].mem_ptrs[j]));
597       ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
598 			send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
599                                       (ADIO_Offset)(unsigned)req_len));
600 
601 			if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
602 			    partial_send[i] = (int) (real_off + real_size - req_off);
603 			    if ((j+1 < others_req[i].count) &&
604                                  (others_req[i].offsets[j+1] <
605                                      real_off+real_size)) {
606 				/* this is the case illustrated in the
607 				   figure above. */
608 				for_next_iter = ADIOI_MAX(for_next_iter,
609 					  real_off + real_size - others_req[i].offsets[j+1]);
610 				/* max because it must cover requests
611 				   from different processes */
612 			    }
613 			    break;
614 			}
615 		    }
616 		    else break;
617 		}
618 		curr_offlen_ptr[i] = j;
619 	    }
620 	}
621 
622 	flag = 0;
623 	for (i=0; i<nprocs; i++)
624 	    if (count[i]) flag = 1;
625 
626 #ifdef PROFILE
627         MPE_Log_event(14, 0, "end computation");
628 #endif
629 	if (flag) {
630       ADIOI_Assert(size == (int)size);
631 	    ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
632 			    ADIO_EXPLICIT_OFFSET, off, &status, error_code);
633 #ifdef RDCOLL_DEBUG
634 	    DBG_FPRINTF(stderr, "\tread_coll: 700, data read [%lld] = ", size );
635 	    for (iii=0; iii<size && iii<80; iii++) { DBGV_FPRINTF(stderr, "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); }
636 	    DBG_FPRINTF(stderr, "\n" );
637 #endif
638 
639 	    if (*error_code != MPI_SUCCESS) return;
640 	}
641 
642 	for_curr_iter = for_next_iter;
643 
644 #ifdef PROFILE
645         MPE_Log_event(7, 0, "start communication");
646 #endif
647 	if (bglmpio_comm == 1)
648 	ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
649 			    send_size, recv_size, count,
650        			    start_pos, partial_send, recd_from_proc, nprocs,
651 			    myrank,
652 			    buftype_is_contig, contig_access_count,
653 			    min_st_offset, fd_size, fd_start, fd_end,
654 			    others_req,
655                             m, buftype_extent, buf_idx);
656         else
657 	if (bglmpio_comm == 0) {
658         ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
659                             send_size, recv_size, count,
660                             start_pos, partial_send, recd_from_proc, nprocs,
661                             myrank,
662                             buftype_is_contig, contig_access_count,
663                             min_st_offset, fd_size, fd_start, fd_end,
664                             others_req,
665                             m, buftype_extent, buf_idx);
666 	}
667 
668 
669 #ifdef PROFILE
670         MPE_Log_event(8, 0, "end communication");
671 #endif
672 
673 	if (for_next_iter) {
674 	    tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
675       ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
676       ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
677 	    memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
678 	    ADIOI_Free(read_buf);
679 	    read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
680 	    memcpy(read_buf, tmp_buf, for_next_iter);
681 	    ADIOI_Free(tmp_buf);
682 	}
683 
684 	off += size;
685 	done += size;
686     }
687 
688     for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
689 #ifdef PROFILE
690         MPE_Log_event(7, 0, "start communication");
691 #endif
692     for (m=ntimes; m<max_ntimes; m++)
693 /* nothing to send, but check for recv. */
694 
695 	if (bglmpio_comm == 1)
696 	ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
697 			    send_size, recv_size, count,
698 			    start_pos, partial_send, recd_from_proc, nprocs,
699 			    myrank,
700 			    buftype_is_contig, contig_access_count,
701 			    min_st_offset, fd_size, fd_start, fd_end,
702 			    others_req, m,
703                             buftype_extent, buf_idx);
704         else    /* strncmp( env_switch, "alltoall", 8 ) == 0 */
705 	if (bglmpio_comm == 0)
706         ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
707                             send_size, recv_size, count,
708                             start_pos, partial_send, recd_from_proc, nprocs,
709                             myrank,
710                             buftype_is_contig, contig_access_count,
711                             min_st_offset, fd_size, fd_start, fd_end,
712                             others_req,
713                             m, buftype_extent, buf_idx);
714 
715 #ifdef PROFILE
716         MPE_Log_event(8, 0, "end communication");
717 #endif
718 
719     if (ntimes) ADIOI_Free(read_buf);
720     ADIOI_Free(curr_offlen_ptr);
721     ADIOI_Free(count);
722     ADIOI_Free(partial_send);
723     ADIOI_Free(send_size);
724     ADIOI_Free(recv_size);
725     ADIOI_Free(recd_from_proc);
726     ADIOI_Free(start_pos);
727 }
728 
ADIOI_R_Exchange_data(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,int * count,int * start_pos,int * partial_send,int * recd_from_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,ADIOI_Access * others_req,int iter,MPI_Aint buftype_extent,int * buf_idx)729 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
730 			 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
731                          *len_list, int *send_size, int *recv_size,
732 			 int *count, int *start_pos, int *partial_send,
733 			 int *recd_from_proc, int nprocs,
734 			 int myrank, int
735 			 buftype_is_contig, int contig_access_count,
736 			 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
737 			 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
738 			 ADIOI_Access *others_req,
739                          int iter, MPI_Aint buftype_extent, int *buf_idx)
740 {
741     int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
742     char **recv_buf = NULL;
743     MPI_Request *requests;
744     MPI_Datatype send_type;
745     MPI_Status *statuses;
746 
747 /* exchange send_size info so that each process knows how much to
748    receive from whom and how much memory to allocate. */
749 
750     MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
751 
752     nprocs_recv = 0;
753     for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
754 
755     nprocs_send = 0;
756     for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
757 
758     requests = (MPI_Request *)
759 	ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
760 /* +1 to avoid a 0-size malloc */
761 
762 /* post recvs. if buftype_is_contig, data can be directly recd. into
763    user buf at location given by buf_idx. else use recv_buf. */
764 
765 #ifdef AGGREGATION_PROFILE
766     MPE_Log_event (5032, 0, NULL);
767 #endif
768 
769     if (buftype_is_contig) {
770 	j = 0;
771 	for (i=0; i < nprocs; i++)
772 	    if (recv_size[i]) {
773 		MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
774 		  MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
775 		j++;
776 		buf_idx[i] += recv_size[i];
777 	    }
778     }
779     else {
780 /* allocate memory for recv_buf and post receives */
781 	recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
782 	for (i=0; i < nprocs; i++)
783 	    if (recv_size[i]) recv_buf[i] =
784                                   (char *) ADIOI_Malloc(recv_size[i]);
785 
786 	    j = 0;
787 	    for (i=0; i < nprocs; i++)
788 		if (recv_size[i]) {
789 		    MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
790 			      myrank+i+100*iter, fd->comm, requests+j);
791 		    j++;
792 #ifdef RDCOLL_DEBUG
793 		    DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
794 		       myrank, recv_size[i], myrank+i+100*iter);
795 #endif
796 		}
797     }
798 
799 /* create derived datatypes and send data */
800 
801     j = 0;
802     for (i=0; i<nprocs; i++) {
803 	if (send_size[i]) {
804 /* take care if the last off-len pair is a partial send */
805 	    if (partial_send[i]) {
806 		k = start_pos[i] + count[i] - 1;
807 		tmp = others_req[i].lens[k];
808 		others_req[i].lens[k] = partial_send[i];
809 	    }
810 	    MPI_Type_hindexed(count[i],
811                  &(others_req[i].lens[start_pos[i]]),
812 	            &(others_req[i].mem_ptrs[start_pos[i]]),
813 			 MPI_BYTE, &send_type);
814 	    /* absolute displacement; use MPI_BOTTOM in send */
815 	    MPI_Type_commit(&send_type);
816 	    MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
817 		      fd->comm, requests+nprocs_recv+j);
818 	    MPI_Type_free(&send_type);
819 	    if (partial_send[i]) others_req[i].lens[k] = tmp;
820 	    j++;
821 	}
822     }
823 
824     statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
825                                      sizeof(MPI_Status));
826      /* +1 to avoid a 0-size malloc */
827 
828     /* wait on the receives */
829     if (nprocs_recv) {
830 #ifdef NEEDS_MPI_TEST
831 	j = 0;
832 	while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
833 #else
834 	MPI_Waitall(nprocs_recv, requests, statuses);
835 #endif
836 
837 	/* if noncontiguous, to the copies from the recv buffers */
838 	if (!buftype_is_contig)
839 	    ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
840 				   offset_list, len_list, (unsigned*)recv_size,
841 				   requests, statuses, recd_from_proc,
842 				   nprocs, contig_access_count,
843 				   min_st_offset, fd_size, fd_start, fd_end,
844 				   buftype_extent);
845     }
846 
847     /* wait on the sends*/
848     MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
849 
850     ADIOI_Free(statuses);
851     ADIOI_Free(requests);
852 
853     if (!buftype_is_contig) {
854 	for (i=0; i < nprocs; i++)
855 	    if (recv_size[i]) ADIOI_Free(recv_buf[i]);
856 	ADIOI_Free(recv_buf);
857     }
858 #ifdef AGGREGATION_PROFILE
859     MPE_Log_event (5033, 0, NULL);
860 #endif
861 }
862 
863 #define ADIOI_BUF_INCR \
864 { \
865     while (buf_incr) { \
866 	size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
867 	user_buf_idx += size_in_buf; \
868 	flat_buf_sz -= size_in_buf; \
869 	if (!flat_buf_sz) { \
870             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
871             else { \
872                 flat_buf_idx = 0; \
873                 n_buftypes++; \
874             } \
875             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
876                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
877 	    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
878 	} \
879 	buf_incr -= size_in_buf; \
880     } \
881 }
882 
883 
884 #define ADIOI_BUF_COPY \
885 { \
886     while (size) { \
887 	size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
888   ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)(buf + user_buf_idx)); \
889   ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
890 	memcpy(((char *) buf) + user_buf_idx, \
891 	       &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
892 	recv_buf_idx[p] += size_in_buf; /* already tested (size_t)size_in_buf*/ \
893 	user_buf_idx += size_in_buf; \
894 	flat_buf_sz -= size_in_buf; \
895 	if (!flat_buf_sz) { \
896             if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
897             else { \
898                 flat_buf_idx = 0; \
899                 n_buftypes++; \
900             } \
901             user_buf_idx = flat_buf->indices[flat_buf_idx] + \
902                               (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
903 	    flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
904 	} \
905 	size -= size_in_buf; \
906 	buf_incr -= size_in_buf; \
907     } \
908     ADIOI_BUF_INCR \
909 }
910 
ADIOI_Fill_user_buffer(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,char ** recv_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,unsigned * recv_size,MPI_Request * requests,MPI_Status * statuses,int * recd_from_proc,int nprocs,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,MPI_Aint buftype_extent)911 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
912 				   *flat_buf, char **recv_buf, ADIO_Offset
913 				   *offset_list, ADIO_Offset *len_list,
914 				   unsigned *recv_size,
915 				   MPI_Request *requests, MPI_Status *statuses,
916 				   int *recd_from_proc, int nprocs,
917 				   int contig_access_count,
918 				   ADIO_Offset min_st_offset,
919 				   ADIO_Offset fd_size, ADIO_Offset *fd_start,
920 				   ADIO_Offset *fd_end,
921 				   MPI_Aint buftype_extent)
922 {
923 
924 /* this function is only called if buftype is not contig */
925 
926     int i, p, flat_buf_idx;
927     ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
928     int n_buftypes;
929     ADIO_Offset off, len, rem_len, user_buf_idx;
930     /* Not sure unsigned is necessary, but it makes the math safer */
931     unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
932 
933     ADIOI_UNREFERENCED_ARG(requests);
934     ADIOI_UNREFERENCED_ARG(statuses);
935 
936 /*  curr_from_proc[p] = amount of data recd from proc. p that has already
937                         been accounted for so far
938     done_from_proc[p] = amount of data already recd from proc. p and
939                         filled into user buffer in previous iterations
940     user_buf_idx = current location in user buffer
941     recv_buf_idx[p] = current location in recv_buf of proc. p  */
942     curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
943     done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
944     recv_buf_idx   = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
945 
946     for (i=0; i < nprocs; i++) {
947 	recv_buf_idx[i] = curr_from_proc[i] = 0;
948 	done_from_proc[i] = recd_from_proc[i];
949     }
950 
951     user_buf_idx = flat_buf->indices[0];
952     flat_buf_idx = 0;
953     n_buftypes = 0;
954     flat_buf_sz = flat_buf->blocklens[0];
955 
956     /* flat_buf_idx = current index into flattened buftype
957        flat_buf_sz = size of current contiguous component in
958                 flattened buf */
959 
960     for (i=0; i<contig_access_count; i++) {
961 	off     = offset_list[i];
962 	rem_len = len_list[i];
963 
964 	/* this request may span the file domains of more than one process */
965 	while (rem_len > 0) {
966 	    len = rem_len;
967 	    /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
968 	     * longer than the single region that processor "p" is responsible
969 	     * for.
970 	     */
971 	    p = ADIOI_BGL_Calc_aggregator(fd,
972 				      off,
973 				      min_st_offset,
974 				      &len,
975 				      fd_size,
976 				      fd_start,
977 				      fd_end);
978 
979 	    if (recv_buf_idx[p] < recv_size[p]) {
980 		if (curr_from_proc[p]+len > done_from_proc[p]) {
981 		    if (done_from_proc[p] > curr_from_proc[p]) {
982 			size = ADIOI_MIN(curr_from_proc[p] + len -
983 			      done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
984 			buf_incr = done_from_proc[p] - curr_from_proc[p];
985 			ADIOI_BUF_INCR
986 			buf_incr = curr_from_proc[p]+len-done_from_proc[p];
987       ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
988 			curr_from_proc[p] = done_from_proc[p] + size;
989 			ADIOI_BUF_COPY
990 		    }
991 		    else {
992 			size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
993 			buf_incr = len;
994       ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
995 			curr_from_proc[p] += (unsigned) size;
996 			ADIOI_BUF_COPY
997 		    }
998 		}
999 		else {
1000         ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1001 		    curr_from_proc[p] += (unsigned) len;
1002 		    buf_incr = len;
1003 		    ADIOI_BUF_INCR
1004 		}
1005 	    }
1006 	    else {
1007 		buf_incr = len;
1008 		ADIOI_BUF_INCR
1009 	    }
1010 	    off     += len;
1011 	    rem_len -= len;
1012 	}
1013     }
1014     for (i=0; i < nprocs; i++)
1015 	if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1016 
1017     ADIOI_Free(curr_from_proc);
1018     ADIOI_Free(done_from_proc);
1019     ADIOI_Free(recv_buf_idx);
1020 }
1021 
ADIOI_R_Exchange_data_alltoallv(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,int * count,int * start_pos,int * partial_send,int * recd_from_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,ADIOI_Access * others_req,int iter,MPI_Aint buftype_extent,int * buf_idx)1022 static void ADIOI_R_Exchange_data_alltoallv(
1023                 ADIO_File fd, void *buf, ADIOI_Flatlist_node
1024                 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
1025                 *len_list, int *send_size, int *recv_size,
1026                 int *count, int *start_pos, int *partial_send,
1027                 int *recd_from_proc, int nprocs,
1028                 int myrank, int
1029                 buftype_is_contig, int contig_access_count,
1030                 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1031                 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1032                 ADIOI_Access *others_req,
1033                 int iter, MPI_Aint buftype_extent, int *buf_idx)
1034 {
1035     int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
1036     char **recv_buf = NULL;
1037     MPI_Request *requests=NULL;
1038     MPI_Status *statuses=NULL;
1039     int rtail, stail;
1040     char *sbuf_ptr, *from_ptr;
1041     int  len;
1042     int  *sdispls, *rdispls;
1043     char *all_recv_buf, *all_send_buf;
1044 
1045   /* exchange send_size info so that each process knows how much to
1046      receive from whom and how much memory to allocate. */
1047     MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
1048 
1049     nprocs_recv = 0;
1050     for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; break; }
1051 
1052     nprocs_send = 0;
1053     for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; break; }
1054 
1055   /* receiver side data structures */
1056     rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1057     rtail = 0;
1058     for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
1059 
1060         /* data buffer */
1061     all_recv_buf = (char *) ADIOI_Malloc( rtail );
1062     recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
1063     for (i=0; i<nprocs; i++) { recv_buf[i] = all_recv_buf + rdispls[i]; }
1064 
1065   /* sender side data structures */
1066     sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1067     stail = 0;
1068     for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
1069 
1070         /* data buffer */
1071     all_send_buf = (char *) ADIOI_Malloc( stail );
1072     for (i=0; i<nprocs; i++)
1073     {
1074         if (send_size[i]) {
1075 	    if (partial_send[i]) {
1076 		k = start_pos[i] + count[i] - 1;
1077 		tmp = others_req[i].lens[k];
1078 		others_req[i].lens[k] = partial_send[i];
1079 	    }
1080             sbuf_ptr = all_send_buf + sdispls[i];
1081             for (j=0; j<count[i]; j++) {
1082                 ADIOI_ENSURE_AINT_FITS_IN_PTR( others_req[i].mem_ptrs[ start_pos[i]+j ]);
1083                 from_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
1084                 len      =           others_req[i].lens[     start_pos[i]+j ]  ;
1085                 memcpy( sbuf_ptr, from_ptr, len );
1086                 sbuf_ptr += len;
1087             }
1088 	    if (partial_send[i]) others_req[i].lens[k] = tmp;
1089         }
1090     }
1091 
1092 #if RDCOLL_DEBUG
1093     DBG_FPRINTF(stderr, "\tsend_size = [%d]%2d,",0,send_size[0]);
1094     for (i=1; i<nprocs; i++) if(send_size[i-1]!=send_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,send_size[i] ); }
1095     DBG_FPRINTF(stderr, "\trecv_size =  [%d]%2d,",0,recv_size[0]);
1096     for (i=1; i<nprocs; i++) if(recv_size[i-1]!=recv_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,recv_size[i] ); }
1097     DBG_FPRINTF(stderr, "\tsdispls   =  [%d]%2d,",0,sdispls[0]);
1098     for (i=1; i<nprocs; i++) if(sdispls[i-1]!=sdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,sdispls  [i] ); }
1099     DBG_FPRINTF(stderr, "\trdispls   =  [%d]%2d,",0,rdispls[0]);
1100     for (i=1; i<nprocs; i++) if(rdispls[i-1]!=rdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,rdispls  [i] ); }
1101     DBG_FPRINTF(stderr, "\ttails = %4d, %4d\n", stail, rtail );
1102     if (nprocs_send) {
1103     DBG_FPRINTF(stderr, "\tall_send_buf =  [%d]%2d,",0,all_send_buf[0]);
1104     for (i=1; i<nprocs; i++) if(all_send_buf[(i-1)*131072]!=all_send_buf[i*131072]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i, all_send_buf  [i*131072] ); }
1105     }
1106 #endif
1107 
1108   /* alltoallv */
1109     MPI_Alltoallv(
1110             all_send_buf, send_size, sdispls, MPI_BYTE,
1111             all_recv_buf, recv_size, rdispls, MPI_BYTE,
1112             fd->comm );
1113 
1114 #if 0
1115     DBG_FPRINTF(stderr, "\tall_recv_buf = " );
1116     for (i=131072; i<131073; i++) { DBG_FPRINTF(stderr, "%2d,", all_recv_buf  [i] ); }
1117     DBG_FPRINTF(stderr, "\n" );
1118 #endif
1119 
1120   /* unpack at the receiver side */
1121     if (nprocs_recv) {
1122         if (!buftype_is_contig)
1123             ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
1124                                    offset_list, len_list, (unsigned*)recv_size,
1125                                    requests, statuses,          /* never used inside */
1126                                    recd_from_proc,
1127                                    nprocs, contig_access_count,
1128                                    min_st_offset, fd_size, fd_start, fd_end,
1129                                buftype_extent);
1130         else {
1131 	    rtail = 0;
1132             for (i=0; i < nprocs; i++)
1133                 if (recv_size[i]) {
1134                     memcpy( (char *)buf + buf_idx[i], all_recv_buf + rtail, recv_size[i] );
1135 		    buf_idx[i] += recv_size[i];
1136 		    rtail += recv_size[i];
1137 		}
1138         }
1139     }
1140 
1141     ADIOI_Free( all_send_buf );
1142     ADIOI_Free( all_recv_buf );
1143     ADIOI_Free( recv_buf  );
1144     ADIOI_Free( sdispls );
1145     ADIOI_Free( rdispls );
1146     return;
1147 }
1148