1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 *
4 * Copyright (C) 1997 University of Chicago.
5 * See COPYRIGHT notice in top-level directory.
6 */
7
8 #include "adio.h"
9 #include "adio_extern.h"
10
11 #ifdef USE_DBG_LOGGING
12 #define RDCOLL_DEBUG 1
13 #endif
14 #ifdef AGGREGATION_PROFILE
15 #include "mpe.h"
16 #endif
17
18 /* prototypes of functions used for collective reads only. */
19 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
20 datatype, int nprocs,
21 int myrank, ADIOI_Access
22 *others_req, ADIO_Offset *offset_list,
23 ADIO_Offset *len_list, int contig_access_count,
24 ADIO_Offset
25 min_st_offset, ADIO_Offset fd_size,
26 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
27 int *buf_idx, int *error_code);
28 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
29 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
30 *len_list, int *send_size, int *recv_size,
31 int *count, int *start_pos,
32 int *partial_send,
33 int *recd_from_proc, int nprocs,
34 int myrank, int
35 buftype_is_contig, int contig_access_count,
36 ADIO_Offset min_st_offset,
37 ADIO_Offset fd_size,
38 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
39 ADIOI_Access *others_req,
40 int iter,
41 MPI_Aint buftype_extent, int *buf_idx);
42 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
43 *flat_buf, char **recv_buf, ADIO_Offset
44 *offset_list, ADIO_Offset *len_list,
45 unsigned *recv_size,
46 MPI_Request *requests, MPI_Status *statuses,
47 int *recd_from_proc, int nprocs,
48 int contig_access_count,
49 ADIO_Offset min_st_offset,
50 ADIO_Offset fd_size, ADIO_Offset *fd_start,
51 ADIO_Offset *fd_end,
52 MPI_Aint buftype_extent);
53
54
ADIOI_GEN_ReadStridedColl(ADIO_File fd,void * buf,int count,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,ADIO_Status * status,int * error_code)55 void ADIOI_GEN_ReadStridedColl(ADIO_File fd, void *buf, int count,
56 MPI_Datatype datatype, int file_ptr_type,
57 ADIO_Offset offset, ADIO_Status *status, int
58 *error_code)
59 {
60 /* Uses a generalized version of the extended two-phase method described
61 in "An Extended Two-Phase Method for Accessing Sections of
62 Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
63 Scientific Programming, (5)4:301--317, Winter 1996.
64 http://www.mcs.anl.gov/home/thakur/ext2ph.ps */
65
66 ADIOI_Access *my_req;
67 /* array of nprocs structures, one for each other process in
68 whose file domain this process's request lies */
69
70 ADIOI_Access *others_req;
71 /* array of nprocs structures, one for each other process
72 whose request lies in this process's file domain. */
73
74 int i, filetype_is_contig, nprocs, nprocs_for_coll, myrank;
75 int contig_access_count=0, interleave_count = 0, buftype_is_contig;
76 int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
77 ADIO_Offset start_offset, end_offset, orig_fp, fd_size, min_st_offset, off;
78 ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *fd_start = NULL,
79 *fd_end = NULL, *end_offsets = NULL;
80 ADIO_Offset *len_list = NULL;
81 int *buf_idx = NULL;
82
83 #ifdef HAVE_STATUS_SET_BYTES
84 int bufsize, size;
85 #endif
86
87 if (fd->hints->cb_pfr != ADIOI_HINT_DISABLE) {
88 ADIOI_IOStridedColl (fd, buf, count, ADIOI_READ, datatype,
89 file_ptr_type, offset, status, error_code);
90 return;
91 }
92
93
94 MPI_Comm_size(fd->comm, &nprocs);
95 MPI_Comm_rank(fd->comm, &myrank);
96
97 /* number of aggregators, cb_nodes, is stored in the hints */
98 nprocs_for_coll = fd->hints->cb_nodes;
99 orig_fp = fd->fp_ind;
100
101 /* only check for interleaving if cb_read isn't disabled */
102 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
103 /* For this process's request, calculate the list of offsets and
104 lengths in the file and determine the start and end offsets. */
105
106 /* Note: end_offset points to the last byte-offset that will be accessed.
107 e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
108
109 ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
110 &offset_list, &len_list, &start_offset,
111 &end_offset, &contig_access_count);
112
113 #ifdef RDCOLL_DEBUG
114 for (i=0; i<contig_access_count; i++) {
115 DBG_FPRINTF(stderr, "rank %d off %lld len %lld\n",
116 myrank, offset_list[i], len_list[i]);
117 }
118 #endif
119
120 /* each process communicates its start and end offsets to other
121 processes. The result is an array each of start and end offsets
122 stored in order of process rank. */
123
124 st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
125 end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs*sizeof(ADIO_Offset));
126
127 MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
128 ADIO_OFFSET, fd->comm);
129 MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
130 ADIO_OFFSET, fd->comm);
131
132 /* are the accesses of different processes interleaved? */
133 for (i=1; i<nprocs; i++)
134 if ((st_offsets[i] < end_offsets[i-1]) &&
135 (st_offsets[i] <= end_offsets[i]))
136 interleave_count++;
137 /* This is a rudimentary check for interleaving, but should suffice
138 for the moment. */
139 }
140
141 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
142
143 if (fd->hints->cb_read == ADIOI_HINT_DISABLE
144 || (!interleave_count && (fd->hints->cb_read == ADIOI_HINT_AUTO)))
145 {
146 /* don't do aggregation */
147 if (fd->hints->cb_read != ADIOI_HINT_DISABLE) {
148 ADIOI_Free(offset_list);
149 ADIOI_Free(len_list);
150 ADIOI_Free(st_offsets);
151 ADIOI_Free(end_offsets);
152 }
153
154 fd->fp_ind = orig_fp;
155 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
156
157 if (buftype_is_contig && filetype_is_contig) {
158 if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
159 off = fd->disp + (fd->etype_size) * offset;
160 ADIO_ReadContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
161 off, status, error_code);
162 }
163 else ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
164 0, status, error_code);
165 }
166 else ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
167 offset, status, error_code);
168
169 return;
170 }
171
172 /* We're going to perform aggregation of I/O. Here we call
173 * ADIOI_Calc_file_domains() to determine what processes will handle I/O
174 * to what regions. We pass nprocs_for_coll into this function; it is
175 * used to determine how many processes will perform I/O, which is also
176 * the number of regions into which the range of bytes must be divided.
177 * These regions are called "file domains", or FDs.
178 *
179 * When this function returns, fd_start, fd_end, fd_size, and
180 * min_st_offset will be filled in. fd_start holds the starting byte
181 * location for each file domain. fd_end holds the ending byte location.
182 * min_st_offset holds the minimum byte location that will be accessed.
183 *
184 * Both fd_start[] and fd_end[] are indexed by an aggregator number; this
185 * needs to be mapped to an actual rank in the communicator later.
186 *
187 */
188 ADIOI_Calc_file_domains(st_offsets, end_offsets, nprocs,
189 nprocs_for_coll, &min_st_offset,
190 &fd_start, &fd_end,
191 fd->hints->min_fdomain_size, &fd_size,
192 fd->hints->striping_unit);
193
194 /* calculate where the portions of the access requests of this process
195 * are located in terms of the file domains. this could be on the same
196 * process or on other processes. this function fills in:
197 * count_my_req_procs - number of processes (including this one) for which
198 * this process has requests in their file domain
199 * count_my_req_per_proc - count of requests for each process, indexed
200 * by rank of the process
201 * my_req[] - array of data structures describing the requests to be
202 * performed by each process (including self). indexed by rank.
203 * buf_idx[] - array of locations into which data can be directly moved;
204 * this is only valid for contiguous buffer case
205 */
206 ADIOI_Calc_my_req(fd, offset_list, len_list, contig_access_count,
207 min_st_offset, fd_start, fd_end, fd_size,
208 nprocs, &count_my_req_procs,
209 &count_my_req_per_proc, &my_req,
210 &buf_idx);
211
212 /* perform a collective communication in order to distribute the
213 * data calculated above. fills in the following:
214 * count_others_req_procs - number of processes (including this
215 * one) which have requests in this process's file domain.
216 * count_others_req_per_proc[] - number of separate contiguous
217 * requests from proc i lie in this process's file domain.
218 */
219 ADIOI_Calc_others_req(fd, count_my_req_procs,
220 count_my_req_per_proc, my_req,
221 nprocs, myrank, &count_others_req_procs,
222 &others_req);
223
224 /* my_req[] and count_my_req_per_proc aren't needed at this point, so
225 * let's free the memory
226 */
227 ADIOI_Free(count_my_req_per_proc);
228 for (i=0; i<nprocs; i++) {
229 if (my_req[i].count) {
230 ADIOI_Free(my_req[i].offsets);
231 ADIOI_Free(my_req[i].lens);
232 }
233 }
234 ADIOI_Free(my_req);
235
236
237 /* read data in sizes of no more than ADIOI_Coll_bufsize,
238 * communicate, and fill user buf.
239 */
240 ADIOI_Read_and_exch(fd, buf, datatype, nprocs, myrank,
241 others_req, offset_list,
242 len_list, contig_access_count, min_st_offset,
243 fd_size, fd_start, fd_end, buf_idx, error_code);
244
245 if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
246
247 /* free all memory allocated for collective I/O */
248 for (i=0; i<nprocs; i++) {
249 if (others_req[i].count) {
250 ADIOI_Free(others_req[i].offsets);
251 ADIOI_Free(others_req[i].lens);
252 ADIOI_Free(others_req[i].mem_ptrs);
253 }
254 }
255 ADIOI_Free(others_req);
256
257 ADIOI_Free(buf_idx);
258 ADIOI_Free(offset_list);
259 ADIOI_Free(len_list);
260 ADIOI_Free(st_offsets);
261 ADIOI_Free(end_offsets);
262 ADIOI_Free(fd_start);
263 ADIOI_Free(fd_end);
264
265 #ifdef HAVE_STATUS_SET_BYTES
266 MPI_Type_size(datatype, &size);
267 bufsize = size * count;
268 MPIR_Status_set_bytes(status, datatype, bufsize);
269 /* This is a temporary way of filling in status. The right way is to
270 keep track of how much data was actually read and placed in buf
271 during collective I/O. */
272 #endif
273
274 fd->fp_sys_posn = -1; /* set it to null. */
275 }
276
ADIOI_Calc_my_off_len(ADIO_File fd,int bufcount,MPI_Datatype datatype,int file_ptr_type,ADIO_Offset offset,ADIO_Offset ** offset_list_ptr,ADIO_Offset ** len_list_ptr,ADIO_Offset * start_offset_ptr,ADIO_Offset * end_offset_ptr,int * contig_access_count_ptr)277 void ADIOI_Calc_my_off_len(ADIO_File fd, int bufcount, MPI_Datatype
278 datatype, int file_ptr_type, ADIO_Offset
279 offset, ADIO_Offset **offset_list_ptr, ADIO_Offset
280 **len_list_ptr, ADIO_Offset *start_offset_ptr,
281 ADIO_Offset *end_offset_ptr, int
282 *contig_access_count_ptr)
283 {
284 int filetype_size, etype_size;
285 unsigned buftype_size;
286 int i, j, k;
287 ADIO_Offset i_offset;
288 ADIO_Offset frd_size=0, old_frd_size=0;
289 int st_index=0;
290 ADIO_Offset n_filetypes, etype_in_filetype;
291 ADIO_Offset abs_off_in_filetype=0;
292 ADIO_Offset bufsize;
293 ADIO_Offset sum, n_etypes_in_filetype, size_in_filetype;
294 int contig_access_count, filetype_is_contig;
295 ADIO_Offset *len_list;
296 MPI_Aint filetype_extent, filetype_lb;
297 ADIOI_Flatlist_node *flat_file;
298 ADIO_Offset *offset_list, off, end_offset=0, disp;
299
300 #ifdef AGGREGATION_PROFILE
301 MPE_Log_event (5028, 0, NULL);
302 #endif
303
304 /* For this process's request, calculate the list of offsets and
305 lengths in the file and determine the start and end offsets. */
306
307 ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
308
309 MPI_Type_size(fd->filetype, &filetype_size);
310 MPI_Type_extent(fd->filetype, &filetype_extent);
311 MPI_Type_lb(fd->filetype, &filetype_lb);
312 MPI_Type_size(datatype, (int*)&buftype_size);
313 etype_size = fd->etype_size;
314
315 if ( ! filetype_size ) {
316 *contig_access_count_ptr = 0;
317 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
318 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
319 /* 2 is for consistency. everywhere I malloc one more than needed */
320
321 offset_list = *offset_list_ptr;
322 len_list = *len_list_ptr;
323 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
324 fd->disp + (ADIO_Offset)etype_size * offset;
325 len_list[0] = 0;
326 *start_offset_ptr = offset_list[0];
327 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
328
329 return;
330 }
331
332 if (filetype_is_contig) {
333 *contig_access_count_ptr = 1;
334 *offset_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
335 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc(2*sizeof(ADIO_Offset));
336 /* 2 is for consistency. everywhere I malloc one more than needed */
337
338 offset_list = *offset_list_ptr;
339 len_list = *len_list_ptr;
340 offset_list[0] = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
341 fd->disp + (ADIO_Offset)etype_size * offset;
342 len_list[0] = (ADIO_Offset)bufcount * (ADIO_Offset)buftype_size;
343 *start_offset_ptr = offset_list[0];
344 *end_offset_ptr = offset_list[0] + len_list[0] - 1;
345
346 /* update file pointer */
347 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = *end_offset_ptr + 1;
348 }
349
350 else {
351
352 /* First calculate what size of offset_list and len_list to allocate */
353
354 /* filetype already flattened in ADIO_Open or ADIO_Fcntl */
355 flat_file = ADIOI_Flatlist;
356 while (flat_file->type != fd->filetype) flat_file = flat_file->next;
357 disp = fd->disp;
358
359 #ifdef RDCOLL_DEBUG
360 {
361 int ii;
362 DBG_FPRINTF(stderr, "flattened %3d : ", flat_file->count );
363 for (ii=0; ii<flat_file->count; ii++) {
364 DBG_FPRINTF(stderr, "%16qd:%-16qd", flat_file->indices[ii], flat_file->blocklens[ii] );
365 }
366 DBG_FPRINTF(stderr, "\n" );
367 }
368 #endif
369 if (file_ptr_type == ADIO_INDIVIDUAL) {
370 /* Wei-keng reworked type processing to be a bit more efficient */
371 offset = fd->fp_ind - disp;
372 n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
373 offset -= (ADIO_Offset)n_filetypes * filetype_extent;
374 /* now offset is local to this extent */
375
376 /* find the block where offset is located, skip blocklens[i]==0 */
377 for (i=0; i<flat_file->count; i++) {
378 ADIO_Offset dist;
379 if (flat_file->blocklens[i] == 0) continue;
380 dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
381 /* frd_size is from offset to the end of block i */
382 if (dist == 0) {
383 i++;
384 offset = flat_file->indices[i];
385 frd_size = flat_file->blocklens[i];
386 break;
387 }
388 if (dist > 0) {
389 frd_size = dist;
390 break;
391 }
392 }
393 st_index = i; /* starting index in flat_file->indices[] */
394 offset += disp + (ADIO_Offset)n_filetypes*filetype_extent;
395 }
396 else {
397 n_etypes_in_filetype = filetype_size/etype_size;
398 n_filetypes = offset / n_etypes_in_filetype;
399 etype_in_filetype = offset % n_etypes_in_filetype;
400 size_in_filetype = etype_in_filetype * etype_size;
401
402 sum = 0;
403 for (i=0; i<flat_file->count; i++) {
404 sum += flat_file->blocklens[i];
405 if (sum > size_in_filetype) {
406 st_index = i;
407 frd_size = sum - size_in_filetype;
408 abs_off_in_filetype = flat_file->indices[i] +
409 size_in_filetype - (sum - flat_file->blocklens[i]);
410 break;
411 }
412 }
413
414 /* abs. offset in bytes in the file */
415 offset = disp + n_filetypes* (ADIO_Offset)filetype_extent +
416 abs_off_in_filetype;
417 }
418
419 /* calculate how much space to allocate for offset_list, len_list */
420
421 old_frd_size = frd_size;
422 contig_access_count = i_offset = 0;
423 j = st_index;
424 bufsize = (ADIO_Offset)buftype_size * (ADIO_Offset)bufcount;
425 frd_size = ADIOI_MIN(frd_size, bufsize);
426 while (i_offset < bufsize) {
427 if (frd_size) contig_access_count++;
428 i_offset += frd_size;
429 j = (j + 1) % flat_file->count;
430 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
431 }
432
433 /* allocate space for offset_list and len_list */
434
435 *offset_list_ptr = (ADIO_Offset *)
436 ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
437 *len_list_ptr = (ADIO_Offset *) ADIOI_Malloc((contig_access_count+1)*sizeof(ADIO_Offset));
438 /* +1 to avoid a 0-size malloc */
439
440 offset_list = *offset_list_ptr;
441 len_list = *len_list_ptr;
442
443 /* find start offset, end offset, and fill in offset_list and len_list */
444
445 *start_offset_ptr = offset; /* calculated above */
446
447 i_offset = k = 0;
448 j = st_index;
449 off = offset;
450 frd_size = ADIOI_MIN(old_frd_size, bufsize);
451 while (i_offset < bufsize) {
452 if (frd_size) {
453 offset_list[k] = off;
454 len_list[k] = frd_size;
455 k++;
456 }
457 i_offset += frd_size;
458 end_offset = off + frd_size - 1;
459
460 /* Note: end_offset points to the last byte-offset that will be accessed.
461 e.g., if start_offset=0 and 100 bytes to be read, end_offset=99*/
462
463 if (off + frd_size < disp + flat_file->indices[j] +
464 flat_file->blocklens[j] +
465 n_filetypes* (ADIO_Offset)filetype_extent)
466 {
467 off += frd_size;
468 /* did not reach end of contiguous block in filetype.
469 * no more I/O needed. off is incremented by frd_size.
470 */
471 }
472 else {
473 j = (j+1) % flat_file->count;
474 n_filetypes += (j == 0) ? 1 : 0;
475 while (flat_file->blocklens[j]==0) {
476 j = (j+1) % flat_file->count;
477 n_filetypes += (j == 0) ? 1 : 0;
478 /* hit end of flattened filetype; start at beginning
479 * again */
480 }
481 off = disp + flat_file->indices[j] +
482 n_filetypes* (ADIO_Offset)filetype_extent;
483 frd_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
484 }
485 }
486
487 /* update file pointer */
488 if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
489
490 *contig_access_count_ptr = contig_access_count;
491 *end_offset_ptr = end_offset;
492 }
493 #ifdef AGGREGATION_PROFILE
494 MPE_Log_event (5029, 0, NULL);
495 #endif
496 }
497
ADIOI_Read_and_exch(ADIO_File fd,void * buf,MPI_Datatype datatype,int nprocs,int myrank,ADIOI_Access * others_req,ADIO_Offset * offset_list,ADIO_Offset * len_list,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,int * buf_idx,int * error_code)498 static void ADIOI_Read_and_exch(ADIO_File fd, void *buf, MPI_Datatype
499 datatype, int nprocs,
500 int myrank, ADIOI_Access
501 *others_req, ADIO_Offset *offset_list,
502 ADIO_Offset *len_list, int contig_access_count, ADIO_Offset
503 min_st_offset, ADIO_Offset fd_size,
504 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
505 int *buf_idx, int *error_code)
506 {
507 /* Read in sizes of no more than coll_bufsize, an info parameter.
508 Send data to appropriate processes.
509 Place recd. data in user buf.
510 The idea is to reduce the amount of extra memory required for
511 collective I/O. If all data were read all at once, which is much
512 easier, it would require temp space more than the size of user_buf,
513 which is often unacceptable. For example, to read a distributed
514 array from a file, where each local array is 8Mbytes, requiring
515 at least another 8Mbytes of temp space is unacceptable. */
516
517 int i, j, m, ntimes, max_ntimes, buftype_is_contig;
518 ADIO_Offset st_loc=-1, end_loc=-1, off, done, real_off, req_off;
519 char *read_buf = NULL, *tmp_buf;
520 int *curr_offlen_ptr, *count, *send_size, *recv_size;
521 int *partial_send, *recd_from_proc, *start_pos;
522 /* Not convinced end_loc-st_loc couldn't be > int, so make these offsets*/
523 ADIO_Offset real_size, size, for_curr_iter, for_next_iter;
524 int req_len, flag, rank;
525 MPI_Status status;
526 ADIOI_Flatlist_node *flat_buf=NULL;
527 MPI_Aint buftype_extent;
528 int coll_bufsize;
529
530 *error_code = MPI_SUCCESS; /* changed below if error */
531 /* only I/O errors are currently reported */
532
533 /* calculate the number of reads of size coll_bufsize
534 to be done by each process and the max among all processes.
535 That gives the no. of communication phases as well.
536 coll_bufsize is obtained from the hints object. */
537
538 coll_bufsize = fd->hints->cb_buffer_size;
539
540 /* grab some initial values for st_loc and end_loc */
541 for (i=0; i < nprocs; i++) {
542 if (others_req[i].count) {
543 st_loc = others_req[i].offsets[0];
544 end_loc = others_req[i].offsets[0];
545 break;
546 }
547 }
548
549 /* now find the real values */
550 for (i=0; i < nprocs; i++)
551 for (j=0; j<others_req[i].count; j++) {
552 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
553 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j]
554 + others_req[i].lens[j] - 1));
555 }
556
557 /* calculate ntimes, the number of times this process must perform I/O
558 * operations in order to complete all the requests it has received.
559 * the need for multiple I/O operations comes from the restriction that
560 * we only use coll_bufsize bytes of memory for internal buffering.
561 */
562 if ((st_loc==-1) && (end_loc==-1)) {
563 /* this process does no I/O. */
564 ntimes = 0;
565 }
566 else {
567 /* ntimes=ceiling_div(end_loc - st_loc + 1, coll_bufsize)*/
568 ntimes = (int) ((end_loc - st_loc + coll_bufsize)/coll_bufsize);
569 }
570
571 MPI_Allreduce(&ntimes, &max_ntimes, 1, MPI_INT, MPI_MAX, fd->comm);
572
573 if (ntimes) read_buf = (char *) ADIOI_Malloc(coll_bufsize);
574
575 curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
576 /* its use is explained below. calloc initializes to 0. */
577
578 count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
579 /* to store count of how many off-len pairs per proc are satisfied
580 in an iteration. */
581
582 partial_send = (int *) ADIOI_Calloc(nprocs, sizeof(int));
583 /* if only a portion of the last off-len pair is sent to a process
584 in a particular iteration, the length sent is stored here.
585 calloc initializes to 0. */
586
587 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
588 /* total size of data to be sent to each proc. in an iteration */
589
590 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
591 /* total size of data to be recd. from each proc. in an iteration.
592 Of size nprocs so that I can use MPI_Alltoall later. */
593
594 recd_from_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
595 /* amount of data recd. so far from each proc. Used in
596 ADIOI_Fill_user_buffer. initialized to 0 here. */
597
598 start_pos = (int *) ADIOI_Malloc(nprocs*sizeof(int));
599 /* used to store the starting value of curr_offlen_ptr[i] in
600 this iteration */
601
602 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
603 if (!buftype_is_contig) {
604 ADIOI_Flatten_datatype(datatype);
605 flat_buf = ADIOI_Flatlist;
606 while (flat_buf->type != datatype) flat_buf = flat_buf->next;
607 }
608 MPI_Type_extent(datatype, &buftype_extent);
609
610 done = 0;
611 off = st_loc;
612 for_curr_iter = for_next_iter = 0;
613
614 MPI_Comm_rank(fd->comm, &rank);
615
616 for (m=0; m<ntimes; m++) {
617 /* read buf of size coll_bufsize (or less) */
618 /* go through all others_req and check if any are satisfied
619 by the current read */
620
621 /* since MPI guarantees that displacements in filetypes are in
622 monotonically nondecreasing order, I can maintain a pointer
623 (curr_offlen_ptr) to
624 current off-len pair for each process in others_req and scan
625 further only from there. There is still a problem of filetypes
626 such as: (1, 2, 3 are not process nos. They are just numbers for
627 three chunks of data, specified by a filetype.)
628
629 1 -------!--
630 2 -----!----
631 3 --!-----
632
633 where ! indicates where the current read_size limitation cuts
634 through the filetype. I resolve this by reading up to !, but
635 filling the communication buffer only for 1. I copy the portion
636 left over for 2 into a tmp_buf for use in the next
637 iteration. i.e., 2 and 3 will be satisfied in the next
638 iteration. This simplifies filling in the user's buf at the
639 other end, as only one off-len pair with incomplete data
640 will be sent. I also don't need to send the individual
641 offsets and lens along with the data, as the data is being
642 sent in a particular order. */
643
644 /* off = start offset in the file for the data actually read in
645 this iteration
646 size = size of data read corresponding to off
647 real_off = off minus whatever data was retained in memory from
648 previous iteration for cases like 2, 3 illustrated above
649 real_size = size plus the extra corresponding to real_off
650 req_off = off in file for a particular contiguous request
651 minus what was satisfied in previous iteration
652 req_size = size corresponding to req_off */
653
654 size = ADIOI_MIN((unsigned)coll_bufsize, end_loc-st_loc+1-done);
655 real_off = off - for_curr_iter;
656 real_size = size + for_curr_iter;
657
658 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
659 for_next_iter = 0;
660
661 for (i=0; i<nprocs; i++) {
662 #ifdef RDCOLL_DEBUG
663 DBG_FPRINTF(stderr, "rank %d, i %d, others_count %d\n", rank, i, others_req[i].count);
664 #endif
665 if (others_req[i].count) {
666 start_pos[i] = curr_offlen_ptr[i];
667 for (j=curr_offlen_ptr[i]; j<others_req[i].count;
668 j++) {
669 if (partial_send[i]) {
670 /* this request may have been partially
671 satisfied in the previous iteration. */
672 req_off = others_req[i].offsets[j] +
673 partial_send[i];
674 req_len = others_req[i].lens[j] -
675 partial_send[i];
676 partial_send[i] = 0;
677 /* modify the off-len pair to reflect this change */
678 others_req[i].offsets[j] = req_off;
679 others_req[i].lens[j] = req_len;
680 }
681 else {
682 req_off = others_req[i].offsets[j];
683 req_len = others_req[i].lens[j];
684 }
685 if (req_off < real_off + real_size) {
686 count[i]++;
687 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+req_off-real_off) == (ADIO_Offset)(MPIR_Upint)(read_buf+req_off-real_off));
688 MPI_Address(read_buf+req_off-real_off,
689 &(others_req[i].mem_ptrs[j]));
690 ADIOI_Assert((real_off + real_size - req_off) == (int)(real_off + real_size - req_off));
691 send_size[i] += (int)(ADIOI_MIN(real_off + real_size - req_off,
692 (ADIO_Offset)(unsigned)req_len));
693
694 if (real_off+real_size-req_off < (ADIO_Offset)(unsigned)req_len) {
695 partial_send[i] = (int) (real_off + real_size - req_off);
696 if ((j+1 < others_req[i].count) &&
697 (others_req[i].offsets[j+1] <
698 real_off+real_size)) {
699 /* this is the case illustrated in the
700 figure above. */
701 for_next_iter = ADIOI_MAX(for_next_iter,
702 real_off + real_size - others_req[i].offsets[j+1]);
703 /* max because it must cover requests
704 from different processes */
705 }
706 break;
707 }
708 }
709 else break;
710 }
711 curr_offlen_ptr[i] = j;
712 }
713 }
714
715 flag = 0;
716 for (i=0; i<nprocs; i++)
717 if (count[i]) flag = 1;
718
719 if (flag) {
720 ADIOI_Assert(size == (int)size);
721 ADIO_ReadContig(fd, read_buf+for_curr_iter, (int)size, MPI_BYTE,
722 ADIO_EXPLICIT_OFFSET, off, &status, error_code);
723 if (*error_code != MPI_SUCCESS) return;
724 }
725
726 for_curr_iter = for_next_iter;
727
728 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
729 send_size, recv_size, count,
730 start_pos, partial_send, recd_from_proc, nprocs,
731 myrank,
732 buftype_is_contig, contig_access_count,
733 min_st_offset, fd_size, fd_start, fd_end,
734 others_req,
735 m, buftype_extent, buf_idx);
736
737
738 if (for_next_iter) {
739 tmp_buf = (char *) ADIOI_Malloc(for_next_iter);
740 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)read_buf)+real_size-for_next_iter) == (ADIO_Offset)(MPIR_Upint)(read_buf+real_size-for_next_iter));
741 ADIOI_Assert((for_next_iter+coll_bufsize) == (size_t)(for_next_iter+coll_bufsize));
742 memcpy(tmp_buf, read_buf+real_size-for_next_iter, for_next_iter);
743 ADIOI_Free(read_buf);
744 read_buf = (char *) ADIOI_Malloc(for_next_iter+coll_bufsize);
745 memcpy(read_buf, tmp_buf, for_next_iter);
746 ADIOI_Free(tmp_buf);
747 }
748
749 off += size;
750 done += size;
751 }
752
753 for (i=0; i<nprocs; i++) count[i] = send_size[i] = 0;
754 for (m=ntimes; m<max_ntimes; m++)
755 /* nothing to send, but check for recv. */
756 ADIOI_R_Exchange_data(fd, buf, flat_buf, offset_list, len_list,
757 send_size, recv_size, count,
758 start_pos, partial_send, recd_from_proc, nprocs,
759 myrank,
760 buftype_is_contig, contig_access_count,
761 min_st_offset, fd_size, fd_start, fd_end,
762 others_req, m,
763 buftype_extent, buf_idx);
764
765 if (ntimes) ADIOI_Free(read_buf);
766 ADIOI_Free(curr_offlen_ptr);
767 ADIOI_Free(count);
768 ADIOI_Free(partial_send);
769 ADIOI_Free(send_size);
770 ADIOI_Free(recv_size);
771 ADIOI_Free(recd_from_proc);
772 ADIOI_Free(start_pos);
773 }
774
ADIOI_R_Exchange_data(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,int * count,int * start_pos,int * partial_send,int * recd_from_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,ADIOI_Access * others_req,int iter,MPI_Aint buftype_extent,int * buf_idx)775 static void ADIOI_R_Exchange_data(ADIO_File fd, void *buf, ADIOI_Flatlist_node
776 *flat_buf, ADIO_Offset *offset_list, ADIO_Offset
777 *len_list, int *send_size, int *recv_size,
778 int *count, int *start_pos, int *partial_send,
779 int *recd_from_proc, int nprocs,
780 int myrank, int
781 buftype_is_contig, int contig_access_count,
782 ADIO_Offset min_st_offset, ADIO_Offset fd_size,
783 ADIO_Offset *fd_start, ADIO_Offset *fd_end,
784 ADIOI_Access *others_req,
785 int iter, MPI_Aint buftype_extent, int *buf_idx)
786 {
787 int i, j, k=0, tmp=0, nprocs_recv, nprocs_send;
788 char **recv_buf = NULL;
789 MPI_Request *requests;
790 MPI_Datatype send_type;
791 MPI_Status *statuses;
792
793 /* exchange send_size info so that each process knows how much to
794 receive from whom and how much memory to allocate. */
795
796 MPI_Alltoall(send_size, 1, MPI_INT, recv_size, 1, MPI_INT, fd->comm);
797
798 nprocs_recv = 0;
799 for (i=0; i < nprocs; i++) if (recv_size[i]) nprocs_recv++;
800
801 nprocs_send = 0;
802 for (i=0; i<nprocs; i++) if (send_size[i]) nprocs_send++;
803
804 requests = (MPI_Request *)
805 ADIOI_Malloc((nprocs_send+nprocs_recv+1)*sizeof(MPI_Request));
806 /* +1 to avoid a 0-size malloc */
807
808 /* post recvs. if buftype_is_contig, data can be directly recd. into
809 user buf at location given by buf_idx. else use recv_buf. */
810
811 #ifdef AGGREGATION_PROFILE
812 MPE_Log_event (5032, 0, NULL);
813 #endif
814
815 if (buftype_is_contig) {
816 j = 0;
817 for (i=0; i < nprocs; i++)
818 if (recv_size[i]) {
819 MPI_Irecv(((char *) buf) + buf_idx[i], recv_size[i],
820 MPI_BYTE, i, myrank+i+100*iter, fd->comm, requests+j);
821 j++;
822 buf_idx[i] += recv_size[i];
823 }
824 }
825 else {
826 /* allocate memory for recv_buf and post receives */
827 recv_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char*));
828 for (i=0; i < nprocs; i++)
829 if (recv_size[i]) recv_buf[i] =
830 (char *) ADIOI_Malloc(recv_size[i]);
831
832 j = 0;
833 for (i=0; i < nprocs; i++)
834 if (recv_size[i]) {
835 MPI_Irecv(recv_buf[i], recv_size[i], MPI_BYTE, i,
836 myrank+i+100*iter, fd->comm, requests+j);
837 j++;
838 #ifdef RDCOLL_DEBUG
839 DBG_FPRINTF(stderr, "node %d, recv_size %d, tag %d \n",
840 myrank, recv_size[i], myrank+i+100*iter);
841 #endif
842 }
843 }
844
845 /* create derived datatypes and send data */
846
847 j = 0;
848 for (i=0; i<nprocs; i++) {
849 if (send_size[i]) {
850 /* take care if the last off-len pair is a partial send */
851 if (partial_send[i]) {
852 k = start_pos[i] + count[i] - 1;
853 tmp = others_req[i].lens[k];
854 others_req[i].lens[k] = partial_send[i];
855 }
856 MPI_Type_hindexed(count[i],
857 &(others_req[i].lens[start_pos[i]]),
858 &(others_req[i].mem_ptrs[start_pos[i]]),
859 MPI_BYTE, &send_type);
860 /* absolute displacement; use MPI_BOTTOM in send */
861 MPI_Type_commit(&send_type);
862 MPI_Isend(MPI_BOTTOM, 1, send_type, i, myrank+i+100*iter,
863 fd->comm, requests+nprocs_recv+j);
864 MPI_Type_free(&send_type);
865 if (partial_send[i]) others_req[i].lens[k] = tmp;
866 j++;
867 }
868 }
869
870 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send+nprocs_recv+1) * \
871 sizeof(MPI_Status));
872 /* +1 to avoid a 0-size malloc */
873
874 /* wait on the receives */
875 if (nprocs_recv) {
876 #ifdef NEEDS_MPI_TEST
877 j = 0;
878 while (!j) MPI_Testall(nprocs_recv, requests, &j, statuses);
879 #else
880 MPI_Waitall(nprocs_recv, requests, statuses);
881 #endif
882
883 /* if noncontiguous, to the copies from the recv buffers */
884 if (!buftype_is_contig)
885 ADIOI_Fill_user_buffer(fd, buf, flat_buf, recv_buf,
886 offset_list, len_list, (unsigned*)recv_size,
887 requests, statuses, recd_from_proc,
888 nprocs, contig_access_count,
889 min_st_offset, fd_size, fd_start, fd_end,
890 buftype_extent);
891 }
892
893 /* wait on the sends*/
894 MPI_Waitall(nprocs_send, requests+nprocs_recv, statuses+nprocs_recv);
895
896 ADIOI_Free(statuses);
897 ADIOI_Free(requests);
898
899 if (!buftype_is_contig) {
900 for (i=0; i < nprocs; i++)
901 if (recv_size[i]) ADIOI_Free(recv_buf[i]);
902 ADIOI_Free(recv_buf);
903 }
904 #ifdef AGGREGATION_PROFILE
905 MPE_Log_event (5033, 0, NULL);
906 #endif
907 }
908
909 #define ADIOI_BUF_INCR \
910 { \
911 while (buf_incr) { \
912 size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
913 user_buf_idx += size_in_buf; \
914 flat_buf_sz -= size_in_buf; \
915 if (!flat_buf_sz) { \
916 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
917 else { \
918 flat_buf_idx = 0; \
919 n_buftypes++; \
920 } \
921 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
922 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
923 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
924 } \
925 buf_incr -= size_in_buf; \
926 } \
927 }
928
929
930 #define ADIOI_BUF_COPY \
931 { \
932 while (size) { \
933 size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
934 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
935 ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
936 memcpy(((char *) buf) + user_buf_idx, \
937 &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
938 recv_buf_idx[p] += size_in_buf; /* already tested (size_t)size_in_buf*/ \
939 user_buf_idx += size_in_buf; \
940 flat_buf_sz -= size_in_buf; \
941 if (!flat_buf_sz) { \
942 if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
943 else { \
944 flat_buf_idx = 0; \
945 n_buftypes++; \
946 } \
947 user_buf_idx = flat_buf->indices[flat_buf_idx] + \
948 (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
949 flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
950 } \
951 size -= size_in_buf; \
952 buf_incr -= size_in_buf; \
953 } \
954 ADIOI_BUF_INCR \
955 }
956
ADIOI_Fill_user_buffer(ADIO_File fd,void * buf,ADIOI_Flatlist_node * flat_buf,char ** recv_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,unsigned * recv_size,MPI_Request * requests,MPI_Status * statuses,int * recd_from_proc,int nprocs,int contig_access_count,ADIO_Offset min_st_offset,ADIO_Offset fd_size,ADIO_Offset * fd_start,ADIO_Offset * fd_end,MPI_Aint buftype_extent)957 static void ADIOI_Fill_user_buffer(ADIO_File fd, void *buf, ADIOI_Flatlist_node
958 *flat_buf, char **recv_buf, ADIO_Offset
959 *offset_list, ADIO_Offset *len_list,
960 unsigned *recv_size,
961 MPI_Request *requests, MPI_Status *statuses,
962 int *recd_from_proc, int nprocs,
963 int contig_access_count,
964 ADIO_Offset min_st_offset,
965 ADIO_Offset fd_size, ADIO_Offset *fd_start,
966 ADIO_Offset *fd_end,
967 MPI_Aint buftype_extent)
968 {
969
970 /* this function is only called if buftype is not contig */
971
972 int i, p, flat_buf_idx;
973 ADIO_Offset flat_buf_sz, size_in_buf, buf_incr, size;
974 int n_buftypes;
975 ADIO_Offset off, len, rem_len, user_buf_idx;
976 /* Not sure unsigned is necessary, but it makes the math safer */
977 unsigned *curr_from_proc, *done_from_proc, *recv_buf_idx;
978
979 ADIOI_UNREFERENCED_ARG(requests);
980 ADIOI_UNREFERENCED_ARG(statuses);
981
982 /* curr_from_proc[p] = amount of data recd from proc. p that has already
983 been accounted for so far
984 done_from_proc[p] = amount of data already recd from proc. p and
985 filled into user buffer in previous iterations
986 user_buf_idx = current location in user buffer
987 recv_buf_idx[p] = current location in recv_buf of proc. p */
988 curr_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
989 done_from_proc = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
990 recv_buf_idx = (unsigned *) ADIOI_Malloc(nprocs * sizeof(unsigned));
991
992 for (i=0; i < nprocs; i++) {
993 recv_buf_idx[i] = curr_from_proc[i] = 0;
994 done_from_proc[i] = recd_from_proc[i];
995 }
996
997 user_buf_idx = flat_buf->indices[0];
998 flat_buf_idx = 0;
999 n_buftypes = 0;
1000 flat_buf_sz = flat_buf->blocklens[0];
1001
1002 /* flat_buf_idx = current index into flattened buftype
1003 flat_buf_sz = size of current contiguous component in
1004 flattened buf */
1005
1006 for (i=0; i<contig_access_count; i++) {
1007 off = offset_list[i];
1008 rem_len = len_list[i];
1009
1010 /* this request may span the file domains of more than one process */
1011 while (rem_len != 0) {
1012 len = rem_len;
1013 /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
1014 * longer than the single region that processor "p" is responsible
1015 * for.
1016 */
1017 p = ADIOI_Calc_aggregator(fd,
1018 off,
1019 min_st_offset,
1020 &len,
1021 fd_size,
1022 fd_start,
1023 fd_end);
1024
1025 if (recv_buf_idx[p] < recv_size[p]) {
1026 if (curr_from_proc[p]+len > done_from_proc[p]) {
1027 if (done_from_proc[p] > curr_from_proc[p]) {
1028 size = ADIOI_MIN(curr_from_proc[p] + len -
1029 done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
1030 buf_incr = done_from_proc[p] - curr_from_proc[p];
1031 ADIOI_BUF_INCR
1032 buf_incr = curr_from_proc[p]+len-done_from_proc[p];
1033 ADIOI_Assert((done_from_proc[p] + size) == (unsigned)((ADIO_Offset)done_from_proc[p] + size));
1034 curr_from_proc[p] = done_from_proc[p] + size;
1035 ADIOI_BUF_COPY
1036 }
1037 else {
1038 size = ADIOI_MIN(len,recv_size[p]-recv_buf_idx[p]);
1039 buf_incr = len;
1040 ADIOI_Assert((curr_from_proc[p] + size) == (unsigned)((ADIO_Offset)curr_from_proc[p] + size));
1041 curr_from_proc[p] += (unsigned) size;
1042 ADIOI_BUF_COPY
1043 }
1044 }
1045 else {
1046 ADIOI_Assert((curr_from_proc[p] + len) == (unsigned)((ADIO_Offset)curr_from_proc[p] + len));
1047 curr_from_proc[p] += (unsigned) len;
1048 buf_incr = len;
1049 ADIOI_BUF_INCR
1050 }
1051 }
1052 else {
1053 buf_incr = len;
1054 ADIOI_BUF_INCR
1055 }
1056 off += len;
1057 rem_len -= len;
1058 }
1059 }
1060 for (i=0; i < nprocs; i++)
1061 if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];
1062
1063 ADIOI_Free(curr_from_proc);
1064 ADIOI_Free(done_from_proc);
1065 ADIOI_Free(recv_buf_idx);
1066 }
1067