1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*-
2  * vim: ts=8 sts=4 sw=4 noexpandtab
3  *
4  *   Copyright (C) 2008 University of Chicago.
5  *   See COPYRIGHT notice in top-level directory.
6  */
7 
8 #include "adio.h"
9 #include "adio_extern.h"
10 #include "ad_zoidfs.h"
11 
12 #include "ad_zoidfs_common.h"
13 
14 /* Copied from ADIOI_PVFS2_OldReadStrided.  It would be good to have fewer
15  * copies of this code... */
/*
 * ADIOI_ZOIDFS_ReadStrided - read a request that is noncontiguous in memory,
 * in the file, or both, by handing zoidfs_read() lists of (address, length)
 * and (file offset, length) pairs of at most MAX_ARRAY_SIZE entries each.
 *
 * Parameters:
 *   fd            - open ADIO file; fd->fs_ptr is used as the zoidfs handle
 *   buf           - user buffer receiving the data
 *   count         - number of 'datatype' elements to read
 *   datatype      - memory datatype describing the layout of buf
 *   file_ptr_type - ADIO_INDIVIDUAL to start at fd->fp_ind; any other value
 *                   means 'offset' is an explicit offset
 *   offset        - explicit offset, in units of etype relative to the
 *                   filetype (ignored for ADIO_INDIVIDUAL)
 *   status        - filled in through MPIR_Status_set_bytes when
 *                   HAVE_STATUS_SET_BYTES is defined
 *   error_code    - MPI_SUCCESS, or an MPIO error code if zoidfs_read failed
 *
 * Three layouts are handled:
 *   1. noncontiguous memory / contiguous file
 *   2. contiguous memory   / noncontiguous file
 *   3. noncontiguous memory / noncontiguous file (may fall back to
 *      ADIOI_GEN_ReadStrided_naive for pathological block-size ratios)
 *
 * On any zoidfs_read failure the function stops issuing reads, releases its
 * temporary arrays and leaves fd->fp_ind untouched.
 *
 * Copied from ADIOI_PVFS2_OldReadStrided.
 */
void ADIOI_ZOIDFS_ReadStrided(ADIO_File fd, void *buf, int count,
                              MPI_Datatype datatype, int file_ptr_type,
                              ADIO_Offset offset, ADIO_Status *status,
                              int *error_code)
{
    /* offset is in units of etype relative to the filetype. */
    ADIOI_Flatlist_node *flat_buf, *flat_file;
    int i, j, k, brd_size, frd_size=0, st_index=0;
    int sum, n_etypes_in_filetype, size_in_filetype;
    MPI_Count bufsize;
    int n_filetypes, etype_in_filetype;
    ADIO_Offset abs_off_in_filetype=0;
    MPI_Count filetype_size, etype_size, buftype_size;
    MPI_Aint filetype_extent, buftype_extent, filetype_lb, buftype_lb;
    int buf_count, buftype_is_contig, filetype_is_contig;
    ADIO_Offset off, disp, initial_off;
    int flag, st_frd_size, st_n_filetypes;

    /* Request lists handed to zoidfs_read.  They start out NULL so the
     * common exit path can tell which ones were actually allocated. */
    size_t mem_list_count, file_list_count;
    void **mem_offsets = NULL;
    uint64_t *file_offsets = NULL;
    size_t *mem_lengths = NULL;
    uint64_t *file_lengths = NULL;
    int total_blks_to_read;

    int max_mem_list, max_file_list;

    int b_blks_read;
    int f_data_read;
    int size_read=0, n_read_lists, extra_blks;

    int end_brd_size=0, end_frd_size=0;
    int start_k, start_j, new_file_read, new_buffer_read;
    ADIOI_ZOIDFS_object * zoidfs_obj_ptr;
    int err_flag=0;
    MPI_Offset total_bytes_read = 0;
    static char myname[] = "ADIOI_ZOIDFS_ReadStrided";

    /* note: I don't know what zoidfs will do if you pass it a super-long list,
     * so let's keep with the PVFS limit for now */
#define MAX_ARRAY_SIZE 64

    *error_code = MPI_SUCCESS;  /* changed below if error */

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

    /* the HDF5 tests showed a bug in this list processing code (see the
     * fallback check further down).  We added a workaround, but common HDF5
     * file types are actually contiguous and do not need the expensive
     * workaround: a flattened filetype with a single block is treated as
     * contiguous when the memory type is noncontiguous. */
    if (!filetype_is_contig) {
        flat_file = ADIOI_Flatlist;
        while (flat_file->type != fd->filetype) flat_file = flat_file->next;
        if (flat_file->count == 1 && !buftype_is_contig)
            filetype_is_contig = 1;
    }

    MPI_Type_size_x(fd->filetype, &filetype_size);
    if ( ! filetype_size ) {
#ifdef HAVE_STATUS_SET_BYTES
        MPIR_Status_set_bytes(status, datatype, 0);
#endif
        *error_code = MPI_SUCCESS;
        return;
    }

    MPI_Type_get_extent(fd->filetype, &filetype_lb, &filetype_extent);
    MPI_Type_size_x(datatype, &buftype_size);
    MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
    etype_size = fd->etype_size;

    bufsize = buftype_size * count;

    /* Nothing to transfer.  Returning here keeps the list-building code
     * below from running with empty lists (in the noncontig/noncontig case
     * it would otherwise examine variables that were never assigned). */
    if ( ! bufsize ) {
#ifdef HAVE_STATUS_SET_BYTES
        MPIR_Status_set_bytes(status, datatype, 0);
#endif
        *error_code = MPI_SUCCESS;
        return;
    }

    zoidfs_obj_ptr = (ADIOI_ZOIDFS_object *)fd->fs_ptr;

    if (!buftype_is_contig && filetype_is_contig) {

        /* noncontiguous in memory, contiguous in file. */

        /* single file offset/length pair covering the current batch */
        uint64_t contig_file_off;
        uint64_t contig_file_len;

        flat_buf = ADIOI_Flatten_and_find(datatype);

        off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
            fd->disp + etype_size * offset;

        contig_file_off = off;
        contig_file_len = 0;
        total_blks_to_read = count*flat_buf->count;
        b_blks_read = 0;

        /* allocate arrays according to max usage */
        if (total_blks_to_read > MAX_ARRAY_SIZE)
            mem_list_count = MAX_ARRAY_SIZE;
        else mem_list_count = total_blks_to_read;
        /* NOTE(review): allocation results are not checked here; confirm
         * that ADIOI_Malloc handles failure itself. */
        mem_offsets = (void **)ADIOI_Malloc(mem_list_count*sizeof(void*));
        mem_lengths = (size_t*)ADIOI_Malloc(mem_list_count*sizeof(size_t));

        j = 0;
        /* step through each block in memory, filling memory arrays;
         * j counts whole datatype instances already walked */
        while (b_blks_read < total_blks_to_read) {
            for (i=0; i<flat_buf->count; i++) {
                mem_offsets[b_blks_read % MAX_ARRAY_SIZE] =
                    (char *)buf + j*buftype_extent + flat_buf->indices[i];
                mem_lengths[b_blks_read % MAX_ARRAY_SIZE] =
                    flat_buf->blocklens[i];
                contig_file_len += flat_buf->blocklens[i];
                b_blks_read++;
                if (!(b_blks_read % MAX_ARRAY_SIZE) ||
                    (b_blks_read == total_blks_to_read)) {

                    /* in the case of the last read list call,
                       adjust mem_list_count */
                    if (b_blks_read == total_blks_to_read) {
                        mem_list_count = total_blks_to_read % MAX_ARRAY_SIZE;
                        /* in case last read list call fills max arrays */
                        if (!mem_list_count) mem_list_count = MAX_ARRAY_SIZE;
                    }
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
#endif
                    NO_STALE(err_flag, fd, zoidfs_obj_ptr,
                             zoidfs_read(zoidfs_obj_ptr,
                                         mem_list_count,
                                         mem_offsets, mem_lengths,
                                         1, &contig_file_off,
                                         &contig_file_len,
                                         ZOIDFS_NO_OP_HINT));
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
#endif
                    /* --BEGIN ERROR HANDLING-- */
                    if (err_flag != ZFS_OK) {
                        *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                                           MPIR_ERR_RECOVERABLE,
                                                           myname, __LINE__,
                                                           ADIOI_ZOIDFS_error_convert(err_flag),
                                                           "Error in zoidfs_read", 0);
                        goto cleanup;
                    }
                    /* --END ERROR HANDLING-- */
                    total_bytes_read += contig_file_len;

                    /* after the last read list call, leave here */
                    if (b_blks_read == total_blks_to_read) break;

                    contig_file_off += contig_file_len;
                    contig_file_len = 0;
                }
            } /* for (i=0; i<flat_buf->count; i++) */
            j++;
        } /* while (b_blks_read < total_blks_to_read) */

        if (file_ptr_type == ADIO_INDIVIDUAL)
            fd->fp_ind += total_bytes_read;

        /* the shared exit path frees the arrays, fills in status and
         * deletes the flattened memory datatype */
        goto cleanup;
    } /* if (!buftype_is_contig && filetype_is_contig) */

    /* know file is noncontiguous from above */
    /* noncontiguous in file */

    /* filetype already flattened in ADIO_Open */
    flat_file = ADIOI_Flatlist;
    while (flat_file->type != fd->filetype) flat_file = flat_file->next;

    disp = fd->disp;
    initial_off = offset;   /* kept for the naive fallback below */


    /* for each case - ADIO_Individual pointer or explicit, find the file
       offset in bytes (offset), n_filetypes (how many filetypes into
       file to start), frd_size (remaining amount of data in present
       file block), and st_index (start point in terms of blocks in
       starting filetype) */
    if (file_ptr_type == ADIO_INDIVIDUAL) {
        offset = fd->fp_ind; /* in bytes */
        n_filetypes = -1;
        flag = 0;
        while (!flag) {
            n_filetypes++;
            for (i=0; i<flat_file->count; i++) {
                if (disp + flat_file->indices[i] +
                    ((ADIO_Offset) n_filetypes)*filetype_extent +
                    flat_file->blocklens[i]  >= offset) {
                    st_index = i;
                    frd_size = disp + flat_file->indices[i] +
                        ((ADIO_Offset) n_filetypes)*filetype_extent
                        + flat_file->blocklens[i] - offset;
                    flag = 1;
                    break;
                }
            }
        } /* while (!flag) */
    } /* if (file_ptr_type == ADIO_INDIVIDUAL) */
    else {
        n_etypes_in_filetype = filetype_size/etype_size;
        n_filetypes = (int) (offset / n_etypes_in_filetype);
        etype_in_filetype = (int) (offset % n_etypes_in_filetype);
        size_in_filetype = etype_in_filetype * etype_size;

        sum = 0;
        for (i=0; i<flat_file->count; i++) {
            sum += flat_file->blocklens[i];
            if (sum > size_in_filetype) {
                st_index = i;
                frd_size = sum - size_in_filetype;
                abs_off_in_filetype = flat_file->indices[i] +
                    size_in_filetype - (sum - flat_file->blocklens[i]);
                break;
            }
        }

        /* abs. offset in bytes in the file */
        offset = disp + ((ADIO_Offset) n_filetypes)*filetype_extent +
            abs_off_in_filetype;
    } /* else [file_ptr_type != ADIO_INDIVIDUAL] */

    st_frd_size = frd_size;
    st_n_filetypes = n_filetypes;

    if (buftype_is_contig && !filetype_is_contig) {

        /* contiguous in memory, noncontiguous in file. should be the most
           common case. */

        /* only one memory off-len pair, so no array here: a cursor into
         * buf and the number of bytes the current read list will fill */
        void *contig_mem_ptr = buf;
        size_t contig_mem_len = 0;

        j = st_index;

        /* determine how many blocks in file to read */
        f_data_read = ADIOI_MIN(st_frd_size, bufsize);
        total_blks_to_read = 1;
        if (j < (flat_file->count-1)) j++;
        else j = 0;
        while (f_data_read < bufsize) {
            f_data_read += flat_file->blocklens[j];
            total_blks_to_read++;
            if (j<(flat_file->count-1)) j++;
            else j = 0;
        }

        j = st_index;
        n_filetypes = st_n_filetypes;
        n_read_lists = total_blks_to_read/MAX_ARRAY_SIZE;
        extra_blks = total_blks_to_read%MAX_ARRAY_SIZE;

        /* if at least one full readlist, allocate file arrays
           at max array size and don't free until very end;
           otherwise allocate according to needed size (extra_blks) */
        if (n_read_lists) {
            file_offsets = (uint64_t*)ADIOI_Malloc(MAX_ARRAY_SIZE*
                                                   sizeof(uint64_t));
            file_lengths = (uint64_t*)ADIOI_Malloc(MAX_ARRAY_SIZE*
                                                   sizeof(uint64_t));
        }
        else {
            file_offsets = (uint64_t*)ADIOI_Malloc(extra_blks*
                                                   sizeof(uint64_t));
            file_lengths = (uint64_t*)ADIOI_Malloc(extra_blks*
                                                   sizeof(uint64_t));
        }

        /* for file arrays that are of MAX_ARRAY_SIZE, build arrays */
        for (i=0; i<n_read_lists; i++) {
            file_list_count = MAX_ARRAY_SIZE;
            if (!i) {
                /* very first block may start in the middle of a file block */
                file_offsets[0] = offset;
                file_lengths[0] = st_frd_size;
                contig_mem_len = st_frd_size;
            }
            for (k=0; k<MAX_ARRAY_SIZE; k++) {
                if (i || k) {
                    file_offsets[k] = disp +
                        ((ADIO_Offset)n_filetypes)*filetype_extent
                        + flat_file->indices[j];
                    if (!extra_blks && (i == n_read_lists - 1) &&
                        (k == MAX_ARRAY_SIZE - 1)) {
                        /* this is the very last block of the whole request
                         * (no partial list follows): trim it to what is
                         * left of bufsize so the read cannot run past the
                         * end of the user buffer */
                        file_lengths[k] = bufsize - contig_mem_len
                            - (size_t)((char *)contig_mem_ptr - (char *)buf);
                    }
                    else file_lengths[k] = flat_file->blocklens[j];
                    contig_mem_len += file_lengths[k];
                }
                if (j<(flat_file->count - 1)) j++;
                else {
                    j = 0;
                    n_filetypes++;
                }
            } /* for (k=0; k<MAX_ARRAY_SIZE; k++) */
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
#endif
            /* pass the address of the memory cursor: zoidfs_read is given
             * a one-element list of start addresses, exactly as in the
             * final (partial) read list below */
            NO_STALE(err_flag, fd, zoidfs_obj_ptr,
                     zoidfs_read(zoidfs_obj_ptr,
                                 1, &contig_mem_ptr, &contig_mem_len,
                                 file_list_count,
                                 file_offsets, file_lengths,
                                 ZOIDFS_NO_OP_HINT));
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
#endif
            /* --BEGIN ERROR HANDLING-- */
            if (err_flag != ZFS_OK) {
                *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                                   MPIR_ERR_RECOVERABLE,
                                                   myname, __LINE__,
                                                   ADIOI_ZOIDFS_error_convert(err_flag),
                                                   "Error in zoidfs_read", 0);
                goto cleanup;
            }
            /* --END ERROR HANDLING-- */
            total_bytes_read += contig_mem_len;

            contig_mem_ptr = (char *)contig_mem_ptr + contig_mem_len;
            contig_mem_len = 0;
        } /* for (i=0; i<n_read_lists; i++) */

        /* for file arrays smaller than MAX_ARRAY_SIZE (last read_list call);
         * i == n_read_lists here, so !i means no full list was issued */
        if (extra_blks) {
            file_list_count = extra_blks;
            if (!i) {
                file_offsets[0] = offset;
                file_lengths[0] = ADIOI_MIN(st_frd_size, bufsize);
            }
            for (k=0; k<extra_blks; k++) {
                if (i || k) {
                    file_offsets[k] = disp +
                        ((ADIO_Offset)n_filetypes)*filetype_extent +
                        flat_file->indices[j];
                    if (k == (extra_blks - 1)) {
                        /* last block: whatever is left of bufsize */
                        file_lengths[k] = bufsize - contig_mem_len
                            - (size_t)((char *)contig_mem_ptr - (char *)buf);
                    }
                    else file_lengths[k] = flat_file->blocklens[j];
                } /* if(i || k) */
                contig_mem_len += file_lengths[k];
                if (j<(flat_file->count - 1)) j++;
                else {
                    j = 0;
                    n_filetypes++;
                }
            } /* for (k=0; k<extra_blks; k++) */
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
#endif
            NO_STALE(err_flag, fd, zoidfs_obj_ptr,
                     zoidfs_read(zoidfs_obj_ptr, 1,
                                 &contig_mem_ptr,
                                 &contig_mem_len,
                                 file_list_count,
                                 file_offsets, file_lengths,
                                 ZOIDFS_NO_OP_HINT));
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
#endif
            /* --BEGIN ERROR HANDLING-- */
            if (err_flag != ZFS_OK) {
                *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                                   MPIR_ERR_RECOVERABLE,
                                                   myname, __LINE__,
                                                   ADIOI_ZOIDFS_error_convert(err_flag),
                                                   "Error in zoidfs_read", 0);
                goto cleanup;
            }
            /* --END ERROR HANDLING-- */
            total_bytes_read += contig_mem_len;
        }
    }
    else {
        /* noncontiguous in memory as well as in file */

        flat_buf = ADIOI_Flatten_and_find(datatype);

        size_read = 0;
        n_filetypes = st_n_filetypes;
        frd_size = st_frd_size;
        brd_size = flat_buf->blocklens[0];
        buf_count = 0;
        start_k = k = 0;
        start_j = st_index;
        max_mem_list = 0;
        max_file_list = 0;

        /* PASS 1 (dry run): run through and find max_file_list and
           max_mem_list so that you can allocate the file and memory arrays
           less than MAX_ARRAY_SIZE if possible */

        while (size_read < bufsize) {
            k = start_k;
            new_buffer_read = 0;
            mem_list_count = 0;
            while ((mem_list_count < MAX_ARRAY_SIZE) &&
                   (new_buffer_read < bufsize-size_read)) {
                /* find mem_list_count and file_list_count such that both are
                   less than MAX_ARRAY_SIZE, the sum of their lengths are
                   equal, and the sum of all the data read and data to be
                   read in the next immediate read list is less than
                   bufsize */
                if (mem_list_count) {
                    if ((new_buffer_read + flat_buf->blocklens[k] +
                         size_read) > bufsize) {
                        end_brd_size = new_buffer_read +
                            flat_buf->blocklens[k] - (bufsize - size_read);
                        new_buffer_read = bufsize - size_read;
                    }
                    else {
                        new_buffer_read += flat_buf->blocklens[k];
                        end_brd_size = flat_buf->blocklens[k];
                    }
                }
                else {
                    if (brd_size > (bufsize - size_read)) {
                        new_buffer_read = bufsize - size_read;
                        brd_size = new_buffer_read;
                    }
                    else new_buffer_read = brd_size;
                }
                mem_list_count++;
                k = (k + 1)%flat_buf->count;
            } /* while ((mem_list_count < MAX_ARRAY_SIZE) &&
                 (new_buffer_read < bufsize-size_read)) */
            j = start_j;
            new_file_read = 0;
            file_list_count = 0;
            while ((file_list_count < MAX_ARRAY_SIZE) &&
                   (new_file_read < new_buffer_read)) {
                if (file_list_count) {
                    if ((new_file_read + flat_file->blocklens[j]) >
                        new_buffer_read) {
                        end_frd_size = new_buffer_read - new_file_read;
                        new_file_read = new_buffer_read;
                        j--;
                    }
                    else {
                        new_file_read += flat_file->blocklens[j];
                        end_frd_size = flat_file->blocklens[j];
                    }
                }
                else {
                    if (frd_size > new_buffer_read) {
                        new_file_read = new_buffer_read;
                        frd_size = new_file_read;
                    }
                    else new_file_read = frd_size;
                }
                file_list_count++;
                if (j < (flat_file->count - 1)) j++;
                else j = 0;

                k = start_k;
                /* the file list filled up before covering the memory list:
                 * shrink the memory list to match the file list */
                if ((new_file_read < new_buffer_read) &&
                    (file_list_count == MAX_ARRAY_SIZE)) {
                    new_buffer_read = 0;
                    mem_list_count = 0;
                    while (new_buffer_read < new_file_read) {
                        if (mem_list_count) {
                            if ((new_buffer_read + flat_buf->blocklens[k]) >
                                new_file_read) {
                                end_brd_size = new_file_read - new_buffer_read;
                                new_buffer_read = new_file_read;
                                k--;
                            }
                            else {
                                new_buffer_read += flat_buf->blocklens[k];
                                end_brd_size = flat_buf->blocklens[k];
                            }
                        }
                        else {
                            new_buffer_read = brd_size;
                            if (brd_size > (bufsize - size_read)) {
                                new_buffer_read = bufsize - size_read;
                                brd_size = new_buffer_read;
                            }
                        }
                        mem_list_count++;
                        k = (k + 1)%flat_buf->count;
                    } /* while (new_buffer_read < new_file_read) */
                } /* if ((new_file_read < new_buffer_read) && (file_list_count
                     == MAX_ARRAY_SIZE)) */
            } /* while ((file_list_count < MAX_ARRAY_SIZE) &&
                 (new_file_read < new_buffer_read)) */

            /*  fakes filling the readlist arrays of lengths found above  */
            k = start_k;
            j = start_j;
            for (i=0; i<mem_list_count; i++) {
                if (i) {
                    if (i == (mem_list_count - 1)) {
                        if (flat_buf->blocklens[k] == end_brd_size)
                            brd_size = flat_buf->blocklens[(k+1)%
                                                           flat_buf->count];
                        else {
                            brd_size = flat_buf->blocklens[k] - end_brd_size;
                            k--;
                            buf_count--;
                        }
                    }
                }
                buf_count++;
                k = (k + 1)%flat_buf->count;
            } /* for (i=0; i<mem_list_count; i++) */
            for (i=0; i<file_list_count; i++) {
                if (i) {
                    if (i == (file_list_count - 1)) {
                        if (flat_file->blocklens[j] == end_frd_size)
                            frd_size = flat_file->blocklens[(j+1)%
                                                            flat_file->count];
                        else {
                            frd_size = flat_file->blocklens[j] - end_frd_size;
                            j--;
                        }
                    }
                }
                if (j < flat_file->count - 1) j++;
                else {
                    j = 0;
                    n_filetypes++;
                }
            } /* for (i=0; i<file_list_count; i++) */
            size_read += new_buffer_read;
            start_k = k;
            start_j = j;
            if (max_mem_list < mem_list_count)
                max_mem_list = mem_list_count;
            if (max_file_list < file_list_count)
                max_file_list = file_list_count;
        } /* while (size_read < bufsize) */

        /* one last check before we actually carry out the operation:
         * this code has hard-to-fix bugs when a noncontiguous file type has
         * such large pieces that the sum of the lengths of the memory type is
         * not larger than one of those pieces (and vice versa for large memory
         * types and many pieces of file types).  In these cases, give up and
         * fall back to naive reads and writes.  The testphdf5 test created a
         * type with two very large memory regions and 600 very small file
         * regions.  The same test also created a type with one very large file
         * region and many (700) very small memory regions.  Both cases caused
         * problems for this code. */

        if ( ( (file_list_count == 1) &&
               (new_file_read < flat_file->blocklens[0] ) ) ||
             ((mem_list_count == 1) &&
              (new_buffer_read < flat_buf->blocklens[0]) ) ||
             ((file_list_count == MAX_ARRAY_SIZE) &&
              (new_file_read < flat_buf->blocklens[0]) ) ||
             ( (mem_list_count == MAX_ARRAY_SIZE) &&
               (new_buffer_read < flat_file->blocklens[0])) )
        {
            /* nothing has been allocated yet, so a plain return is safe */
            ADIOI_Delete_flattened(datatype);
            ADIOI_GEN_ReadStrided_naive(fd, buf, count, datatype,
                                        file_ptr_type, initial_off, status,
                                        error_code);
            return;
        }

        mem_offsets = (void **)ADIOI_Malloc(max_mem_list*sizeof(void *));
        mem_lengths = (size_t*)ADIOI_Malloc(max_mem_list*sizeof(size_t));
        file_offsets = (uint64_t *)ADIOI_Malloc(max_file_list*sizeof(uint64_t));
        file_lengths = (uint64_t *)ADIOI_Malloc(max_file_list*sizeof(uint64_t));

        /* PASS 2: same walk as pass 1, restarted from the beginning, but
         * this time the lists are filled in and the reads are issued */
        size_read = 0;
        n_filetypes = st_n_filetypes;
        frd_size = st_frd_size;
        brd_size = flat_buf->blocklens[0];
        buf_count = 0;
        start_k = k = 0;
        start_j = st_index;

        /*  this section calculates mem_list_count and file_list_count
            and also finds the possibly odd sized last array elements
            in end_frd_size and end_brd_size  */

        while (size_read < bufsize) {
            k = start_k;
            new_buffer_read = 0;
            mem_list_count = 0;
            while ((mem_list_count < MAX_ARRAY_SIZE) &&
                   (new_buffer_read < bufsize-size_read)) {
                /* find mem_list_count and file_list_count such that both are
                   less than MAX_ARRAY_SIZE, the sum of their lengths are
                   equal, and the sum of all the data read and data to be
                   read in the next immediate read list is less than
                   bufsize */
                if (mem_list_count) {
                    if ((new_buffer_read + flat_buf->blocklens[k] +
                         size_read) > bufsize) {
                        end_brd_size = new_buffer_read +
                            flat_buf->blocklens[k] - (bufsize - size_read);
                        new_buffer_read = bufsize - size_read;
                    }
                    else {
                        new_buffer_read += flat_buf->blocklens[k];
                        end_brd_size = flat_buf->blocklens[k];
                    }
                }
                else {
                    if (brd_size > (bufsize - size_read)) {
                        new_buffer_read = bufsize - size_read;
                        brd_size = new_buffer_read;
                    }
                    else new_buffer_read = brd_size;
                }
                mem_list_count++;
                k = (k + 1)%flat_buf->count;
            } /* while ((mem_list_count < MAX_ARRAY_SIZE) &&
                 (new_buffer_read < bufsize-size_read)) */
            j = start_j;
            new_file_read = 0;
            file_list_count = 0;
            while ((file_list_count < MAX_ARRAY_SIZE) &&
                   (new_file_read < new_buffer_read)) {
                if (file_list_count) {
                    if ((new_file_read + flat_file->blocklens[j]) >
                        new_buffer_read) {
                        end_frd_size = new_buffer_read - new_file_read;
                        new_file_read = new_buffer_read;
                        j--;
                    }
                    else {
                        new_file_read += flat_file->blocklens[j];
                        end_frd_size = flat_file->blocklens[j];
                    }
                }
                else {
                    if (frd_size > new_buffer_read) {
                        new_file_read = new_buffer_read;
                        frd_size = new_file_read;
                    }
                    else new_file_read = frd_size;
                }
                file_list_count++;
                if (j < (flat_file->count - 1)) j++;
                else j = 0;

                k = start_k;
                if ((new_file_read < new_buffer_read) &&
                    (file_list_count == MAX_ARRAY_SIZE)) {
                    new_buffer_read = 0;
                    mem_list_count = 0;
                    while (new_buffer_read < new_file_read) {
                        if (mem_list_count) {
                            if ((new_buffer_read + flat_buf->blocklens[k]) >
                                new_file_read) {
                                end_brd_size = new_file_read - new_buffer_read;
                                new_buffer_read = new_file_read;
                                k--;
                            }
                            else {
                                new_buffer_read += flat_buf->blocklens[k];
                                end_brd_size = flat_buf->blocklens[k];
                            }
                        }
                        else {
                            new_buffer_read = brd_size;
                            if (brd_size > (bufsize - size_read)) {
                                new_buffer_read = bufsize - size_read;
                                brd_size = new_buffer_read;
                            }
                        }
                        mem_list_count++;
                        k = (k + 1)%flat_buf->count;
                    } /* while (new_buffer_read < new_file_read) */
                } /* if ((new_file_read < new_buffer_read) && (file_list_count
                     == MAX_ARRAY_SIZE)) */
            } /* while ((file_list_count < MAX_ARRAY_SIZE) &&
                 (new_file_read < new_buffer_read)) */

            /*  fills the allocated readlist arrays  */
            k = start_k;
            j = start_j;
            for (i=0; i<mem_list_count; i++) {
                /* address of block k in the current datatype instance */
                char *mem_addr = (char *)buf +
                    buftype_extent* (buf_count/flat_buf->count) +
                    flat_buf->indices[k];
                if (!i) {
                    /* first entry may resume in the middle of a block */
                    mem_lengths[0] = brd_size;
                    mem_addr += flat_buf->blocklens[k] - brd_size;
                }
                else {
                    if (i == (mem_list_count - 1)) {
                        mem_lengths[i] = end_brd_size;
                        if (flat_buf->blocklens[k] == end_brd_size)
                            brd_size = flat_buf->blocklens[(k+1)%
                                                           flat_buf->count];
                        else {
                            /* block only partly consumed: revisit it in
                             * the next read list */
                            brd_size = flat_buf->blocklens[k] - end_brd_size;
                            k--;
                            buf_count--;
                        }
                    }
                    else {
                        mem_lengths[i] = flat_buf->blocklens[k];
                    }
                }
                mem_offsets[i] = mem_addr;
                buf_count++;
                k = (k + 1)%flat_buf->count;
            } /* for (i=0; i<mem_list_count; i++) */
            for (i=0; i<file_list_count; i++) {
                file_offsets[i] = disp + flat_file->indices[j] +
                    ((ADIO_Offset)n_filetypes) * filetype_extent;
                if (!i) {
                    file_lengths[0] = frd_size;
                    file_offsets[0] += flat_file->blocklens[j] - frd_size;
                }
                else {
                    if (i == (file_list_count - 1)) {
                        file_lengths[i] = end_frd_size;
                        if (flat_file->blocklens[j] == end_frd_size)
                            frd_size = flat_file->blocklens[(j+1)%
                                                            flat_file->count];
                        else {
                            frd_size = flat_file->blocklens[j] - end_frd_size;
                            j--;
                        }
                    }
                    else file_lengths[i] = flat_file->blocklens[j];
                }
                if (j < flat_file->count - 1) j++;
                else {
                    j = 0;
                    n_filetypes++;
                }
            } /* for (i=0; i<file_list_count; i++) */

#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_a, 0, NULL );
#endif
            NO_STALE(err_flag, fd, zoidfs_obj_ptr,
                     zoidfs_read(zoidfs_obj_ptr,
                                 mem_list_count, mem_offsets, mem_lengths,
                                 file_list_count,
                                 file_offsets, file_lengths,
                                 ZOIDFS_NO_OP_HINT));
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event( ADIOI_MPE_read_b, 0, NULL );
#endif
            /* --BEGIN ERROR HANDLING-- */
            if (err_flag != ZFS_OK) {
                *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                                   MPIR_ERR_RECOVERABLE,
                                                   myname, __LINE__,
                                                   ADIOI_ZOIDFS_error_convert(err_flag),
                                                   "Error in zoidfs_read", 0);
                /* stop here: carrying on would let a later successful read
                 * hide this failure from the caller */
                goto cleanup;
            }
            /* --END ERROR HANDLING-- */
            size_read += new_buffer_read;
            total_bytes_read += new_buffer_read; /* XXX: is this right? */
            start_k = k;
            start_j = j;
        } /* while (size_read < bufsize) */
    }
    /* Other ADIO routines will convert absolute bytes into counts of datatypes */
    /* when incrementing fp_ind, need to also take into account the file type:
     * consider an N-element 1-d subarray with a lb and ub: ( |---xxxxx-----|
     * if we wrote N elements, offset needs to point at beginning of type, not
     * at empty region at offset N+1)
     *
     * As we discussed on mpich-discuss in may/june 2009, the code below might
     * look weird, but by putting fp_ind at the last byte accessed, the next
     * time we run through the strided code we'll update the fp_ind to the
     * right location. */
    if (file_ptr_type == ADIO_INDIVIDUAL) {
        fd->fp_ind = file_offsets[file_list_count-1]+
            file_lengths[file_list_count-1];
    }

    /* every failure jumps straight to cleanup, so reaching this point
     * means all reads succeeded */
    *error_code = MPI_SUCCESS;

cleanup:
    /* Shared exit for success and failure.  Only arrays that were actually
     * allocated are released; the NULL checks are kept because ADIOI_Free's
     * treatment of a NULL pointer is not visible from this file. */
    if (mem_offsets) ADIOI_Free(mem_offsets);
    if (mem_lengths) ADIOI_Free(mem_lengths);
    if (file_offsets) ADIOI_Free(file_offsets);
    if (file_lengths) ADIOI_Free(file_lengths);

    fd->fp_sys_posn = -1;   /* set it to null. */

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
    /* This is a temporary way of filling in status. The right way is to
       keep track of how much data was actually read and placed in buf
       by ADIOI_BUFFERED_READ. */
#endif

    if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
}
826 
827