1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *   Copyright (C) 1997 University of Chicago.
4  *   See COPYRIGHT notice in top-level directory.
5  */
6 
7 #include "adio.h"
8 #include "adio_extern.h"
9 /* #ifdef MPISGI
10 #include "mpisgi2.h"
11 #endif */
12 
13 #ifdef USE_DBG_LOGGING
14   #define FLATTEN_DEBUG 1
15 #endif
16 
17 struct adio_short_int {
18   short elem_s;
19   int elem_i;
20 };
21 
22 struct adio_double_int {
23   double elem_d;
24   int elem_i;
25 };
26 
27 struct adio_long_int {
28   long elem_l;
29   int elem_i;
30 };
31 
32 struct adio_long_double_int {
33   long double elem_ld;
34   int elem_i;
35 };
36 
ADIOI_Type_get_envelope(MPI_Datatype datatype,int * num_integers,int * num_addresses,int * num_datatypes,int * combiner)37 int ADIOI_Type_get_envelope (MPI_Datatype datatype, int *num_integers,
38                              int *num_addresses, int *num_datatypes, int *combiner)
39 {
40   int rc, is_contig;
41 
42   ADIOI_Datatype_iscontig(datatype, &is_contig);
43 
44   rc = MPI_Type_get_envelope (datatype, num_integers, num_addresses, num_datatypes, combiner);
45   if (MPI_SUCCESS != rc || MPI_COMBINER_NAMED != *combiner || is_contig) {
46     return rc;
47   }
48 
49   if (MPI_SHORT_INT == datatype || MPI_DOUBLE_INT == datatype || MPI_LONG_DOUBLE_INT == datatype ||
50       MPI_LONG_INT == datatype) {
51       *num_integers = 2;
52       *num_addresses = 2;
53       *num_datatypes = 2;
54       *combiner = MPI_COMBINER_STRUCT;
55   }
56 
57   return rc;
58 }
59 
ADIOI_Type_get_contents(MPI_Datatype datatype,int max_integers,int max_addresses,int max_datatypes,int array_of_integers[],MPI_Aint array_of_addresses[],MPI_Datatype array_of_datatypes[])60 int ADIOI_Type_get_contents (MPI_Datatype datatype, int max_integers,
61                              int max_addresses, int max_datatypes, int array_of_integers[],
62                              MPI_Aint array_of_addresses[], MPI_Datatype array_of_datatypes[])
63 {
64   int dontcare, combiner;
65   int rc;
66 
67   rc = MPI_Type_get_envelope (datatype, &dontcare, &dontcare, &dontcare, &combiner);
68   if (MPI_SUCCESS != rc) {
69     return rc;
70   }
71 
72   if (MPI_COMBINER_NAMED != combiner) {
73     return MPI_Type_get_contents (datatype, max_integers, max_addresses, max_datatypes,
74                                   array_of_integers, array_of_addresses, array_of_datatypes);
75   }
76 
77   array_of_integers[0] = 1;
78   array_of_integers[1] = 1;
79   array_of_addresses[0] = 0;
80   array_of_datatypes[1] = MPI_INT;
81 
82   if (MPI_SHORT_INT == datatype) {
83       array_of_datatypes[0] = MPI_SHORT;
84       array_of_addresses[1] = offsetof (struct adio_short_int, elem_i);
85   } else if (MPI_DOUBLE_INT == datatype) {
86       array_of_datatypes[0] = MPI_DOUBLE;
87       array_of_addresses[1] = offsetof (struct adio_double_int, elem_i);
88   } else if (MPI_LONG_DOUBLE_INT == datatype) {
89       array_of_datatypes[0] = MPI_LONG_DOUBLE;
90       array_of_addresses[1] = offsetof (struct adio_long_double_int, elem_i);
91   } else if (MPI_LONG_INT == datatype) {
92       array_of_datatypes[0] = MPI_LONG;
93       array_of_addresses[1] = offsetof (struct adio_long_int, elem_i);
94   } else {
95     rc = MPI_ERR_TYPE;
96   }
97 
98   return rc;
99 }
100 
101 void ADIOI_Optimize_flattened(ADIOI_Flatlist_node *flat_type);
102 /* flatten datatype and add it to Flatlist */
ADIOI_Flatten_datatype(MPI_Datatype datatype)103 void ADIOI_Flatten_datatype(MPI_Datatype datatype)
104 {
105 #ifdef HAVE_MPIR_TYPE_FLATTEN
106     MPI_Aint flatten_idx;
107 #endif
108     MPI_Count curr_index=0;
109     int is_contig;
110     ADIOI_Flatlist_node *flat, *prev=0;
111 
112     /* check if necessary to flatten. */
113 
114     /* is it entirely contiguous? */
115     ADIOI_Datatype_iscontig(datatype, &is_contig);
116   #ifdef FLATTEN_DEBUG
117   DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: is_contig %#X\n",is_contig);
118   #endif
119     if (is_contig) return;
120 
121     /* has it already been flattened? */
122     flat = ADIOI_Flatlist;
123     while (flat) {
124 	if (flat->type == datatype) {
125       #ifdef FLATTEN_DEBUG
126       DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: found datatype %#X\n", datatype);
127       #endif
128 		return;
129 	}
130 	else {
131 	    prev = flat;
132 	    flat = flat->next;
133 	}
134     }
135 
136     /* flatten and add to the list */
137     flat = prev;
138     flat->next = (ADIOI_Flatlist_node *)ADIOI_Malloc(sizeof(ADIOI_Flatlist_node));
139     flat = flat->next;
140 
141     flat->type = datatype;
142     flat->next = NULL;
143     flat->blocklens = NULL;
144     flat->indices = NULL;
145 
146     flat->count = ADIOI_Count_contiguous_blocks(datatype, &curr_index);
147 #ifdef FLATTEN_DEBUG
148     DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: count %llX, cur_idx = %#llX\n",flat->count,curr_index);
149 #endif
150 /*    DBG_FPRINTF(stderr, "%d\n", flat->count);*/
151 
152     if (flat->count) {
153 	flat->blocklens = (ADIO_Offset *) ADIOI_Malloc(flat->count * sizeof(ADIO_Offset));
154 	flat->indices = (ADIO_Offset *) ADIOI_Malloc(flat->count * sizeof(ADIO_Offset));
155     }
156 
157     curr_index = 0;
158 #ifdef HAVE_MPIR_TYPE_FLATTEN
159     flatten_idx = (MPI_Aint) flat->count;
160     MPIR_Type_flatten(datatype, flat->indices, flat->blocklens, &flatten_idx);
161   #ifdef FLATTEN_DEBUG
162   DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: MPIR_Type_flatten\n");
163   #endif
164 #else
165     ADIOI_Flatten(datatype, flat, 0, &curr_index);
166   #ifdef FLATTEN_DEBUG
167   DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: ADIOI_Flatten\n");
168   #endif
169 
170     ADIOI_Optimize_flattened(flat);
171 #endif
172 /* debug */
173 #ifdef FLATTEN_DEBUG
174     {
175 	int i;
176 	for (i=0; i<flat->count; i++)
177       DBG_FPRINTF(stderr,"ADIOI_Flatten_datatype:: i %#X, blocklens %#llX, indices %#llX\n",
178               i,
179               flat->blocklens[i],
180               flat->indices[i]
181              );
182   }
183 #endif
184 
185 }
186 
187 /* ADIOI_Flatten()
188  *
189  * Assumption: input datatype is not a basic!!!!
190  */
ADIOI_Flatten(MPI_Datatype datatype,ADIOI_Flatlist_node * flat,ADIO_Offset st_offset,MPI_Count * curr_index)191 void ADIOI_Flatten(MPI_Datatype datatype, ADIOI_Flatlist_node *flat,
192 		  ADIO_Offset st_offset, MPI_Count *curr_index)
193 {
194     int i, k, m, n, basic_num, nonzeroth, is_hindexed_block=0;
195     int combiner, old_combiner, old_is_contig;
196     int nints, nadds, ntypes, old_nints, old_nadds, old_ntypes;
197     /* By using ADIO_Offset we preserve +/- sign and
198          avoid >2G integer arithmetic problems */
199     ADIO_Offset top_count;
200     MPI_Count j, old_size, prev_index, num;
201     MPI_Aint old_extent;/* Assume extents are non-negative */
202     int *ints;
203     MPI_Aint *adds; /* Make no assumptions about +/- sign on these */
204     MPI_Datatype *types;
205     ADIOI_Type_get_envelope(datatype, &nints, &nadds, &ntypes, &combiner);
206     if (combiner == MPI_COMBINER_NAMED) {
207         return;  /* can't do anything else: calling get_contents on a builtin
208                     type is an error */
209     }
210     ints = (int *) ADIOI_Malloc((nints+1)*sizeof(int));
211     adds = (MPI_Aint *) ADIOI_Malloc((nadds+1)*sizeof(MPI_Aint));
212     types = (MPI_Datatype *) ADIOI_Malloc((ntypes+1)*sizeof(MPI_Datatype));
213     ADIOI_Type_get_contents(datatype, nints, nadds, ntypes, ints, adds, types);
214 
215   #ifdef FLATTEN_DEBUG
216   DBG_FPRINTF(stderr,"ADIOI_Flatten:: st_offset %#llX, curr_index %#llX\n",st_offset,*curr_index);
217   DBG_FPRINTF(stderr,"ADIOI_Flatten:: nints %#X, nadds %#X, ntypes %#X\n",nints, nadds, ntypes);
218   for(i=0; i< nints; ++i)
219   {
220     DBG_FPRINTF(stderr,"ADIOI_Flatten:: ints[%d]=%#X\n",i,ints[i]);
221   }
222   for(i=0; i< nadds; ++i)
223   {
224     DBG_FPRINTF(stderr,"ADIOI_Flatten:: adds[%d]="MPI_AINT_FMT_HEX_SPEC"\n",i,adds[i]);
225   }
226   for(i=0; i< ntypes; ++i)
227   {
228     DBG_FPRINTF(stderr,"ADIOI_Flatten:: types[%d]=%#llX\n",i,(unsigned long long)(unsigned long)types[i]);
229   }
230   #endif
231   /* Chapter 4, page 83: when processing datatypes, note this item from the
232    * standard:
233 	 Most datatype constructors have replication count or block length
234 	 arguments.  Allowed values are non-negative integers. If the value is
235 	 zero, no elements are generated in the type map and there is no effect
236 	 on datatype bounds or extent.  */
237 
238     switch (combiner) {
239 #ifdef MPIIMPL_HAVE_MPI_COMBINER_DUP
240     case MPI_COMBINER_DUP:
241     #ifdef FLATTEN_DEBUG
242     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_DUP\n");
243     #endif
244         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
245 			      &old_ntypes, &old_combiner);
246         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
247 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
248             ADIOI_Flatten(types[0], flat, st_offset, curr_index);
249         break;
250 #endif
251 #ifdef MPIIMPL_HAVE_MPI_COMBINER_SUBARRAY
252     case MPI_COMBINER_SUBARRAY:
253         {
254 	    int dims = ints[0];
255 	    MPI_Datatype stype;
256       #ifdef FLATTEN_DEBUG
257       DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_SUBARRAY\n");
258       #endif
259 
260 	    ADIO_Type_create_subarray(dims,
261 				      &ints[1],        /* sizes */
262 				      &ints[dims+1],   /* subsizes */
263 				      &ints[2*dims+1], /* starts */
264 				      ints[3*dims+1],  /* order */
265 				      types[0],        /* type */
266 				      &stype);
267 	    ADIOI_Flatten(stype, flat, st_offset, curr_index);
268 	    MPI_Type_free(&stype);
269 	}
270 	break;
271 #endif
272 #ifdef MPIIMPL_HAVE_MPI_COMBINER_DARRAY
273     case MPI_COMBINER_DARRAY:
274 	{
275 	    int dims = ints[2];
276 	    MPI_Datatype dtype;
277       #ifdef FLATTEN_DEBUG
278       DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_DARRAY\n");
279       #endif
280 
281 	    ADIO_Type_create_darray(ints[0],         /* size */
282 				    ints[1],         /* rank */
283 				    dims,
284 				    &ints[3],        /* gsizes */
285 				    &ints[dims+3],   /* distribs */
286 				    &ints[2*dims+3], /* dargs */
287 				    &ints[3*dims+3], /* psizes */
288 				    ints[4*dims+3],  /* order */
289 				    types[0],
290 				    &dtype);
291       #ifdef FLATTEN_DEBUG
292       DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_DARRAY <ADIOI_Flatten(dtype, flat->indices[%#X] %#llX, flat->blocklens[%#X] %#llX, st_offset %#llX, curr_index %#llX);\n",
293               0, flat->indices[0], 0, flat->blocklens[0], st_offset, *curr_index);
294       #endif
295 	    ADIOI_Flatten(dtype, flat, st_offset, curr_index);
296       #ifdef FLATTEN_DEBUG
297       DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_DARRAY >ADIOI_Flatten(dtype, flat->indices[%#X] %#llX, flat->blocklens[%#X] %#llX, st_offset %#llX, curr_index %#llX);\n",
298               0, flat->indices[0], 0, flat->blocklens[0], st_offset, *curr_index);
299       #endif
300 	    MPI_Type_free(&dtype);
301 	}
302 	break;
303 #endif
304     case MPI_COMBINER_CONTIGUOUS:
305     #ifdef FLATTEN_DEBUG
306     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_CONTIGUOUS\n");
307     #endif
308 	top_count = ints[0];
309         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
310 			      &old_ntypes, &old_combiner);
311         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
312 
313 	prev_index = *curr_index;
314 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
315 	    ADIOI_Flatten(types[0], flat, st_offset, curr_index);
316 
317 	if (prev_index == *curr_index) {
318 /* simplest case, made up of basic or contiguous types */
319 	    j = *curr_index;
320 	    flat->indices[j] = st_offset;
321 	    MPI_Type_size_x(types[0], &old_size);
322 	    flat->blocklens[j] = top_count * old_size;
323       #ifdef FLATTEN_DEBUG
324       DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",j, flat->indices[j], j, flat->blocklens[j]);
325       #endif
326 	    (*curr_index)++;
327 	}
328 	else {
329 /* made up of noncontiguous derived types */
330 	    j = *curr_index;
331 	    num = *curr_index - prev_index;
332 
333 /* The noncontiguous types have to be replicated count times */
334 	    MPI_Type_extent(types[0], &old_extent);
335 	    for (m=1; m<top_count; m++) {
336 		for (i=0; i<num; i++) {
337 		    flat->indices[j] = flat->indices[j-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
338 		    flat->blocklens[j] = flat->blocklens[j-num];
339           #ifdef FLATTEN_DEBUG
340           DBG_FPRINTF(stderr,"ADIOI_Flatten:: derived flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",j, flat->indices[j], j, flat->blocklens[j]);
341           #endif
342 		    j++;
343 		}
344 	    }
345 	    *curr_index = j;
346 	}
347 	break;
348 
349     case MPI_COMBINER_VECTOR:
350     #ifdef FLATTEN_DEBUG
351     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_VECTOR\n");
352     #endif
353 	top_count = ints[0];
354         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
355 			      &old_ntypes, &old_combiner);
356         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
357 
358 	prev_index = *curr_index;
359 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
360 	    ADIOI_Flatten(types[0], flat, st_offset, curr_index);
361 
362 	if (prev_index == *curr_index) {
363 /* simplest case, vector of basic or contiguous types */
364     /* By using ADIO_Offset we preserve +/- sign and
365          avoid >2G integer arithmetic problems */
366     ADIO_Offset blocklength = ints[1], stride = ints[2];
367 	    j = *curr_index;
368 	    flat->indices[j] = st_offset;
369 	    MPI_Type_size_x(types[0], &old_size);
370 	    flat->blocklens[j] = blocklength * old_size;
371 	    for (i=j+1; i<j+top_count; i++) {
372 		flat->indices[i] = flat->indices[i-1] + stride * old_size;
373 		flat->blocklens[i] = flat->blocklens[j];
374 	    }
375 	    *curr_index = i;
376 	}
377 	else {
378 /* vector of noncontiguous derived types */
379     /* By using ADIO_Offset we preserve +/- sign and
380          avoid >2G integer arithmetic problems */
381     ADIO_Offset blocklength = ints[1], stride = ints[2];
382 
383 	    j = *curr_index;
384 	    num = *curr_index - prev_index;
385 
386 /* The noncontiguous types have to be replicated blocklen times
387    and then strided. Replicate the first one. */
388 	    MPI_Type_extent(types[0], &old_extent);
389 	    for (m=1; m<blocklength; m++) {
390 		for (i=0; i<num; i++) {
391 		    flat->indices[j] = flat->indices[j-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
392 		    flat->blocklens[j] = flat->blocklens[j-num];
393 		    j++;
394 		}
395 	    }
396 	    *curr_index = j;
397 
398 /* Now repeat with strides. */
399 	    num = *curr_index - prev_index;
400 	    for (i=1; i<top_count; i++) {
401  		for (m=0; m<num; m++) {
402 		   flat->indices[j] =  flat->indices[j-num] + stride * ADIOI_AINT_CAST_TO_OFFSET old_extent;
403 		   flat->blocklens[j] = flat->blocklens[j-num];
404 		   j++;
405 		}
406 	    }
407 	    *curr_index = j;
408 	}
409 	break;
410 
411     case MPI_COMBINER_HVECTOR:
412     case MPI_COMBINER_HVECTOR_INTEGER:
413     #ifdef FLATTEN_DEBUG
414     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_HVECTOR_INTEGER\n");
415     #endif
416 	top_count = ints[0];
417         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
418 			      &old_ntypes, &old_combiner);
419         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
420 
421 	prev_index = *curr_index;
422 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
423 	    ADIOI_Flatten(types[0], flat, st_offset, curr_index);
424 
425 	if (prev_index == *curr_index) {
426 /* simplest case, vector of basic or contiguous types */
427     /* By using ADIO_Offset we preserve +/- sign and
428          avoid >2G integer arithmetic problems */
429     ADIO_Offset blocklength = ints[1];
430 	    j = *curr_index;
431 	    flat->indices[j] = st_offset;
432 	    MPI_Type_size_x(types[0], &old_size);
433 	    flat->blocklens[j] = blocklength * old_size;
434 	    for (i=j+1; i<j+top_count; i++) {
435 		flat->indices[i] = flat->indices[i-1] + adds[0];
436 		flat->blocklens[i] = flat->blocklens[j];
437 	    }
438 	    *curr_index = i;
439 	}
440 	else {
441 /* vector of noncontiguous derived types */
442     /* By using ADIO_Offset we preserve +/- sign and
443          avoid >2G integer arithmetic problems */
444     ADIO_Offset blocklength = ints[1];
445 
446 	    j = *curr_index;
447 	    num = *curr_index - prev_index;
448 
449 /* The noncontiguous types have to be replicated blocklen times
450    and then strided. Replicate the first one. */
451 	    MPI_Type_extent(types[0], &old_extent);
452 	    for (m=1; m<blocklength; m++) {
453 		for (i=0; i<num; i++) {
454 		    flat->indices[j] = flat->indices[j-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
455 		    flat->blocklens[j] = flat->blocklens[j-num];
456 		    j++;
457 		}
458 	    }
459 	    *curr_index = j;
460 
461 /* Now repeat with strides. */
462 	    num = *curr_index - prev_index;
463 	    for (i=1; i<top_count; i++) {
464  		for (m=0; m<num; m++) {
465 		   flat->indices[j] =  flat->indices[j-num] + adds[0];
466 		   flat->blocklens[j] = flat->blocklens[j-num];
467 		   j++;
468 		}
469 	    }
470 	    *curr_index = j;
471 	}
472 	break;
473 
474     case MPI_COMBINER_INDEXED:
475     #ifdef FLATTEN_DEBUG
476     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_INDEXED\n");
477     #endif
478 	top_count = ints[0];
479         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
480 			      &old_ntypes, &old_combiner);
481         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
482 	MPI_Type_extent(types[0], &old_extent);
483 
484 	prev_index = *curr_index;
485 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
486   {
487     /* By using ADIO_Offset we preserve +/- sign and
488          avoid >2G integer arithmetic problems */
489     ADIO_Offset stride = ints[top_count+1];
490         ADIOI_Flatten(types[0], flat,
491          st_offset+stride* ADIOI_AINT_CAST_TO_OFFSET old_extent, curr_index);
492   }
493 
494 	if (prev_index == *curr_index) {
495 /* simplest case, indexed type made up of basic or contiguous types */
496 	    j = *curr_index;
497 	    for (i=j, nonzeroth=i; i<j+top_count; i++) {
498     /* By using ADIO_Offset we preserve +/- sign and
499          avoid >2G integer arithmetic problems */
500     ADIO_Offset blocklength = ints[1+i-j], stride = ints[top_count+1+i-j];
501 		if (blocklength > 0) {
502 		    flat->indices[nonzeroth] =
503 			st_offset + stride* ADIOI_AINT_CAST_TO_OFFSET old_extent;
504 		    flat->blocklens[nonzeroth] =
505 			blocklength* ADIOI_AINT_CAST_TO_OFFSET old_extent;
506 		    nonzeroth++;
507 		} else {
508 		    flat->count--; /* don't count/consider any zero-length blocklens */
509 		}
510 	    }
511 	    *curr_index = i;
512 	}
513 	else {
514 /* indexed type made up of noncontiguous derived types */
515 
516 	    j = *curr_index;
517 	    num = *curr_index - prev_index;
518 	    basic_num = num;
519 
520 /* The noncontiguous types have to be replicated blocklens[i] times
521    and then strided. Replicate the first one. */
522 	    for (m=1; m<ints[1]; m++) {
523 		for (i=0, nonzeroth = j; i<num; i++) {
524 		    if (flat->blocklens[j-num] > 0) {
525 			flat->indices[nonzeroth] =
526 			    flat->indices[nonzeroth-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
527 			flat->blocklens[nonzeroth] =
528 			    flat->blocklens[nonzeroth-num];
529 			j++;
530 			nonzeroth++;
531 		    } else {
532 			flat->count --;
533 		    }
534 		}
535 	    }
536 	    *curr_index = j;
537 
538 /* Now repeat with strides. */
539 	    for (i=1; i<top_count; i++) {
540 		num = *curr_index - prev_index;
541 		prev_index = *curr_index;
542 		for (m=0, nonzeroth=j; m<basic_num; m++) {
543       /* By using ADIO_Offset we preserve +/- sign and
544          avoid >2G integer arithmetic problems */
545       ADIO_Offset stride = ints[top_count+1+i]-ints[top_count+i];
546 		    if (flat->blocklens[j-num] > 0 ) {
547 			flat->indices[nonzeroth] =
548 			    flat->indices[j-num] + stride* ADIOI_AINT_CAST_TO_OFFSET old_extent;
549 			flat->blocklens[nonzeroth] = flat->blocklens[j-num];
550 			j++;
551 			nonzeroth++;
552 		    } else {
553 			flat->count--;
554 		    }
555 		}
556 		*curr_index = j;
557 		for (m=1; m<ints[1+i]; m++) {
558                     for (k=0, nonzeroth=j; k<basic_num; k++) {
559 			if (flat->blocklens[j-basic_num] > 0) {
560 			    flat->indices[nonzeroth] =
561 				flat->indices[j-basic_num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
562 			    flat->blocklens[nonzeroth] = flat->blocklens[j-basic_num];
563 			    j++;
564 			    nonzeroth++;
565 			} else {
566 			    flat->count --;
567 			}
568                     }
569                 }
570 		*curr_index = j;
571 	    }
572 	}
573 	break;
574 
575 #if defined HAVE_DECL_MPI_COMBINER_HINDEXED_BLOCK && HAVE_DECL_MPI_COMBINER_HINDEXED_BLOCK
576     case MPI_COMBINER_HINDEXED_BLOCK:
577 	is_hindexed_block=1;
578 	/* deliberate fall-through */
579 #endif
580     case MPI_COMBINER_INDEXED_BLOCK:
581     #ifdef FLATTEN_DEBUG
582     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_INDEXED_BLOCK\n");
583     #endif
584 	top_count = ints[0];
585         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
586 			      &old_ntypes, &old_combiner);
587         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
588 	MPI_Type_extent(types[0], &old_extent);
589 
590 	prev_index = *curr_index;
591 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
592   {
593       /* By using ADIO_Offset we preserve +/- sign and
594          avoid >2G integer arithmetic problems */
595       ADIO_Offset stride = ints[1+1];
596 	if (is_hindexed_block) {
597 	    ADIOI_Flatten(types[0], flat,
598 		    st_offset+adds[0], curr_index);
599 	} else {
600 	    ADIOI_Flatten(types[0], flat,
601 		    st_offset+stride* ADIOI_AINT_CAST_TO_OFFSET old_extent, curr_index);
602 	}
603   }
604 
605 	if (prev_index == *curr_index) {
606 /* simplest case, indexed type made up of basic or contiguous types */
607 	    j = *curr_index;
608 	    for (i=j; i<j+top_count; i++) {
609       /* By using ADIO_Offset we preserve +/- sign and
610          avoid >2G integer arithmetic problems */
611 		ADIO_Offset blocklength = ints[1];
612 		if (is_hindexed_block) {
613 		    flat->indices[i] = st_offset + adds[i-j];
614 		} else {
615 		    ADIO_Offset stride = ints[1+1+i-j];
616 		    flat->indices[i] = st_offset +
617 			stride* ADIOI_AINT_CAST_TO_OFFSET old_extent;
618 		}
619 		flat->blocklens[i] = blocklength* ADIOI_AINT_CAST_TO_OFFSET old_extent;
620 	    }
621 	    *curr_index = i;
622 	}
623 	else {
624 /* vector of noncontiguous derived types */
625 
626 	    j = *curr_index;
627 	    num = *curr_index - prev_index;
628 
629 /* The noncontiguous types have to be replicated blocklens[i] times
630    and then strided. Replicate the first one. */
631 	    for (m=1; m<ints[1]; m++) {
632 		for (i=0; i<num; i++) {
633 		    if (is_hindexed_block) {
634 			/* this is the one place the hindexed case uses the
635 			 * extent of a type */
636 			MPI_Type_extent(types[0], &old_extent);
637 		    }
638 		    flat->indices[j] = flat->indices[j-num] +
639 			ADIOI_AINT_CAST_TO_OFFSET old_extent;
640 		    flat->blocklens[j] = flat->blocklens[j-num];
641 		    j++;
642 		}
643 	    }
644 	    *curr_index = j;
645 
646 /* Now repeat with strides. */
647 	    num = *curr_index - prev_index;
648 	    for (i=1; i<top_count; i++) {
649 		for (m=0; m<num; m++) {
650 		    if (is_hindexed_block) {
651 			flat->indices[j] = flat->indices[j-num] +
652 			    adds[i] - adds[i-1];
653 		    } else {
654 			/* By using ADIO_Offset we preserve +/- sign and
655 			   avoid >2G integer arithmetic problems */
656 			ADIO_Offset stride = ints[2+i]-ints[1+i];
657 			flat->indices[j] = flat->indices[j-num] +
658 			    stride* ADIOI_AINT_CAST_TO_OFFSET old_extent;
659 		    }
660 		    flat->blocklens[j] = flat->blocklens[j-num];
661 		    j++;
662 		}
663 	    }
664 	    *curr_index = j;
665 	}
666 	break;
667 
668     case MPI_COMBINER_HINDEXED:
669     case MPI_COMBINER_HINDEXED_INTEGER:
670     #ifdef FLATTEN_DEBUG
671     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_HINDEXED_INTEGER\n");
672     #endif
673 	top_count = ints[0];
674         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
675 			      &old_ntypes, &old_combiner);
676         ADIOI_Datatype_iscontig(types[0], &old_is_contig);
677 
678 	prev_index = *curr_index;
679 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
680   {
681         ADIOI_Flatten(types[0], flat, st_offset+adds[0], curr_index);
682   }
683 
684 	if (prev_index == *curr_index) {
685 /* simplest case, indexed type made up of basic or contiguous types */
686 	    j = *curr_index;
687 	    MPI_Type_size_x(types[0], &old_size);
688 	    for (i=j, nonzeroth=j; i<j+top_count; i++) {
689 		if (ints[1+i-j] > 0) {
690 		    /* By using ADIO_Offset we preserve +/- sign and
691 		       avoid >2G integer arithmetic problems */
692 		    ADIO_Offset blocklength = ints[1+i-j];
693 		    flat->indices[nonzeroth] = st_offset + adds[i-j];
694 		    flat->blocklens[nonzeroth] = blocklength*old_size;
695 		    nonzeroth++;
696 		} else {
697 		    flat->count--;
698 		}
699 	    }
700 	    *curr_index = i;
701 	}
702 	else {
703 /* indexed type made up of noncontiguous derived types */
704 
705 	    j = *curr_index;
706 	    num = *curr_index - prev_index;
707 	    basic_num = num;
708 
709 /* The noncontiguous types have to be replicated blocklens[i] times
710    and then strided. Replicate the first one. */
711 	    MPI_Type_extent(types[0], &old_extent);
712 	    for (m=1; m<ints[1]; m++) {
713 		for (i=0, nonzeroth=j; i<num; i++) {
714 		    if (flat->blocklens[j-num] > 0) {
715 			flat->indices[nonzeroth] =
716 			    flat->indices[j-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
717 			flat->blocklens[nonzeroth] = flat->blocklens[j-num];
718 			j++;
719 			nonzeroth++;
720 		    } else {
721 			flat->count--;
722 		    }
723 		}
724 	    }
725 	    *curr_index = j;
726 
727 /* Now repeat with strides. */
728 	    for (i=1; i<top_count; i++) {
729 		num = *curr_index - prev_index;
730 		prev_index = *curr_index;
731 		for (m=0, nonzeroth=j; m<basic_num; m++) {
732 		    if (flat->blocklens[j-num] > 0) {
733 			flat->indices[nonzeroth] =
734 			    flat->indices[j-num] + adds[i] - adds[i-1];
735 			flat->blocklens[nonzeroth] = flat->blocklens[j-num];
736 			j++;
737 			nonzeroth++;
738 		    } else {
739 			flat->count--;
740 		    }
741 		}
742 		*curr_index = j;
743 		for (m=1; m<ints[1+i]; m++) {
744 		    for (k=0,nonzeroth=j; k<basic_num; k++) {
745 			if (flat->blocklens[j-basic_num] >0) {
746 			    flat->indices[nonzeroth] =
747 				flat->indices[j-basic_num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
748 			    flat->blocklens[nonzeroth] = flat->blocklens[j-basic_num];
749 			    j++;
750 			    nonzeroth++;
751 			}
752 		    }
753 		}
754 		*curr_index = j;
755 	    }
756 	}
757 	break;
758 
759     case MPI_COMBINER_STRUCT:
760     case MPI_COMBINER_STRUCT_INTEGER:
761     #ifdef FLATTEN_DEBUG
762     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_STRUCT_INTEGER\n");
763     #endif
764 	top_count = ints[0];
765 	for (n=0; n<top_count; n++) {
766 	    ADIOI_Type_get_envelope(types[n], &old_nints, &old_nadds,
767 				  &old_ntypes, &old_combiner);
768             ADIOI_Datatype_iscontig(types[n], &old_is_contig);
769 
770 	    prev_index = *curr_index;
771             if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
772 		ADIOI_Flatten(types[n], flat, st_offset+adds[n], curr_index);
773 
774 	    if (prev_index == *curr_index) {
775 /* simplest case, current type is basic or contiguous types */
776         /* By using ADIO_Offset we preserve +/- sign and
777            avoid >2G integer arithmetic problems */
778 		if (ints[1+n] > 0 || types[n] == MPI_LB || types[n] == MPI_UB) {
779 		    ADIO_Offset blocklength = ints[1+n];
780 		    j = *curr_index;
781 		    flat->indices[j] = st_offset + adds[n];
782 		    MPI_Type_size_x(types[n], &old_size);
783 		    flat->blocklens[j] = blocklength * old_size;
784 #ifdef FLATTEN_DEBUG
785 		    DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple adds[%#X] "MPI_AINT_FMT_HEX_SPEC", flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",n,adds[n],j, flat->indices[j], j, flat->blocklens[j]);
786 #endif
787 		    (*curr_index)++;
788 		}
789 	    }
790 	    else {
791 /* current type made up of noncontiguous derived types */
792 
793 		j = *curr_index;
794 		num = *curr_index - prev_index;
795 
796 /* The current type has to be replicated blocklens[n] times */
797 		MPI_Type_extent(types[n], &old_extent);
798 		for (m=1; m<ints[1+n]; m++) {
799 		    for (i=0; i<num; i++) {
800 			flat->indices[j] =
801 			    flat->indices[j-num] + ADIOI_AINT_CAST_TO_OFFSET old_extent;
802 			flat->blocklens[j] = flat->blocklens[j-num];
803 #ifdef FLATTEN_DEBUG
804 			DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple old_extent "MPI_AINT_FMT_HEX_SPEC", flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",old_extent,j, flat->indices[j], j, flat->blocklens[j]);
805 #endif
806 			j++;
807 		    }
808 		}
809 		*curr_index = j;
810 	    }
811 	}
812  	break;
813 
814     case MPI_COMBINER_RESIZED:
815     #ifdef FLATTEN_DEBUG
816     DBG_FPRINTF(stderr,"ADIOI_Flatten:: MPI_COMBINER_RESIZED\n");
817     #endif
818 
819     /* This is done similar to a type_struct with an lb, datatype, ub */
820 
821     /* handle the Lb */
822 	j = *curr_index;
823 	flat->indices[j] = st_offset + adds[0];
824 	/* this zero-length blocklens[] element, unlike eleswhere in the
825 	 * flattening code, is correct and is used to indicate a lower bound
826 	 * marker */
827 	flat->blocklens[j] = 0;
828 
829         #ifdef FLATTEN_DEBUG
830         DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple adds[%#X] "MPI_AINT_FMT_HEX_SPEC", flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",0,adds[0],j, flat->indices[j], j, flat->blocklens[j]);
831         #endif
832 
833 	(*curr_index)++;
834 
835 	/* handle the datatype */
836 
837 	ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
838 			      &old_ntypes, &old_combiner);
839 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
840 
841 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig)) {
842 	    ADIOI_Flatten(types[0], flat, st_offset+adds[0], curr_index);
843 	}
844 	else {
845             /* current type is basic or contiguous */
846 	    j = *curr_index;
847 	    flat->indices[j] = st_offset;
848 	    MPI_Type_size_x(types[0], &old_size);
849 	    flat->blocklens[j] = old_size;
850 
851             #ifdef FLATTEN_DEBUG
852 	    DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple adds[%#X] "MPI_AINT_FMT_HEX_SPEC", flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",0,adds[0],j, flat->indices[j], j, flat->blocklens[j]);
853             #endif
854 
855 	    (*curr_index)++;
856 	}
857 
858 	/* take care of the extent as a UB */
859 	j = *curr_index;
860 	flat->indices[j] = st_offset + adds[0] + adds[1];
861 	/* again, zero-element ok: an upper-bound marker explicitly set by the
862 	 * constructor of this resized type */
863 	flat->blocklens[j] = 0;
864 
865         #ifdef FLATTEN_DEBUG
866         DBG_FPRINTF(stderr,"ADIOI_Flatten:: simple adds[%#X] "MPI_AINT_FMT_HEX_SPEC", flat->indices[%#llX] %#llX, flat->blocklens[%#llX] %#llX\n",1,adds[1],j, flat->indices[j], j, flat->blocklens[j]);
867         #endif
868 
869 	(*curr_index)++;
870 
871  	break;
872 
873     default:
874 	/* TODO: FIXME (requires changing prototypes to return errors...) */
875 	DBG_FPRINTF(stderr, "Error: Unsupported datatype passed to ADIOI_Flatten\n");
876 	MPI_Abort(MPI_COMM_WORLD, 1);
877     }
878 
879 #ifndef MPISGI
880 /* There is a bug in SGI's impl. of MPI_Type_get_contents. It doesn't
881    return new datatypes. Therefore no need to free. */
882     for (i=0; i<ntypes; i++) {
883  	MPI_Type_get_envelope(types[i], &old_nints, &old_nadds, &old_ntypes,
884  			      &old_combiner);
885  	if (old_combiner != MPI_COMBINER_NAMED) MPI_Type_free(types+i);
886     }
887 #endif
888 
889     ADIOI_Free(ints);
890     ADIOI_Free(adds);
891     ADIOI_Free(types);
892 
893   #ifdef FLATTEN_DEBUG
894   DBG_FPRINTF(stderr,"ADIOI_Flatten:: return st_offset %#llX, curr_index %#llX\n",st_offset,*curr_index);
895   #endif
896 
897 }
898 
899 /********************************************************/
900 
901 /* ADIOI_Count_contiguous_blocks
902  *
903  * Returns number of contiguous blocks in type, and also updates
904  * curr_index to reflect the space for the additional blocks.
905  *
906  * ASSUMES THAT TYPE IS NOT A BASIC!!!
907  */
ADIOI_Count_contiguous_blocks(MPI_Datatype datatype,MPI_Count * curr_index)908 MPI_Count ADIOI_Count_contiguous_blocks(MPI_Datatype datatype, MPI_Count *curr_index)
909 {
910     int i, n;
911     MPI_Count count=0, prev_index, num, basic_num;
912     int top_count, combiner, old_combiner, old_is_contig;
913     int nints, nadds, ntypes, old_nints, old_nadds, old_ntypes;
914     int *ints;
915     MPI_Aint *adds; /* Make no assumptions about +/- sign on these */
916     MPI_Datatype *types;
917 
918     ADIOI_Type_get_envelope(datatype, &nints, &nadds, &ntypes, &combiner);
919     if (combiner == MPI_COMBINER_NAMED) {
920         return 1;  /* builtin types not supposed to be passed to this routine
921                     */
922     }
923     ints = (int *) ADIOI_Malloc((nints+1)*sizeof(int));
924     adds = (MPI_Aint *) ADIOI_Malloc((nadds+1)*sizeof(MPI_Aint));
925     types = (MPI_Datatype *) ADIOI_Malloc((ntypes+1)*sizeof(MPI_Datatype));
926     ADIOI_Type_get_contents(datatype, nints, nadds, ntypes, ints, adds, types);
927 
928     switch (combiner) {
929 #ifdef MPIIMPL_HAVE_MPI_COMBINER_DUP
930     case MPI_COMBINER_DUP:
931         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
932                               &old_ntypes, &old_combiner);
933 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
934 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
935 	    count = ADIOI_Count_contiguous_blocks(types[0], curr_index);
936 	else {
937 		count = 1;
938 		(*curr_index)++;
939 	}
940         break;
941 #endif
942 #ifdef MPIIMPL_HAVE_MPI_COMBINER_SUBARRAY
943     case MPI_COMBINER_SUBARRAY:
944         {
945 	    int dims = ints[0];
946 	    MPI_Datatype stype;
947 
948 	    ADIO_Type_create_subarray(dims,
949 				      &ints[1],        /* sizes */
950 				      &ints[dims+1],   /* subsizes */
951 				      &ints[2*dims+1], /* starts */
952 				      ints[3*dims+1],  /* order */
953 				      types[0],        /* type */
954 				      &stype);
955 	    count = ADIOI_Count_contiguous_blocks(stype, curr_index);
956 	    /* curr_index will have already been updated; just pass
957 	     * count back up.
958 	     */
959 	    MPI_Type_free(&stype);
960 
961 	}
962 	break;
963 #endif
964 #ifdef MPIIMPL_HAVE_MPI_COMBINER_DARRAY
965     case MPI_COMBINER_DARRAY:
966 	{
967 	    int dims = ints[2];
968 	    MPI_Datatype dtype;
969 
970 	    ADIO_Type_create_darray(ints[0],         /* size */
971 				    ints[1],         /* rank */
972 				    dims,
973 				    &ints[3],        /* gsizes */
974 				    &ints[dims+3],   /* distribs */
975 				    &ints[2*dims+3], /* dargs */
976 				    &ints[3*dims+3], /* psizes */
977 				    ints[4*dims+3],  /* order */
978 				    types[0],
979 				    &dtype);
980 	    count = ADIOI_Count_contiguous_blocks(dtype, curr_index);
981 	    /* curr_index will have already been updated; just pass
982 	     * count back up.
983 	     */
984 	    MPI_Type_free(&dtype);
985 	}
986 	break;
987 #endif
988     case MPI_COMBINER_CONTIGUOUS:
989         top_count = ints[0];
990         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
991                               &old_ntypes, &old_combiner);
992 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
993 
994 	prev_index = *curr_index;
995 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
996 	    count = ADIOI_Count_contiguous_blocks(types[0], curr_index);
997 	else count = 1;
998 
999 	if (prev_index == *curr_index)
1000 /* simplest case, made up of basic or contiguous types */
1001 	    (*curr_index)++;
1002 	else {
1003 /* made up of noncontiguous derived types */
1004 	    num = *curr_index - prev_index;
1005 	    count *= top_count;
1006 	    *curr_index += (top_count - 1)*num;
1007 	}
1008 	break;
1009 
1010     case MPI_COMBINER_VECTOR:
1011     case MPI_COMBINER_HVECTOR:
1012     case MPI_COMBINER_HVECTOR_INTEGER:
1013         top_count = ints[0];
1014         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
1015                               &old_ntypes, &old_combiner);
1016 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
1017 
1018 	prev_index = *curr_index;
1019 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
1020 	    count = ADIOI_Count_contiguous_blocks(types[0], curr_index);
1021 	else count = 1;
1022 
1023 	if (prev_index == *curr_index) {
1024 /* simplest case, vector of basic or contiguous types */
1025 	    count = top_count;
1026 	    *curr_index += count;
1027 	}
1028 	else {
1029 /* vector of noncontiguous derived types */
1030 	    num = *curr_index - prev_index;
1031 
1032 /* The noncontiguous types have to be replicated blocklen times
1033    and then strided. */
1034 	    count *= ints[1] * top_count;
1035 
1036 /* First one */
1037 	    *curr_index += (ints[1] - 1)*num;
1038 
1039 /* Now repeat with strides. */
1040 	    num = *curr_index - prev_index;
1041 	    *curr_index += (top_count - 1)*num;
1042 	}
1043 	break;
1044 
1045     case MPI_COMBINER_INDEXED:
1046     case MPI_COMBINER_HINDEXED:
1047     case MPI_COMBINER_HINDEXED_INTEGER:
1048         top_count = ints[0];
1049         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
1050                               &old_ntypes, &old_combiner);
1051 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
1052 
1053 	prev_index = *curr_index;
1054 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
1055 	    count = ADIOI_Count_contiguous_blocks(types[0], curr_index);
1056 	else count = 1;
1057 
1058 	if (prev_index == *curr_index) {
1059 /* simplest case, indexed type made up of basic or contiguous types */
1060 	    count = top_count;
1061 	    *curr_index += count;
1062 	}
1063 	else {
1064 /* indexed type made up of noncontiguous derived types */
1065 	    basic_num = *curr_index - prev_index;
1066 
1067 /* The noncontiguous types have to be replicated blocklens[i] times
1068    and then strided. */
1069 	    *curr_index += (ints[1]-1) * basic_num;
1070 	    count *= ints[1];
1071 
1072 /* Now repeat with strides. */
1073 	    for (i=1; i<top_count; i++) {
1074 		count += ints[1+i] * basic_num;
1075 		*curr_index += ints[1+i] * basic_num;
1076 	    }
1077 	}
1078 	break;
1079 
1080 #if defined HAVE_DECL_MPI_COMBINER_HINDEXED_BLOCK && HAVE_DECL_MPI_COMBINER_HINDEXED_BLOCK
1081     case MPI_COMBINER_HINDEXED_BLOCK:
1082 #endif
1083     case MPI_COMBINER_INDEXED_BLOCK:
1084         top_count = ints[0];
1085         ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
1086                               &old_ntypes, &old_combiner);
1087 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
1088 
1089 	prev_index = *curr_index;
1090 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
1091 	    count = ADIOI_Count_contiguous_blocks(types[0], curr_index);
1092 	else count = 1;
1093 
1094 	if (prev_index == *curr_index) {
1095 /* simplest case, indexed type made up of basic or contiguous types */
1096 	    count = top_count;
1097 	    *curr_index += count;
1098 	}
1099 	else {
1100 /* indexed type made up of noncontiguous derived types */
1101 	    basic_num = *curr_index - prev_index;
1102 
1103 /* The noncontiguous types have to be replicated blocklens[i] times
1104    and then strided. */
1105 	    *curr_index += (ints[1]-1) * basic_num;
1106 	    count *= ints[1];
1107 
1108 /* Now repeat with strides. */
1109 	    *curr_index += (top_count-1) * count;
1110 	    count *= top_count;
1111 	}
1112 	break;
1113 
1114     case MPI_COMBINER_STRUCT:
1115     case MPI_COMBINER_STRUCT_INTEGER:
1116         top_count = ints[0];
1117 	count = 0;
1118 	for (n=0; n<top_count; n++) {
1119             ADIOI_Type_get_envelope(types[n], &old_nints, &old_nadds,
1120                                   &old_ntypes, &old_combiner);
1121 	    ADIOI_Datatype_iscontig(types[n], &old_is_contig);
1122 
1123 	    prev_index = *curr_index;
1124 	    if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig))
1125 	    count += ADIOI_Count_contiguous_blocks(types[n], curr_index);
1126 
1127 	    if (prev_index == *curr_index) {
1128 /* simplest case, current type is basic or contiguous types */
1129 		count++;
1130 		(*curr_index)++;
1131 	    }
1132 	    else {
1133 /* current type made up of noncontiguous derived types */
1134 /* The current type has to be replicated blocklens[n] times */
1135 
1136 		num = *curr_index - prev_index;
1137 		count += (ints[1+n]-1)*num;
1138 		(*curr_index) += (ints[1+n]-1)*num;
1139 	    }
1140 	}
1141 	break;
1142 
1143     case MPI_COMBINER_RESIZED:
1144 	/* treat it as a struct with lb, type, ub */
1145 
1146 	/* add 2 for lb and ub */
1147 	(*curr_index) += 2;
1148 	count += 2;
1149 
1150 	/* add for datatype */
1151 	ADIOI_Type_get_envelope(types[0], &old_nints, &old_nadds,
1152                                   &old_ntypes, &old_combiner);
1153 	ADIOI_Datatype_iscontig(types[0], &old_is_contig);
1154 
1155 	if ((old_combiner != MPI_COMBINER_NAMED) && (!old_is_contig)) {
1156 	    count += ADIOI_Count_contiguous_blocks(types[0], curr_index);
1157 	}
1158 	else {
1159         /* basic or contiguous type */
1160 	    count++;
1161 	    (*curr_index)++;
1162 	}
1163 	break;
1164 
1165     default:
1166 	/* TODO: FIXME */
1167 	DBG_FPRINTF(stderr, "Error: Unsupported datatype passed to ADIOI_Count_contiguous_blocks, combiner = %d\n", combiner);
1168 	MPI_Abort(MPI_COMM_WORLD, 1);
1169     }
1170 
1171 #ifndef MPISGI
1172 /* There is a bug in SGI's impl. of MPI_Type_get_contents. It doesn't
1173    return new datatypes. Therefore no need to free. */
1174     for (i=0; i<ntypes; i++) {
1175  	MPI_Type_get_envelope(types[i], &old_nints, &old_nadds, &old_ntypes,
1176  			      &old_combiner);
1177  	if (old_combiner != MPI_COMBINER_NAMED) MPI_Type_free(types+i);
1178     }
1179 #endif
1180 
1181     ADIOI_Free(ints);
1182     ADIOI_Free(adds);
1183     ADIOI_Free(types);
1184     return count;
1185 }
1186 
1187 
1188 /****************************************************************/
1189 
1190 /* ADIOI_Optimize_flattened()
1191  *
1192  * Scans the blocks of a flattened type and merges adjacent blocks
1193  * together, resulting in a shorter blocklist (and thus fewer
1194  * contiguous operations).
1195  *
1196  * NOTE: a further optimization would be to remove zero length blocks. However,
1197  * the first and last blocks must remain as zero length first or last block
1198  * indicates UB and LB.  Furthermore, once the "zero length blocklen" fix
1199  * went in, the flattened representation should no longer have zero-length
1200  * blocks except for UB and LB markers.
1201  */
ADIOI_Optimize_flattened(ADIOI_Flatlist_node * flat_type)1202 void ADIOI_Optimize_flattened(ADIOI_Flatlist_node *flat_type)
1203 {
1204     int i, j, opt_blocks;
1205     ADIO_Offset *opt_blocklens;
1206     ADIO_Offset *opt_indices;
1207 
1208     opt_blocks = 1;
1209 
1210     /* save number of noncontiguous blocks in opt_blocks */
1211     for (i=0; i < (flat_type->count - 1); i++) {
1212         if ((flat_type->indices[i] + flat_type->blocklens[i] !=
1213 	     flat_type->indices[i + 1]))
1214 	    opt_blocks++;
1215     }
1216 
1217     /* if we can't reduce the number of blocks, quit now */
1218     if (opt_blocks == flat_type->count) return;
1219 
1220     opt_blocklens = (ADIO_Offset *) ADIOI_Malloc(opt_blocks * sizeof(ADIO_Offset));
1221     opt_indices = (ADIO_Offset *)ADIOI_Malloc(opt_blocks*sizeof(ADIO_Offset));
1222 
1223     /* fill in new blocklists */
1224     opt_blocklens[0] = flat_type->blocklens[0];
1225     opt_indices[0] = flat_type->indices[0];
1226     j = 0;
1227     for (i=0; i < (flat_type->count - 1); i++) {
1228 	if ((flat_type->indices[i] + flat_type->blocklens[i] ==
1229 	     flat_type->indices[i + 1]))
1230 	    opt_blocklens[j] += flat_type->blocklens[i + 1];
1231 	else {
1232 	    j++;
1233 	    opt_indices[j] = flat_type->indices[i + 1];
1234 	    opt_blocklens[j] = flat_type->blocklens[i + 1];
1235 	}
1236     }
1237     flat_type->count = opt_blocks;
1238     ADIOI_Free(flat_type->blocklens);
1239     ADIOI_Free(flat_type->indices);
1240     flat_type->blocklens = opt_blocklens;
1241     flat_type->indices = opt_indices;
1242     return;
1243 }
1244 
ADIOI_Delete_flattened(MPI_Datatype datatype)1245 void ADIOI_Delete_flattened(MPI_Datatype datatype)
1246 {
1247     ADIOI_Flatlist_node *flat, *prev;
1248 
1249     prev = flat = ADIOI_Flatlist;
1250     while (flat && (flat->type != datatype)) {
1251 	prev = flat;
1252 	flat = flat->next;
1253     }
1254     if (flat) {
1255 	prev->next = flat->next;
1256 	if (flat->blocklens) ADIOI_Free(flat->blocklens);
1257 	if (flat->indices) ADIOI_Free(flat->indices);
1258 	ADIOI_Free(flat);
1259     }
1260 }
1261