1 /* ---------------------------------------------------------------- */
2 /* (C)Copyright IBM Corp. 2007, 2008 */
3 /* ---------------------------------------------------------------- */
4 /**
5 * \file ad_bgl_rdcoll.c
6 * \brief ???
7 */
8
9 /* -*- Mode: C; c-basic-offset:4 ; -*- */
10 /*
11 *
12 * Copyright (C) 1997 University of Chicago.
13 * See COPYRIGHT notice in top-level directory.
14 */
15
16 #include "adio.h"
17 #include "adio_extern.h"
18 #include "ad_bgl.h"
19 #include "ad_bgl_pset.h"
20 #include "ad_bgl_aggrs.h"
21
22 #ifdef PROFILE
23 #include "mpe.h"
24 #endif
25
26 #ifdef USE_DBG_LOGGING
27 #define RDCOLL_DEBUG 1
28 #endif
29 #ifdef AGGREGATION_PROFILE
30 #include "mpe.h"
31 #endif
32
33 /* prototypes of functions used for collective reads only. */
34 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
35 datatype, int nprocs,
36 int myrank, ADIOI_Access
37 *others_req, ADIO_Offset *offset_list,
38 ADIO_Offset *len_list, int contig_access_count,
39 ADIO_Offset
40 min_st_offset, ADIO_Offset fd_size,
41 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
42 int *buf_idx, int *error_code);
43 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
44 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
45 *len_list, int *send_size, int *recv_size,
46 int *count, int *start_pos,
47 int *partial_send,
48 int *recd_from_proc, int nprocs,
49 int myrank, int
50 buftype_is_contig, int contig_access_count,
51 ADIO_Offset min_st_offset,
52 ADIO_Offset fd_size,
53 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
54 ADIOI_Access *others_req,
55 int iter,
56 MPI_Aint buftype_extent, int *buf_idx);
57 static void ADIOI_R_Exchange_data_alltoallv(ADIO_File fd, void *buf, ADIOI_Flatlist_node
58 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
59 *len_list, int *send_size, int *recv_size,
60 int *count, int *start_pos,
61 int *partial_send,
62 int *recd_from_proc, int nprocs,
63 int myrank, int
64 buftype_is_contig, int contig_access_count,
65 ADIO_Offset min_st_offset,
66 ADIO_Offset fd_size,
67 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
68 ADIOI_Access *others_req,
69 int iter,
70 MPI_Aint buftype_extent, int *buf_idx);
71 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
72 *flat_buf, char **recv_buf, ADIO_Offset
73 *offset_list, ADIO_Offset *len_list,
74 unsigned *recv_size,
75 MPI_Request *requests, MPI_Status *statuses,
76 int *recd_from_proc, int nprocs,
77 int contig_access_count,
78 ADIO_Offset min_st_offset,
79 ADIO_Offset fd_size, ADIO_Offset *fd_start,
80 ADIO_Offset *fd_end,
81 MPI_Aint buftype_extent);
82
83 extern void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
84 datatype, int file_ptr_type, ADIO_Offset
85 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
86 **len_list_ptr, ADIO_Offset *start_offset_ptr,
87 ADIO_Offset *end_offset_ptr, int
88 *contig_access_count_ptr);
89
ADIOI_BGL_ReadStridedColl(ADIO_File fd,void * buf,int count,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,ADIO_Status * status,int * error_code)90 void ADIOI_BGL_ReadStridedColl(ADIO_File fd, void *buf, int count,
91 MPI_Datatype datatype, int file_ptr_type,
92 ADIO_Offset offset, ADIO_Status *status, int
93 *error_code)
94 {
95 /* Uses a generalized version of the extended two-phase method described
96 in "An Extended Two-Phase Method for Accessing Sections of
97 Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
98 Scientific Programming, (5)4:301--317, Winter 1996.
99 http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
100
101 ADIOI_Access *my_req;
102 /* array of nprocs structures, one for each other process in
103 whose file domain this process's request lies */
104
105 ADIOI_Access *others_req;
106 /* array of nprocs structures, one for each other process
107 whose request lies in this process's file domain. */
108
109 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
110 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
111 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
112 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
113 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
114 *fd_end = NULL, *end_offsets = NULL;
115 ADIO_Offset *bgl_offsets0 = NULL, *bgl_offsets = NULL;
116 int ii;
117 ADIO_Offset *len_list = NULL;
118 int *buf_idx = NULL;
119 #if BGL_PROFILE
120 BGLMPIO_T_CIO_RESET( 0, r )
121 #endif
122
123 #ifdef HAVE_STATUS_SET_BYTES
124 int bufsize, size;
125 #endif
126
127 #if 0
128 /* From common code - not implemented for bgl. */
129 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
130 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
131 file_ptr_type, offset, status, error_code);
132 return;
133 } */
134 #endif
135 #ifdef PROFILE
136 MPE_Log_event(13, 0, "start computation");
137 #endif
138
139 MPI_Comm_size(fd->comm, &nprocs);
140 MPI_Comm_rank(fd->comm, &myrank);
141
142 /* number of aggregators, cb_nodes, is stored in the hints */
143 nprocs_for_coll = fd->hints->cb_nodes;
144 orig_fp = fd->fp_ind;
145
146 #if BGL_PROFILE
147 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 0, BGLMPIO_CIO_LCOMP, BGLMPIO_CIO_LAST )
148 #endif
149
150 /* only check for interleaving if cb_read isn't disabled */
151 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
152 /* For this process's request, calculate the list of offsets and
153 lengths in the file and determine the start and end offsets. */
154
155 /* Note: end_offset points to the last byte-offset that will be accessed.
156 e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
157
158 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
159 &offset_list, &len_list, &start_offset,
160 &end_offset, &contig_access_count);
161
162 #if BGL_PROFILE
163 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_GATHER, BGLMPIO_CIO_LCOMP )
164 #endif
165
166 #ifdef RDCOLL_DEBUG
167 for (i=0; i<contig_access_count; i++) {
168 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
169 myrank, offset_list[i], len_list[i]);
170 }
171 #endif
172
173 /* each process communicates its start and end offsets to other
174 processes. The result is an array each of start and end offsets
175 stored in order of process rank. */
176
177 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
178 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
179
180 if (bglmpio_tunegather) {
181 bgl_offsets0 = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
182 bgl_offsets = (ADIO_Offset *) ADIOI_Malloc(2*nprocs*sizeof(ADIO_Offset));
183 for (ii=0; ii<nprocs; ii++) {
184 bgl_offsets0[ii*2] = 0;
185 bgl_offsets0[ii*2+1] = 0;
186 }
187 bgl_offsets0[myrank*2] = start_offset;
188 bgl_offsets0[myrank*2+1] = end_offset;
189
190 MPI_Allreduce( bgl_offsets0, bgl_offsets, nprocs*2, ADIO_OFFSET, MPI_MAX, fd->comm );
191
192 for (ii=0; ii<nprocs; ii++) {
193 st_offsets [ii] = bgl_offsets[ii*2] ;
194 end_offsets[ii] = bgl_offsets[ii*2+1];
195 }
196 ADIOI_Free( bgl_offsets0 );
197 ADIOI_Free( bgl_offsets );
198 } else {
199 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
200 ADIO_OFFSET, fd->comm);
201 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
202 ADIO_OFFSET, fd->comm);
203 }
204
205 #if BGL_PROFILE
206 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_PATANA, BGLMPIO_CIO_GATHER )
207 #endif
208
209 /* are the accesses of different processes interleaved? */
210 for (i=1; i<nprocs; i++)
211 if ((st_offsets[i] < end_offsets[i-1]) &&
212 (st_offsets[i] <= end_offsets[i]))
213 interleave_count++;
214 /* This is a rudimentary check for interleaving, but should suffice
215 for the moment. */
216 }
217
218 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
219
220 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
221 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
222 {
223 /* don't do aggregation */
224 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
225 ADIOI_Free(offset_list);
226 ADIOI_Free(len_list);
227 ADIOI_Free(st_offsets);
228 ADIOI_Free(end_offsets);
229 }
230
231 fd->fp_ind = orig_fp;
232 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
233
234 if (buftype_is_contig && filetype_is_contig) {
235 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
236 off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
237 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
238 off, status, error_code);
239 }
240 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
241 0, status, error_code);
242 }
243 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
244 offset, status, error_code);
245
246 return;
247 }
248
249 #if BGL_PROFILE
250 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_FD_PART, BGLMPIO_CIO_PATANA )
251 #endif
252
253 /* We're going to perform aggregation of I/O. Here we call
254 * ADIOI_Calc_file_domains() to determine what processes will handle I/O
255 * to what regions. We pass nprocs_for_coll into this function; it is
256 * used to determine how many processes will perform I/O, which is also
257 * the number of regions into which the range of bytes must be divided.
258 * These regions are called "file domains", or FDs.
259 *
260 * When this function returns, fd_start, fd_end, fd_size, and
261 * min_st_offset will be filled in. fd_start holds the starting byte
262 * location for each file domain. fd_end holds the ending byte location.
263 * min_st_offset holds the minimum byte location that will be accessed.
264 *
265 * Both fd_start[] and fd_end[] are indexed by an aggregator number; this
266 * needs to be mapped to an actual rank in the communicator later.
267 *
268 */
269 if (bglmpio_tuneblocking)
270 ADIOI_BGL_GPFS_Calc_file_domains(st_offsets, end_offsets, nprocs,
271 nprocs_for_coll, &min_st_offset,
272 &fd_start, &fd_end, &fd_size, fd->fs_ptr);
273 else
274 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
275 nprocs_for_coll, &min_st_offset,
276 &fd_start, &fd_end,
277 fd->hints->min_fdomain_size, &fd_size,
278 fd->hints->striping_unit);
279
280 #if BGL_PROFILE
281 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 1, 1, BGLMPIO_CIO_MYREQ, BGLMPIO_CIO_FD_PART )
282 #endif
283
284 /* calculate where the portions of the access requests of this process
285 * are located in terms of the file domains. this could be on the same
286 * process or on other processes. this function fills in:
287 * count_my_req_procs - number of processes (including this one) for which
288 * this process has requests in their file domain
289 * count_my_req_per_proc - count of requests for each process, indexed
290 * by rank of the process
291 * my_req[] - array of data structures describing the requests to be
292 * performed by each process (including self). indexed by rank.
293 * buf_idx[] - array of locations into which data can be directly moved;
294 * this is only valid for contiguous buffer case
295 */
296 if (bglmpio_tuneblocking)
297 ADIOI_BGL_Calc_my_req(fd, offset_list, len_list, contig_access_count,
298 min_st_offset, fd_start, fd_end, fd_size,
299 nprocs, &count_my_req_procs,
300 &count_my_req_per_proc, &my_req,
301 &buf_idx);
302 else
303 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
304 min_st_offset, fd_start, fd_end, fd_size,
305 nprocs, &count_my_req_procs,
306 &count_my_req_per_proc, &my_req,
307 &buf_idx);
308
309 #if BGL_PROFILE
310 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_OTHREQ, BGLMPIO_CIO_MYREQ )
311 #endif
312
313 /* perform a collective communication in order to distribute the
314 * data calculated above. fills in the following:
315 * count_others_req_procs - number of processes (including this
316 * one) which have requests in this process's file domain.
317 * count_others_req_per_proc[] - number of separate contiguous
318 * requests from proc i lie in this process's file domain.
319 */
320 if (bglmpio_tuneblocking)
321 ADIOI_BGL_Calc_others_req(fd, count_my_req_procs,
322 count_my_req_per_proc, my_req,
323 nprocs, myrank, &count_others_req_procs,
324 &others_req);
325
326 else
327 ADIOI_Calc_others_req(fd, count_my_req_procs,
328 count_my_req_per_proc, my_req,
329 nprocs, myrank, &count_others_req_procs,
330 &others_req);
331
332 #if BGL_PROFILE
333 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 1, 1, BGLMPIO_CIO_DEXCH, BGLMPIO_CIO_OTHREQ )
334 #endif
335
336 /* my_req[] and count_my_req_per_proc aren't needed at this point, so
337 * let's free the memory
338 */
339 ADIOI_Free(count_my_req_per_proc);
340 for (i=0; i<nprocs; i++) {
341 if (my_req[i].count) {
342 ADIOI_Free(my_req[i].offsets);
343 ADIOI_Free(my_req[i].lens);
344 }
345 }
346 ADIOI_Free(my_req);
347
348
349 /* read data in sizes of no more than ADIOI_Coll_bufsize,
350 * communicate, and fill user buf.
351 */
352 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
353 others_req, offset_list,
354 len_list, contig_access_count, min_st_offset,
355 fd_size, fd_start, fd_end, buf_idx, error_code);
356
357 #if BGL_PROFILE
358 BGLMPIO_T_CIO_SET_GET( 0, r, 1, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_DEXCH )
359 BGLMPIO_T_CIO_SET_GET( 0, r, 0, 0, 1, BGLMPIO_CIO_LAST, BGLMPIO_CIO_T_MPIO_CRW )
360
361 BGLMPIO_T_CIO_REPORT( 0, r, fd, myrank )
362 #endif
363
364 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
365
366 /* free all memory allocated for collective I/O */
367 for (i=0; i<nprocs; i++) {
368 if (others_req[i].count) {
369 ADIOI_Free(others_req[i].offsets);
370 ADIOI_Free(others_req[i].lens);
371 ADIOI_Free(others_req[i].mem_ptrs);
372 }
373 }
374 ADIOI_Free(others_req);
375
376 ADIOI_Free(buf_idx);
377 ADIOI_Free(offset_list);
378 ADIOI_Free(len_list);
379 ADIOI_Free(st_offsets);
380 ADIOI_Free(end_offsets);
381 ADIOI_Free(fd_start);
382 ADIOI_Free(fd_end);
383
384 #ifdef HAVE_STATUS_SET_BYTES
385 MPI_Type_size(datatype, &size);
386 bufsize = size * count;
387 MPIR_Status_set_bytes(status, datatype, bufsize);
388 /* This is a temporary way of filling in status. The right way is to
389 keep track of how much data was actually read and placed in buf
390 during collective I/O. */
391 #endif
392
393 fd->fp_sys_posn = -1; /* set it to null. */
394 }
395
ADIOI_Read_and_exch(ADIO_File fd,void * buf,MPI_Datatype datatype,int nprocs,int myrank,ADIOI_Access * others_req,ADIO_Offset * offset_list,ADIO_Offset * len_list,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,int * buf_idx,int * error_code)396 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
397 datatype, int nprocs,
398 int myrank, ADIOI_Access
399 *others_req, ADIO_Offset *offset_list,
400 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
401 min_st_offset, ADIO_Offset fd_size,
402 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
403 int *buf_idx, int *error_code)
404 {
405 /* Read in sizes of no more than coll_bufsize, an info parameter.
406 Send data to appropriate processes.
407 Place recd. data in user buf.
408 The idea is to reduce the amount of extra memory required for
409 collective I/O. If all data were read all at once, which is much
410 easier, it would require temp space more than the size of user_buf,
411 which is often unacceptable. For example, to read a distributed
412 array from a file, where each local array is 8Mbytes, requiring
413 at least another 8Mbytes of temp space is unacceptable. */
414
415 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
416 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
417 char *read_buf = NULL, *tmp_buf;
418 int *curr_offlen_ptr, *count, *send_size, *recv_size;
419 int *partial_send, *recd_from_proc, *start_pos;
420 /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
421 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
422 int req_len, flag, rank;
423 MPI_Status status;
424 ADIOI_Flatlist_node *flat_buf=NULL;
425 MPI_Aint buftype_extent;
426 int coll_bufsize;
427 #ifdef RDCOLL_DEBUG
428 int iii;
429 #endif
430 *error_code = MPI_SUCCESS; /* changed below if error */
431 /* only I/O errors are currently reported */
432
433 /* calculate the number of reads of size coll_bufsize
434 to be done by each process and the max among all processes.
435 That gives the no. of communication phases as well.
436 coll_bufsize is obtained from the hints object. */
437
438 coll_bufsize = fd->hints->cb_buffer_size;
439
440 /* grab some initial values for st_loc and end_loc */
441 for (i=0; i < nprocs; i++) {
442 if (others_req[i].count) {
443 st_loc = others_req[i].offsets[0];
444 end_loc = others_req[i].offsets[0];
445 break;
446 }
447 }
448
449 /* now find the real values */
450 for (i=0; i < nprocs; i++)
451 for (j=0; j<others_req[i].count; j++) {
452 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
453 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
454 + others_req[i].lens[j] - 1));
455 }
456
457 /* calculate ntimes, the number of times this process must perform I/O
458 * operations in order to complete all the requests it has received.
459 * the need for multiple I/O operations comes from the restriction that
460 * we only use coll_bufsize bytes of memory for internal buffering.
461 */
462 if ((st_loc==-1) && (end_loc==-1)) {
463 /* this process does no I/O. */
464 ntimes = 0;
465 }
466 else {
467 /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
468 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
469 }
470
471 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
472
473 if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
474
475 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
476 /* its use is explained below. calloc initializes to 0. */
477
478 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
479 /* to store count of how many off-len pairs per proc are satisfied
480 in an iteration. */
481
482 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
483 /* if only a portion of the last off-len pair is sent to a process
484 in a particular iteration, the length sent is stored here.
485 calloc initializes to 0. */
486
487 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
488 /* total size of data to be sent to each proc. in an iteration */
489
490 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
491 /* total size of data to be recd. from each proc. in an iteration.
492 Of size nprocs so that I can use MPI_Alltoall later. */
493
494 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
495 /* amount of data recd. so far from each proc. Used in
496 ADIOI_Fill_user_buffer. initialized to 0 here. */
497
498 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
499 /* used to store the starting value of curr_offlen_ptr[i] in
500 this iteration */
501
502 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
503 if (!buftype_is_contig) {
504 ADIOI_Flatten_datatype(datatype);
505 flat_buf = ADIOI_Flatlist;
506 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
507 }
508 MPI_Type_extent(datatype, &buftype_extent);
509
510 done = 0;
511 off = st_loc;
512 for_curr_iter = for_next_iter = 0;
513
514 MPI_Comm_rank(fd->comm, &rank);
515
516 #ifdef PROFILE
517 MPE_Log_event(14, 0, "end computation");
518 #endif
519
520 for (m=0; m<ntimes; m++) {
521 /* read buf of size coll_bufsize (or less) */
522 /* go through all others_req and check if any are satisfied
523 by the current read */
524
525 /* since MPI guarantees that displacements in filetypes are in
526 monotonically nondecreasing order, I can maintain a pointer
527 (curr_offlen_ptr) to
528 current off-len pair for each process in others_req and scan
529 further only from there. There is still a problem of filetypes
530 such as: (1, 2, 3 are not process nos. They are just numbers for
531 three chunks of data, specified by a filetype.)
532
533 1 -------!--
534 2 -----!----
535 3 --!-----
536
537 where ! indicates where the current read_size limitation cuts
538 through the filetype. I resolve this by reading up to !, but
539 filling the communication buffer only for 1. I copy the portion
540 left over for 2 into a tmp_buf for use in the next
541 iteration. i.e., 2 and 3 will be satisfied in the next
542 iteration. This simplifies filling in the user's buf at the
543 other end, as only one off-len pair with incomplete data
544 will be sent. I also don't need to send the individual
545 offsets and lens along with the data, as the data is being
546 sent in a particular order. */
547
548 /* off = start offset in the file for the data actually read in
549 this iteration
550 size = size of data read corresponding to off
551 real_off = off minus whatever data was retained in memory from
552 previous iteration for cases like 2, 3 illustrated above
553 real_size = size plus the extra corresponding to real_off
554 req_off = off in file for a particular contiguous request
555 minus what was satisfied in previous iteration
556 req_size = size corresponding to req_off */
557
558 #ifdef PROFILE
559 MPE_Log_event(13, 0, "start computation");
560 #endif
561 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
562 real_off = off - for_curr_iter;
563 real_size = size + for_curr_iter;
564
565 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
566 for_next_iter = 0;
567
568 for (i=0; i<nprocs; i++) {
569 #ifdef RDCOLL_DEBUG
570 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
571 #endif
572 if (others_req[i].count) {
573 start_pos[i] = curr_offlen_ptr[i];
574 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
575 j++) {
576 if (partial_send[i]) {
577 /* this request may have been partially
578 satisfied in the previous iteration. */
579 req_off = others_req[i].offsets[j] +
580 partial_send[i];
581 req_len = others_req[i].lens[j] -
582 partial_send[i];
583 partial_send[i] = 0;
584 /* modify the off-len pair to reflect this change */
585 others_req[i].offsets[j] = req_off;
586 others_req[i].lens[j] = req_len;
587 }
588 else {
589 req_off = others_req[i].offsets[j];
590 req_len = others_req[i].lens[j];
591 }
592 if (req_off < real_off + real_size) {
593 count[i]++;
594 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
595 MPI_Address(read_buf+req_off-real_off,
596 &(others_req[i].mem_ptrs[j]));
597 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
598 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
599 (ADIO_Offset)(unsigned)req_len));
600
601 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
602 partial_send[i] = (int) (real_off + real_size - req_off);
603 if ((j+1 < others_req[i].count) &&
604 (others_req[i].offsets[j+1] <
605 real_off+real_size)) {
606 /* this is the case illustrated in the
607 figure above. */
608 for_next_iter = ADIOI_MAX(for_next_iter,
609 real_off + real_size - others_req[i].offsets[j+1]);
610 /* max because it must cover requests
611 from different processes */
612 }
613 break;
614 }
615 }
616 else break;
617 }
618 curr_offlen_ptr[i] = j;
619 }
620 }
621
622 flag = 0;
623 for (i=0; i<nprocs; i++)
624 if (count[i]) flag = 1;
625
626 #ifdef PROFILE
627 MPE_Log_event(14, 0, "end computation");
628 #endif
629 if (flag) {
630 ADIOI_Assert(size == (int)size);
631 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
632 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
633 #ifdef RDCOLL_DEBUG
634 DBG_FPRINTF(stderr, "\tread_coll: 700, data read [%lld] = ", size );
635 for (iii=0; iii<size && iii<80; iii++) { DBGV_FPRINTF(stderr, "%3d,", *((unsigned char *)read_buf + for_curr_iter + iii) ); }
636 DBG_FPRINTF(stderr, "\n" );
637 #endif
638
639 if (*error_code != MPI_SUCCESS) return;
640 }
641
642 for_curr_iter = for_next_iter;
643
644 #ifdef PROFILE
645 MPE_Log_event(7, 0, "start communication");
646 #endif
647 if (bglmpio_comm == 1)
648 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
649 send_size, recv_size, count,
650 start_pos, partial_send, recd_from_proc, nprocs,
651 myrank,
652 buftype_is_contig, contig_access_count,
653 min_st_offset, fd_size, fd_start, fd_end,
654 others_req,
655 m, buftype_extent, buf_idx);
656 else
657 if (bglmpio_comm == 0) {
658 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
659 send_size, recv_size, count,
660 start_pos, partial_send, recd_from_proc, nprocs,
661 myrank,
662 buftype_is_contig, contig_access_count,
663 min_st_offset, fd_size, fd_start, fd_end,
664 others_req,
665 m, buftype_extent, buf_idx);
666 }
667
668
669 #ifdef PROFILE
670 MPE_Log_event(8, 0, "end communication");
671 #endif
672
673 if (for_next_iter) {
674 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
675 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
676 ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
677 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
678 ADIOI_Free(read_buf);
679 read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
680 memcpy(read_buf, tmp_buf, for_next_iter);
681 ADIOI_Free(tmp_buf);
682 }
683
684 off += size;
685 done += size;
686 }
687
688 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
689 #ifdef PROFILE
690 MPE_Log_event(7, 0, "start communication");
691 #endif
692 for (m=ntimes; m<max_ntimes; m++)
693 /* nothing to send, but check for recv. */
694
695 if (bglmpio_comm == 1)
696 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
697 send_size, recv_size, count,
698 start_pos, partial_send, recd_from_proc, nprocs,
699 myrank,
700 buftype_is_contig, contig_access_count,
701 min_st_offset, fd_size, fd_start, fd_end,
702 others_req, m,
703 buftype_extent, buf_idx);
704 else /* strncmp( env_switch, "alltoall", 8 ) == 0 */
705 if (bglmpio_comm == 0)
706 ADIOI_R_Exchange_data_alltoallv(fd, buf, flat_buf, offset_list, len_list,
707 send_size, recv_size, count,
708 start_pos, partial_send, recd_from_proc, nprocs,
709 myrank,
710 buftype_is_contig, contig_access_count,
711 min_st_offset, fd_size, fd_start, fd_end,
712 others_req,
713 m, buftype_extent, buf_idx);
714
715 #ifdef PROFILE
716 MPE_Log_event(8, 0, "end communication");
717 #endif
718
719 if (ntimes) ADIOI_Free(read_buf);
720 ADIOI_Free(curr_offlen_ptr);
721 ADIOI_Free(count);
722 ADIOI_Free(partial_send);
723 ADIOI_Free(send_size);
724 ADIOI_Free(recv_size);
725 ADIOI_Free(recd_from_proc);
726 ADIOI_Free(start_pos);
727 }
728
ADIOI_R_Exchange_data(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,int * count,int * start_pos,int * partial_send,int * recd_from_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,ADIOI_Access * others_req,int iter,MPI_Aint buftype_extent,int * buf_idx)729 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
730 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
731 *len_list, int *send_size, int *recv_size,
732 int *count, int *start_pos, int *partial_send,
733 int *recd_from_proc, int nprocs,
734 int myrank, int
735 buftype_is_contig, int contig_access_count,
736 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
737 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
738 ADIOI_Access *others_req,
739 int iter, MPI_Aint buftype_extent, int *buf_idx)
740 {
741 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
742 char **recv_buf = NULL;
743 MPI_Request *requests;
744 MPI_Datatype send_type;
745 MPI_Status *statuses;
746
747 /* exchange send_size info so that each process knows how much to
748 receive from whom and how much memory to allocate. */
749
750 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
751
752 nprocs_recv = 0;
753 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
754
755 nprocs_send = 0;
756 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
757
758 requests = (MPI_Request *)
759 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
760 /* +1 to avoid a 0-size malloc */
761
762 /* post recvs. if buftype_is_contig, data can be directly recd. into
763 user buf at location given by buf_idx. else use recv_buf. */
764
765 #ifdef AGGREGATION_PROFILE
766 MPE_Log_event (5032, 0, NULL);
767 #endif
768
769 if (buftype_is_contig) {
770 j = 0;
771 for (i=0; i < nprocs; i++)
772 if (recv_size[i]) {
773 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
774 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
775 j++;
776 buf_idx[i] += recv_size[i];
777 }
778 }
779 else {
780 /* allocate memory for recv_buf and post receives */
781 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
782 for (i=0; i < nprocs; i++)
783 if (recv_size[i]) recv_buf[i] =
784 (char *) ADIOI_Malloc(recv_size[i]);
785
786 j = 0;
787 for (i=0; i < nprocs; i++)
788 if (recv_size[i]) {
789 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
790 myrank+i+100*iter, fd->comm, requests+j);
791 j++;
792 #ifdef RDCOLL_DEBUG
793 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
794 myrank, recv_size[i], myrank+i+100*iter);
795 #endif
796 }
797 }
798
799 /* create derived datatypes and send data */
800
801 j = 0;
802 for (i=0; i<nprocs; i++) {
803 if (send_size[i]) {
804 /* take care if the last off-len pair is a partial send */
805 if (partial_send[i]) {
806 k = start_pos[i] + count[i] - 1;
807 tmp = others_req[i].lens[k];
808 others_req[i].lens[k] = partial_send[i];
809 }
810 MPI_Type_hindexed(count[i],
811 &(others_req[i].lens[start_pos[i]]),
812 &(others_req[i].mem_ptrs[start_pos[i]]),
813 MPI_BYTE, &send_type);
814 /* absolute displacement; use MPI_BOTTOM in send */
815 MPI_Type_commit(&send_type);
816 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
817 fd->comm, requests+nprocs_recv+j);
818 MPI_Type_free(&send_type);
819 if (partial_send[i]) others_req[i].lens[k] = tmp;
820 j++;
821 }
822 }
823
824 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
825 sizeof(MPI_Status));
826 /* +1 to avoid a 0-size malloc */
827
828 /* wait on the receives */
829 if (nprocs_recv) {
830 #ifdef NEEDS_MPI_TEST
831 j = 0;
832 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
833 #else
834 MPI_Waitall(nprocs_recv, requests, statuses);
835 #endif
836
837 /* if noncontiguous, to the copies from the recv buffers */
838 if (!buftype_is_contig)
839 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
840 offset_list, len_list, (unsigned*)recv_size,
841 requests, statuses, recd_from_proc,
842 nprocs, contig_access_count,
843 min_st_offset, fd_size, fd_start, fd_end,
844 buftype_extent);
845 }
846
847 /* wait on the sends*/
848 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
849
850 ADIOI_Free(statuses);
851 ADIOI_Free(requests);
852
853 if (!buftype_is_contig) {
854 for (i=0; i < nprocs; i++)
855 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
856 ADIOI_Free(recv_buf);
857 }
858 #ifdef AGGREGATION_PROFILE
859 MPE_Log_event (5033, 0, NULL);
860 #endif
861 }
862
863 #define ADIOI_BUF_INCR \
864 { \
865 while (buf_incr) { \
866 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
867 user_buf_idx += size_in_buf; \
868 flat_buf_sz -= size_in_buf; \
869 if (!flat_buf_sz) { \
870 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
871 else { \
872 flat_buf_idx = 0; \
873 n_buftypes++; \
874 } \
875 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
876 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
877 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
878 } \
879 buf_incr -= size_in_buf; \
880 } \
881 }
882
883
884 #define ADIOI_BUF_COPY \
885 { \
886 while (size) { \
887 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
888 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)(buf + user_buf_idx)); \
889 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
890 memcpy(((char *) buf) + user_buf_idx, \
891 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
892 recv_buf_idx[p] += size_in_buf; /* already tested (size_t)size_in_buf*/ \
893 user_buf_idx += size_in_buf; \
894 flat_buf_sz -= size_in_buf; \
895 if (!flat_buf_sz) { \
896 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
897 else { \
898 flat_buf_idx = 0; \
899 n_buftypes++; \
900 } \
901 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
902 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
903 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
904 } \
905 size -= size_in_buf; \
906 buf_incr -= size_in_buf; \
907 } \
908 ADIOI_BUF_INCR \
909 }
910
ADIOI_Fill_user_buffer(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,char ** recv_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,unsigned * recv_size,MPI_Request * requests,MPI_Status * statuses,int * recd_from_proc,int nprocs,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,MPI_Aint buftype_extent)911 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
912 *flat_buf, char **recv_buf, ADIO_Offset
913 *offset_list, ADIO_Offset *len_list,
914 unsigned *recv_size,
915 MPI_Request *requests, MPI_Status *statuses,
916 int *recd_from_proc, int nprocs,
917 int contig_access_count,
918 ADIO_Offset min_st_offset,
919 ADIO_Offset fd_size, ADIO_Offset *fd_start,
920 ADIO_Offset *fd_end,
921 MPI_Aint buftype_extent)
922 {
923
924 /* this function is only called if buftype is not contig */
925
926 int i, p, flat_buf_idx;
927 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
928 int n_buftypes;
929 ADIO_Offset off, len, rem_len, user_buf_idx;
930 /* Not sure unsigned is necessary, but it makes the math safer */
931 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
932
933 ADIOI_UNREFERENCED_ARG(requests);
934 ADIOI_UNREFERENCED_ARG(statuses);
935
936 /* curr_from_proc[p] = amount of data recd from proc. p that has already
937 been accounted for so far
938 done_from_proc[p] = amount of data already recd from proc. p and
939 filled into user buffer in previous iterations
940 user_buf_idx = current location in user buffer
941 recv_buf_idx[p] = current location in recv_buf of proc. p */
942 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
943 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
944 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
945
946 for (i=0; i < nprocs; i++) {
947 recv_buf_idx[i] = curr_from_proc[i] = 0;
948 done_from_proc[i] = recd_from_proc[i];
949 }
950
951 user_buf_idx = flat_buf->indices[0];
952 flat_buf_idx = 0;
953 n_buftypes = 0;
954 flat_buf_sz = flat_buf->blocklens[0];
955
956 /* flat_buf_idx = current index into flattened buftype
957 flat_buf_sz = size of current contiguous component in
958 flattened buf */
959
960 for (i=0; i<contig_access_count; i++) {
961 off = offset_list[i];
962 rem_len = len_list[i];
963
964 /* this request may span the file domains of more than one process */
965 while (rem_len > 0) {
966 len = rem_len;
967 /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
968 * longer than the single region that processor "p" is responsible
969 * for.
970 */
971 p = ADIOI_BGL_Calc_aggregator(fd,
972 off,
973 min_st_offset,
974 &len,
975 fd_size,
976 fd_start,
977 fd_end);
978
979 if (recv_buf_idx[p] < recv_size[p]) {
980 if (curr_from_proc[p]+len > done_from_proc[p]) {
981 if (done_from_proc[p] > curr_from_proc[p]) {
982 size = ADIOI_MIN(curr_from_proc[p] + len -
983 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
984 buf_incr = done_from_proc[p] - curr_from_proc[p];
985 ADIOI_BUF_INCR
986 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
987 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
988 curr_from_proc[p] = done_from_proc[p] + size;
989 ADIOI_BUF_COPY
990 }
991 else {
992 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
993 buf_incr = len;
994 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
995 curr_from_proc[p] += (unsigned) size;
996 ADIOI_BUF_COPY
997 }
998 }
999 else {
1000 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1001 curr_from_proc[p] += (unsigned) len;
1002 buf_incr = len;
1003 ADIOI_BUF_INCR
1004 }
1005 }
1006 else {
1007 buf_incr = len;
1008 ADIOI_BUF_INCR
1009 }
1010 off += len;
1011 rem_len -= len;
1012 }
1013 }
1014 for (i=0; i < nprocs; i++)
1015 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1016
1017 ADIOI_Free(curr_from_proc);
1018 ADIOI_Free(done_from_proc);
1019 ADIOI_Free(recv_buf_idx);
1020 }
1021
ADIOI_R_Exchange_data_alltoallv(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,int * count,int * start_pos,int * partial_send,int * recd_from_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,ADIOI_Access * others_req,int iter,MPI_Aint buftype_extent,int * buf_idx)1022 static void ADIOI_R_Exchange_data_alltoallv(
1023 ADIO_File fd, void *buf, ADIOI_Flatlist_node
1024 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
1025 *len_list, int *send_size, int *recv_size,
1026 int *count, int *start_pos, int *partial_send,
1027 int *recd_from_proc, int nprocs,
1028 int myrank, int
1029 buftype_is_contig, int contig_access_count,
1030 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
1031 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
1032 ADIOI_Access *others_req,
1033 int iter, MPI_Aint buftype_extent, int *buf_idx)
1034 {
1035 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
1036 char **recv_buf = NULL;
1037 MPI_Request *requests=NULL;
1038 MPI_Status *statuses=NULL;
1039 int rtail, stail;
1040 char *sbuf_ptr, *from_ptr;
1041 int len;
1042 int *sdispls, *rdispls;
1043 char *all_recv_buf, *all_send_buf;
1044
1045 /* exchange send_size info so that each process knows how much to
1046 receive from whom and how much memory to allocate. */
1047 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
1048
1049 nprocs_recv = 0;
1050 for (i=0; i<nprocs; i++) if (recv_size[i]) { nprocs_recv++; break; }
1051
1052 nprocs_send = 0;
1053 for (i=0; i<nprocs; i++) if (send_size[i]) { nprocs_send++; break; }
1054
1055 /* receiver side data structures */
1056 rdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1057 rtail = 0;
1058 for (i=0; i<nprocs; i++) { rdispls[i] = rtail; rtail += recv_size[i]; }
1059
1060 /* data buffer */
1061 all_recv_buf = (char *) ADIOI_Malloc( rtail );
1062 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
1063 for (i=0; i<nprocs; i++) { recv_buf[i] = all_recv_buf + rdispls[i]; }
1064
1065 /* sender side data structures */
1066 sdispls = (int *) ADIOI_Malloc( nprocs * sizeof(int) );
1067 stail = 0;
1068 for (i=0; i<nprocs; i++) { sdispls[i] = stail; stail += send_size[i]; }
1069
1070 /* data buffer */
1071 all_send_buf = (char *) ADIOI_Malloc( stail );
1072 for (i=0; i<nprocs; i++)
1073 {
1074 if (send_size[i]) {
1075 if (partial_send[i]) {
1076 k = start_pos[i] + count[i] - 1;
1077 tmp = others_req[i].lens[k];
1078 others_req[i].lens[k] = partial_send[i];
1079 }
1080 sbuf_ptr = all_send_buf + sdispls[i];
1081 for (j=0; j<count[i]; j++) {
1082 ADIOI_ENSURE_AINT_FITS_IN_PTR( others_req[i].mem_ptrs[ start_pos[i]+j ]);
1083 from_ptr = (char *) ADIOI_AINT_CAST_TO_VOID_PTR ( others_req[i].mem_ptrs[ start_pos[i]+j ] );
1084 len = others_req[i].lens[ start_pos[i]+j ] ;
1085 memcpy( sbuf_ptr, from_ptr, len );
1086 sbuf_ptr += len;
1087 }
1088 if (partial_send[i]) others_req[i].lens[k] = tmp;
1089 }
1090 }
1091
1092 #if RDCOLL_DEBUG
1093 DBG_FPRINTF(stderr, "\tsend_size = [%d]%2d,",0,send_size[0]);
1094 for (i=1; i<nprocs; i++) if(send_size[i-1]!=send_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,send_size[i] ); }
1095 DBG_FPRINTF(stderr, "\trecv_size = [%d]%2d,",0,recv_size[0]);
1096 for (i=1; i<nprocs; i++) if(recv_size[i-1]!=recv_size[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,recv_size[i] ); }
1097 DBG_FPRINTF(stderr, "\tsdispls = [%d]%2d,",0,sdispls[0]);
1098 for (i=1; i<nprocs; i++) if(sdispls[i-1]!=sdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,sdispls [i] ); }
1099 DBG_FPRINTF(stderr, "\trdispls = [%d]%2d,",0,rdispls[0]);
1100 for (i=1; i<nprocs; i++) if(rdispls[i-1]!=rdispls[i]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i,rdispls [i] ); }
1101 DBG_FPRINTF(stderr, "\ttails = %4d, %4d\n", stail, rtail );
1102 if (nprocs_send) {
1103 DBG_FPRINTF(stderr, "\tall_send_buf = [%d]%2d,",0,all_send_buf[0]);
1104 for (i=1; i<nprocs; i++) if(all_send_buf[(i-1)*131072]!=all_send_buf[i*131072]){ DBG_FPRINTF(stderr, "\t\t[%d]%2d,", i, all_send_buf [i*131072] ); }
1105 }
1106 #endif
1107
1108 /* alltoallv */
1109 MPI_Alltoallv(
1110 all_send_buf, send_size, sdispls, MPI_BYTE,
1111 all_recv_buf, recv_size, rdispls, MPI_BYTE,
1112 fd->comm );
1113
1114 #if 0
1115 DBG_FPRINTF(stderr, "\tall_recv_buf = " );
1116 for (i=131072; i<131073; i++) { DBG_FPRINTF(stderr, "%2d,", all_recv_buf [i] ); }
1117 DBG_FPRINTF(stderr, "\n" );
1118 #endif
1119
1120 /* unpack at the receiver side */
1121 if (nprocs_recv) {
1122 if (!buftype_is_contig)
1123 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
1124 offset_list, len_list, (unsigned*)recv_size,
1125 requests, statuses, /* never used inside */
1126 recd_from_proc,
1127 nprocs, contig_access_count,
1128 min_st_offset, fd_size, fd_start, fd_end,
1129 buftype_extent);
1130 else {
1131 rtail = 0;
1132 for (i=0; i < nprocs; i++)
1133 if (recv_size[i]) {
1134 memcpy( (char *)buf + buf_idx[i], all_recv_buf + rtail, recv_size[i] );
1135 buf_idx[i] += recv_size[i];
1136 rtail += recv_size[i];
1137 }
1138 }
1139 }
1140
1141 ADIOI_Free( all_send_buf );
1142 ADIOI_Free( all_recv_buf );
1143 ADIOI_Free( recv_buf );
1144 ADIOI_Free( sdispls );
1145 ADIOI_Free( rdispls );
1146 return;
1147 }
1148