1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3  *
4  *  (C) 2001 by Argonne National Laboratory.
5  *      See COPYRIGHT in top-level directory.
6  */
7 
8 #include "mpiimpl.h"
9 
10 /* -- Begin Profiling Symbol Block for routine MPI_Type_create_darray */
11 #if defined(HAVE_PRAGMA_WEAK)
12 #pragma weak MPI_Type_create_darray = PMPI_Type_create_darray
13 #elif defined(HAVE_PRAGMA_HP_SEC_DEF)
14 #pragma _HP_SECONDARY_DEF PMPI_Type_create_darray  MPI_Type_create_darray
15 #elif defined(HAVE_PRAGMA_CRI_DUP)
16 #pragma _CRI duplicate MPI_Type_create_darray as PMPI_Type_create_darray
17 #endif
18 /* -- End Profiling Symbol Block */
19 
#ifndef MIN
/* Minimum of two values.  Note: each argument may be evaluated more
   than once, so do not pass expressions with side effects (e.g.
   MIN(i++, j)).  Parameter names avoid the double-underscore prefix,
   which is reserved for the implementation (C11 7.1.3). */
#define MIN(a, b) (((a) < (b)) ? (a) : (b))
#endif
23 
24 PMPI_LOCAL int MPIR_Type_block(const int *array_of_gsizes,
25 			       int dim,
26 			       int ndims,
27 			       int nprocs,
28 			       int rank,
29 			       int darg,
30 			       int order,
31 			       MPI_Aint orig_extent,
32 			       MPI_Datatype type_old,
33 			       MPI_Datatype *type_new,
34 			       MPI_Aint *st_offset);
35 PMPI_LOCAL int MPIR_Type_cyclic(const int *array_of_gsizes,
36 				int dim,
37 				int ndims,
38 				int nprocs,
39 				int rank,
40 				int darg,
41 				int order,
42 				MPI_Aint orig_extent,
43 				MPI_Datatype type_old,
44 				MPI_Datatype *type_new,
45 				MPI_Aint *st_offset);
46 
47 /* Define MPICH_MPI_FROM_PMPI if weak symbols are not supported to build
48    the MPI routines */
49 #ifndef MPICH_MPI_FROM_PMPI
50 #undef MPI_Type_create_darray
51 #define MPI_Type_create_darray PMPI_Type_create_darray
52 
53 
54 
MPIR_Type_block(const int * array_of_gsizes,int dim,int ndims,int nprocs,int rank,int darg,int order,MPI_Aint orig_extent,MPI_Datatype type_old,MPI_Datatype * type_new,MPI_Aint * st_offset)55 PMPI_LOCAL int MPIR_Type_block(const int *array_of_gsizes,
56 			       int dim,
57 			       int ndims,
58 			       int nprocs,
59 			       int rank,
60 			       int darg,
61 			       int order,
62 			       MPI_Aint orig_extent,
63 			       MPI_Datatype type_old,
64 			       MPI_Datatype *type_new,
65 			       MPI_Aint *st_offset)
66 {
67 /* nprocs = no. of processes in dimension dim of grid
68    rank = coordinate of this process in dimension dim */
69     static const char FCNAME[] = "MPIR_Type_block";
70     int mpi_errno, blksize, global_size, mysize, i, j;
71     MPI_Aint stride;
72 
73     global_size = array_of_gsizes[dim];
74 
75     if (darg == MPI_DISTRIBUTE_DFLT_DARG)
76 	blksize = (global_size + nprocs - 1)/nprocs;
77     else {
78 	blksize = darg;
79 
80 #ifdef HAVE_ERROR_CHECKING
81 	if (blksize <= 0) {
82 	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
83 					     MPIR_ERR_RECOVERABLE,
84 					     FCNAME,
85 					     __LINE__,
86 					     MPI_ERR_ARG,
87 					     "**darrayblock",
88 					     "**darrayblock %d",
89 					     blksize);
90 	    return mpi_errno;
91 	}
92 	if (blksize * nprocs < global_size) {
93 	    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
94 					     MPIR_ERR_RECOVERABLE,
95 					     FCNAME,
96 					     __LINE__,
97 					     MPI_ERR_ARG,
98 					     "**darrayblock2",
99 					     "**darrayblock2 %d %d",
100 					     blksize*nprocs,
101 					     global_size);
102 	    return mpi_errno;
103 	}
104 #endif
105     }
106 
107     j = global_size - blksize*rank;
108     mysize = MIN(blksize, j);
109     if (mysize < 0) mysize = 0;
110 
111     stride = orig_extent;
112     if (order == MPI_ORDER_FORTRAN) {
113 	if (dim == 0) {
114 	    mpi_errno = MPID_Type_contiguous(mysize,
115 					     type_old,
116 					     type_new);
117 	    /* --BEGIN ERROR HANDLING-- */
118 	    if (mpi_errno != MPI_SUCCESS)
119 	    {
120 		mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
121 		return mpi_errno;
122 	    }
123 	    /* --END ERROR HANDLING-- */
124 	}
125 	else {
126 	    for (i=0; i<dim; i++) stride *= (MPI_Aint)(array_of_gsizes[i]);
127 	    mpi_errno = MPID_Type_vector(mysize,
128 					 1,
129 					 stride,
130 					 1, /* stride in bytes */
131 					 type_old,
132 					 type_new);
133 	    /* --BEGIN ERROR HANDLING-- */
134 	    if (mpi_errno != MPI_SUCCESS)
135 	    {
136 		mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
137 		return mpi_errno;
138 	    }
139 	    /* --END ERROR HANDLING-- */
140 	}
141     }
142     else {
143 	if (dim == ndims-1) {
144 	    mpi_errno = MPID_Type_contiguous(mysize,
145 					     type_old,
146 					     type_new);
147 	    /* --BEGIN ERROR HANDLING-- */
148 	    if (mpi_errno != MPI_SUCCESS)
149 	    {
150 		mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
151 		return mpi_errno;
152 	    }
153 	    /* --END ERROR HANDLING-- */
154 	}
155 	else {
156 	    for (i=ndims-1; i>dim; i--) stride *= (MPI_Aint)(array_of_gsizes[i]);
157 	    mpi_errno = MPID_Type_vector(mysize,
158 					 1,
159 					 stride,
160 					 1, /* stride in bytes */
161 					 type_old,
162 					 type_new);
163 	    /* --BEGIN ERROR HANDLING-- */
164 	    if (mpi_errno != MPI_SUCCESS)
165 	    {
166 		mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
167 		return mpi_errno;
168 	    }
169 	    /* --END ERROR HANDLING-- */
170 	}
171     }
172 
173     *st_offset = (MPI_Aint) blksize * (MPI_Aint) rank;
174      /* in terms of no. of elements of type oldtype in this dimension */
175     if (mysize == 0) *st_offset = 0;
176 
177     return MPI_SUCCESS;
178 }
179 
180 
MPIR_Type_cyclic(const int * array_of_gsizes,int dim,int ndims,int nprocs,int rank,int darg,int order,MPI_Aint orig_extent,MPI_Datatype type_old,MPI_Datatype * type_new,MPI_Aint * st_offset)181 PMPI_LOCAL int MPIR_Type_cyclic(const int *array_of_gsizes,
182 				int dim,
183 				int ndims,
184 				int nprocs,
185 				int rank,
186 				int darg,
187 				int order,
188 				MPI_Aint orig_extent,
189 				MPI_Datatype type_old,
190 				MPI_Datatype *type_new,
191 				MPI_Aint *st_offset)
192 {
193 /* nprocs = no. of processes in dimension dim of grid
194    rank = coordinate of this process in dimension dim */
195     static const char FCNAME[] = "MPIR_Type_cyclic";
196     int mpi_errno,blksize, i, blklens[3], st_index, end_index,
197 	local_size, rem, count;
198     MPI_Aint stride, disps[3];
199     MPI_Datatype type_tmp, types[3];
200 
201     if (darg == MPI_DISTRIBUTE_DFLT_DARG) blksize = 1;
202     else blksize = darg;
203 
204 #ifdef HAVE_ERROR_CHECKING
205     if (blksize <= 0) {
206 	mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
207 					 MPIR_ERR_RECOVERABLE,
208 					 FCNAME,
209 					 __LINE__,
210 					 MPI_ERR_ARG,
211 					 "**darraycyclic",
212 					 "**darraycyclic %d",
213 					 blksize);
214 	return mpi_errno;
215     }
216 #endif
217 
218     st_index = rank*blksize;
219     end_index = array_of_gsizes[dim] - 1;
220 
221     if (end_index < st_index) local_size = 0;
222     else {
223 	local_size = ((end_index - st_index + 1)/(nprocs*blksize))*blksize;
224 	rem = (end_index - st_index + 1) % (nprocs*blksize);
225 	local_size += MIN(rem, blksize);
226     }
227 
228     count = local_size/blksize;
229     rem = local_size % blksize;
230 
231     stride = (MPI_Aint) nprocs * (MPI_Aint) blksize * orig_extent;
232     if (order == MPI_ORDER_FORTRAN)
233 	for (i=0; i<dim; i++) stride *= (MPI_Aint)(array_of_gsizes[i]);
234     else for (i=ndims-1; i>dim; i--) stride *= (MPI_Aint)(array_of_gsizes[i]);
235 
236     mpi_errno = MPID_Type_vector(count,
237 				 blksize,
238 				 stride,
239 				 1, /* stride in bytes */
240 				 type_old,
241 				 type_new);
242     /* --BEGIN ERROR HANDLING-- */
243     if (mpi_errno != MPI_SUCCESS)
244     {
245 	mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
246 	return mpi_errno;
247     }
248     /* --END ERROR HANDLING-- */
249 
250     if (rem) {
251 	/* if the last block is of size less than blksize, include
252 	   it separately using MPI_Type_struct */
253 
254 	types[0] = *type_new;
255 	types[1] = type_old;
256 	disps[0] = 0;
257 	disps[1] = (MPI_Aint) count * stride;
258 	blklens[0] = 1;
259 	blklens[1] = rem;
260 
261 	mpi_errno = MPID_Type_struct(2,
262 				     blklens,
263 				     disps,
264 				     types,
265 				     &type_tmp);
266 	MPIR_Type_free_impl(type_new);
267 	*type_new = type_tmp;
268 
269 	/* --BEGIN ERROR HANDLING-- */
270 	if (mpi_errno != MPI_SUCCESS)
271 	{
272 	    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
273 	    return mpi_errno;
274 	}
275 	/* --END ERROR HANDLING-- */
276     }
277 
278     /* In the first iteration, we need to set the displacement in that
279        dimension correctly. */
280     if (((order == MPI_ORDER_FORTRAN) && (dim == 0)) ||
281 	((order == MPI_ORDER_C) && (dim == ndims-1)))
282     {
283         types[0] = MPI_LB;
284         disps[0] = 0;
285         types[1] = *type_new;
286         disps[1] = (MPI_Aint) rank * (MPI_Aint) blksize * orig_extent;
287         types[2] = MPI_UB;
288         disps[2] = orig_extent * (MPI_Aint)(array_of_gsizes[dim]);
289         blklens[0] = blklens[1] = blklens[2] = 1;
290         mpi_errno = MPID_Type_struct(3,
291 				     blklens,
292 				     disps,
293 				     types,
294 				     &type_tmp);
295         MPIR_Type_free_impl(type_new);
296         *type_new = type_tmp;
297 
298 	/* --BEGIN ERROR HANDLING-- */
299 	if (mpi_errno != MPI_SUCCESS)
300 	{
301 	    mpi_errno = MPIR_Err_create_code(mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**fail", 0);
302 	    return mpi_errno;
303 	}
304 	/* --END ERROR HANDLING-- */
305 
306         *st_offset = 0;  /* set it to 0 because it is taken care of in
307                             the struct above */
308     }
309     else {
310         *st_offset = (MPI_Aint) rank * (MPI_Aint) blksize;
311         /* st_offset is in terms of no. of elements of type oldtype in
312          * this dimension */
313     }
314 
315     if (local_size == 0) *st_offset = 0;
316 
317     return MPI_SUCCESS;
318 }
319 #endif
320 
321 #undef FUNCNAME
322 #define FUNCNAME MPI_Type_create_darray
323 
324 
325 /*@
326    MPI_Type_create_darray - Create a datatype representing a distributed array
327 
328    Input Parameters:
329 + size - size of process group (positive integer)
330 . rank - rank in process group (nonnegative integer)
331 . ndims - number of array dimensions as well as process grid dimensions (positive integer)
332 . array_of_gsizes - number of elements of type oldtype in each dimension of global array (array of positive integers)
333 . array_of_distribs - distribution of array in each dimension (array of state)
334 . array_of_dargs - distribution argument in each dimension (array of positive integers)
335 . array_of_psizes - size of process grid in each dimension (array of positive integers)
336 . order - array storage order flag (state)
337 - oldtype - old datatype (handle)
338 
339     Output Parameter:
340 . newtype - new datatype (handle)
341 
342 .N ThreadSafe
343 
344 .N Fortran
345 
346 .N Errors
347 .N MPI_SUCCESS
348 .N MPI_ERR_TYPE
349 .N MPI_ERR_ARG
350 @*/
int MPI_Type_create_darray(int size,
			   int rank,
			   int ndims,
			   MPICH2_CONST int array_of_gsizes[],
			   MPICH2_CONST int array_of_distribs[],
			   MPICH2_CONST int array_of_dargs[],
			   MPICH2_CONST int array_of_psizes[],
			   int order,
			   MPI_Datatype oldtype,
			   MPI_Datatype *newtype)
{
    static const char FCNAME[] = "MPI_Type_create_darray";
    int mpi_errno = MPI_SUCCESS, i;
    MPI_Datatype new_handle;

    int procs, tmp_rank, tmp_size, blklens[3], *coords;
    MPI_Aint *st_offsets, orig_extent, disps[3];
    MPI_Datatype type_old, type_new = MPI_DATATYPE_NULL, types[3];

#   ifdef HAVE_ERROR_CHECKING
    MPI_Aint   size_with_aint;
    MPI_Offset size_with_offset;
#   endif

    int *ints;
    MPID_Datatype *datatype_ptr = NULL;
    MPIU_CHKLMEM_DECL(3);
    MPID_MPI_STATE_DECL(MPID_STATE_MPI_TYPE_CREATE_DARRAY);

    MPIR_ERRTEST_INITIALIZED_ORDIE();

    MPIU_THREAD_CS_ENTER(ALLFUNC,);
    MPID_MPI_FUNC_ENTER(MPID_STATE_MPI_TYPE_CREATE_DARRAY);

    /* Validate parameters, especially handles needing to be converted */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
	    MPIR_ERRTEST_DATATYPE(oldtype, "datatype", mpi_errno);
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif

    /* Convert MPI object handles to object pointers */
    MPID_Datatype_get_ptr(oldtype, datatype_ptr);
    MPID_Datatype_get_extent_macro(oldtype, orig_extent);

    /* Validate parameters and objects (post conversion) */
#   ifdef HAVE_ERROR_CHECKING
    {
        MPID_BEGIN_ERROR_CHECKS;
        {
	    /* Check parameters */
	    MPIR_ERRTEST_ARGNEG(rank, "rank", mpi_errno);
	    MPIR_ERRTEST_ARGNONPOS(size, "size", mpi_errno);
	    MPIR_ERRTEST_ARGNONPOS(ndims, "ndims", mpi_errno);

	    MPIR_ERRTEST_ARGNULL(array_of_gsizes, "array_of_gsizes", mpi_errno);
	    MPIR_ERRTEST_ARGNULL(array_of_distribs, "array_of_distribs", mpi_errno);
	    MPIR_ERRTEST_ARGNULL(array_of_dargs, "array_of_dargs", mpi_errno);
	    MPIR_ERRTEST_ARGNULL(array_of_psizes, "array_of_psizes", mpi_errno);
	    if (order != MPI_ORDER_C && order != MPI_ORDER_FORTRAN) {
		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
						 MPIR_ERR_RECOVERABLE,
						 FCNAME,
						 __LINE__,
						 MPI_ERR_ARG,
						 "**arg",
						 "**arg %s",
						 "order");
                goto fn_fail;
	    }

	    for (i=0; mpi_errno == MPI_SUCCESS && i < ndims; i++) {
		MPIR_ERRTEST_ARGNONPOS(array_of_gsizes[i], "gsize", mpi_errno);
		MPIR_ERRTEST_ARGNONPOS(array_of_psizes[i], "psize", mpi_errno);

		if ((array_of_distribs[i] != MPI_DISTRIBUTE_NONE) &&
		    (array_of_distribs[i] != MPI_DISTRIBUTE_BLOCK) &&
		    (array_of_distribs[i] != MPI_DISTRIBUTE_CYCLIC))
		{
		    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
						     MPIR_ERR_RECOVERABLE,
						     FCNAME,
						     __LINE__,
						     MPI_ERR_ARG,
						     "**darrayunknown",
						     0);
                    goto fn_fail;
		}

		if ((array_of_dargs[i] != MPI_DISTRIBUTE_DFLT_DARG) &&
		    (array_of_dargs[i] <= 0))
		{
		    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
						     MPIR_ERR_RECOVERABLE,
						     FCNAME,
						     __LINE__,
						     MPI_ERR_ARG,
						     "**arg",
						     "**arg %s",
						     "array_of_dargs");
                    goto fn_fail;
		}

		if ((array_of_distribs[i] == MPI_DISTRIBUTE_NONE) &&
		    (array_of_psizes[i] != 1))
		{
		    mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
						     MPIR_ERR_RECOVERABLE,
						     FCNAME,
						     __LINE__,
						     MPI_ERR_ARG,
						     "**darraydist",
						     "**darraydist %d %d",
						     i, array_of_psizes[i]);
                    goto fn_fail;
		}
	    }

	    /* TODO: GET THIS CHECK IN ALSO */

	    /* check if MPI_Aint is large enough for size of global array.
	       if not, complain.  (the comparison below relies on the
	       MPI_Aint product overflowing where the wider MPI_Offset
	       product does not) */

	    size_with_aint = orig_extent;
	    for (i=0; i<ndims; i++) size_with_aint *= array_of_gsizes[i];
	    size_with_offset = orig_extent;
	    for (i=0; i<ndims; i++) size_with_offset *= array_of_gsizes[i];
	    if (size_with_aint != size_with_offset) {
		mpi_errno = MPIR_Err_create_code(MPI_SUCCESS,
						 MPIR_ERR_FATAL,
						 FCNAME,
						 __LINE__,
						 MPI_ERR_ARG,
						 "**darrayoverflow",
						 "**darrayoverflow %L",
						 size_with_offset);
                goto fn_fail;
	    }

            /* Validate datatype_ptr */
            MPID_Datatype_valid_ptr(datatype_ptr, mpi_errno);
	    /* If datatype_ptr is not valid, it will be reset to null */
	    /* --BEGIN ERROR HANDLING-- */
            if (mpi_errno) goto fn_fail;
	    /* --END ERROR HANDLING-- */
        }
        MPID_END_ERROR_CHECKS;
    }
#   endif /* HAVE_ERROR_CHECKING */

    /* ... body of routine ... */

/* calculate position in Cartesian grid as MPI would (row-major
   ordering) */
    MPIU_CHKLMEM_MALLOC_ORJUMP(coords, int *, ndims * sizeof(int), mpi_errno, "position in Cartesian grid");

    procs = size;
    tmp_rank = rank;
    for (i=0; i<ndims; i++) {
	procs = procs/array_of_psizes[i];
	coords[i] = tmp_rank/procs;
	tmp_rank = tmp_rank % procs;
    }

    MPIU_CHKLMEM_MALLOC_ORJUMP(st_offsets, MPI_Aint *, ndims * sizeof(MPI_Aint), mpi_errno, "st_offsets");

    type_old = oldtype;

    /* Build the type one dimension at a time, innermost (fastest
       varying) dimension first, composing each dimension's
       block/cyclic type around the previous result. */
    if (order == MPI_ORDER_FORTRAN) {
      /* dimension 0 changes fastest */
	for (i=0; i<ndims; i++) {
	    switch(array_of_distribs[i]) {
	    case MPI_DISTRIBUTE_BLOCK:
		mpi_errno = MPIR_Type_block(array_of_gsizes,
					    i,
					    ndims,
					    array_of_psizes[i],
					    coords[i],
					    array_of_dargs[i],
					    order,
					    orig_extent,
					    type_old,
					    &type_new,
					    st_offsets+i);
		break;
	    case MPI_DISTRIBUTE_CYCLIC:
		mpi_errno = MPIR_Type_cyclic(array_of_gsizes,
					     i,
					     ndims,
					     array_of_psizes[i],
					     coords[i],
					     array_of_dargs[i],
					     order,
					     orig_extent,
					     type_old,
					     &type_new,
					     st_offsets+i);
		break;
	    case MPI_DISTRIBUTE_NONE:
		/* treat it as a block distribution on 1 process */
		mpi_errno = MPIR_Type_block(array_of_gsizes,
					    i,
					    ndims,
					    1,
					    0,
					    MPI_DISTRIBUTE_DFLT_DARG,
					    order,
					    orig_extent,
					    type_old,
					    &type_new,
					    st_offsets+i);
		break;
	    default:
		/* invalid values are rejected above when error checking
		   is enabled; nothing to build here */
		break;
	    }
	    /* --BEGIN ERROR HANDLING-- */
	    /* check before freeing type_old / adopting type_new: on
	       failure type_new is not a valid handle */
	    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
	    /* --END ERROR HANDLING-- */

	    if (i)
	    {
		/* free the intermediate type from the previous dimension */
		MPIR_Type_free_impl(&type_old);
	    }
	    type_old = type_new;
	}

	/* add displacement and UB */
	disps[1] = st_offsets[0];
	tmp_size = 1;
	for (i=1; i<ndims; i++) {
	    tmp_size *= array_of_gsizes[i-1];
	    disps[1] += (MPI_Aint) tmp_size * st_offsets[i];
	}
        /* rest done below for both Fortran and C order */
    }

    else /* order == MPI_ORDER_C */ {
        /* dimension ndims-1 changes fastest */
	for (i=ndims-1; i>=0; i--) {
	    switch(array_of_distribs[i]) {
	    case MPI_DISTRIBUTE_BLOCK:
		mpi_errno = MPIR_Type_block(array_of_gsizes,
					    i,
					    ndims,
					    array_of_psizes[i],
					    coords[i],
					    array_of_dargs[i],
					    order,
					    orig_extent,
					    type_old,
					    &type_new,
					    st_offsets+i);
		break;
	    case MPI_DISTRIBUTE_CYCLIC:
		mpi_errno = MPIR_Type_cyclic(array_of_gsizes,
					     i,
					     ndims,
					     array_of_psizes[i],
					     coords[i],
					     array_of_dargs[i],
					     order,
					     orig_extent,
					     type_old,
					     &type_new,
					     st_offsets+i);
		break;
	    case MPI_DISTRIBUTE_NONE:
		/* treat it as a block distribution on 1 process */
		mpi_errno = MPIR_Type_block(array_of_gsizes,
					    i,
					    ndims,
					    array_of_psizes[i],
					    coords[i],
					    MPI_DISTRIBUTE_DFLT_DARG,
					    order,
					    orig_extent,
					    type_old,
					    &type_new,
					    st_offsets+i);
		break;
	    default:
		/* invalid values are rejected above when error checking
		   is enabled; nothing to build here */
		break;
	    }
	    /* --BEGIN ERROR HANDLING-- */
	    /* check before freeing type_old / adopting type_new: on
	       failure type_new is not a valid handle */
	    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
	    /* --END ERROR HANDLING-- */

	    if (i != ndims-1)
	    {
		/* free the intermediate type from the previous dimension */
		MPIR_Type_free_impl(&type_old);
	    }
	    type_old = type_new;
	}

	/* add displacement and UB */
	disps[1] = st_offsets[ndims-1];
	tmp_size = 1;
	for (i=ndims-2; i>=0; i--) {
	    tmp_size *= array_of_gsizes[i+1];
	    disps[1] += (MPI_Aint) tmp_size * st_offsets[i];
	}
    }

    disps[1] *= orig_extent;

    disps[2] = orig_extent;
    for (i=0; i<ndims; i++) disps[2] *= (MPI_Aint)(array_of_gsizes[i]);

    /* wrap the per-dimension type in LB/UB markers so its extent
       matches the full global array */
    disps[0] = 0;
    blklens[0] = blklens[1] = blklens[2] = 1;
    types[0] = MPI_LB;
    types[1] = type_new;
    types[2] = MPI_UB;

    mpi_errno = MPID_Type_struct(3,
				 blklens,
				 disps,
				 types,
				 &new_handle);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
    /* --END ERROR HANDLING-- */

    MPIR_Type_free_impl(&type_new);

    /* at this point we have the new type, and we've cleaned up any
     * intermediate types created in the process.  we just need to save
     * all our contents/envelope information.
     */

    /* Save contents */
    MPIU_CHKLMEM_MALLOC_ORJUMP(ints, int *, (4 * ndims + 4) * sizeof(int), mpi_errno, "content description");

    ints[0] = size;
    ints[1] = rank;
    ints[2] = ndims;

    for (i=0; i < ndims; i++) {
	ints[i + 3] = array_of_gsizes[i];
    }
    for (i=0; i < ndims; i++) {
	ints[i + ndims + 3] = array_of_distribs[i];
    }
    for (i=0; i < ndims; i++) {
	ints[i + 2*ndims + 3] = array_of_dargs[i];
    }
    for (i=0; i < ndims; i++) {
	ints[i + 3*ndims + 3] = array_of_psizes[i];
    }
    ints[4*ndims + 3] = order;
    MPID_Datatype_get_ptr(new_handle, datatype_ptr);
    mpi_errno = MPID_Datatype_set_contents(datatype_ptr,
					   MPI_COMBINER_DARRAY,
					   4*ndims + 4,
					   0,
					   1,
					   ints,
					   NULL,
					   &oldtype);
    /* --BEGIN ERROR HANDLING-- */
    if (mpi_errno != MPI_SUCCESS) goto fn_fail;
    /* --END ERROR HANDLING-- */

    MPIU_OBJ_PUBLISH_HANDLE(*newtype, new_handle);
    /* ... end of body of routine ... */

  fn_exit:
    MPIU_CHKLMEM_FREEALL();
    MPID_MPI_FUNC_EXIT(MPID_STATE_MPI_TYPE_CREATE_DARRAY);
    MPIU_THREAD_CS_EXIT(ALLFUNC,);
    return mpi_errno;

  fn_fail:
    /* --BEGIN ERROR HANDLING-- */
#   ifdef HAVE_ERROR_CHECKING
    {
	mpi_errno = MPIR_Err_create_code(
	    mpi_errno, MPIR_ERR_RECOVERABLE, FCNAME, __LINE__, MPI_ERR_OTHER, "**mpi_type_create_darray",
	    "**mpi_type_create_darray %d %d %d %p %p %p %p %d %D %p", size, rank, ndims, array_of_gsizes,
	    array_of_distribs, array_of_dargs, array_of_psizes, order, oldtype, newtype);
    }
#   endif
    mpi_errno = MPIR_Err_return_comm(NULL, FCNAME, mpi_errno);
    goto fn_exit;
    /* --END ERROR HANDLING-- */
}
737