/* -*- Mode: C; c-basic-offset:4 ; -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2017 University of Houston. All rights reserved.
 * Copyright (c) 2011-2015 Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2012-2013 Inria.  All rights reserved.
 * Copyright (c) 2015-2017 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * Copyright (c) 2017      IBM Corporation. All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"

#include "ompi/runtime/params.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/pml/pml.h"
#include "ompi/mca/topo/topo.h"
#include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
#include "opal/datatype/opal_convertor.h"
#include "opal/datatype/opal_datatype.h"
#include "ompi/datatype/ompi_datatype.h"
#include "ompi/info/info.h"
#include "ompi/request/request.h"

#include <math.h>
#include <unistd.h>

#include "common_ompio_aggregators.h"

/*
** This file contains all the functionality related to determining the number of aggregators
** and the list of aggregators.
**
** The first group of functions determines the number of aggregators based on various characteristics:
**
** 1. simple_grouping: a heuristic based on a cost model
** 2. fview_based_grouping: analyzes the file view to detect regular patterns
** 3. cart_based_grouping: uses a Cartesian communicator to derive certain (probable) properties
**    of the access pattern
*/
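
/*
** Illustrative usage sketch (an assumption about the calling sequence, not a
** verbatim caller): a collective I/O component typically fills one
** mca_common_ompio_contg entry per process through one of the grouping
** routines below and then publishes the result on the file handle via
** mca_common_ompio_finalize_initial_grouping(), e.g.:
**
**    int num_groups = 0;
**    mca_common_ompio_contg *contg_groups = ...;  // allocated by the caller,
**                                                 // one entry per process
**    ret = mca_common_ompio_simple_grouping (fh, &num_groups, contg_groups);
**    if (OMPI_SUCCESS == ret) {
**        ret = mca_common_ompio_finalize_initial_grouping (fh, num_groups,
**                                                          contg_groups);
**    }
*/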


static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim );
#define DIM1 1
#define DIM2 2

int mca_common_ompio_simple_grouping(mca_io_ompio_file_t *fh,
                                 int *num_groups_out,
                                 mca_common_ompio_contg *contg_groups)
{
    int group_size  = 0;
    int k=0, p=0, g=0;
    int total_procs = 0;
    int num_groups=1;

    double time=0.0, time_prev=0.0, dtime=0.0, dtime_abs=0.0, dtime_diff=0.0, dtime_prev=0.0;
    double dtime_threshold=0.0;

    /* This is the threshold for absolute improvement. It is not
    ** exposed as an MCA parameter to avoid overwhelming users. It is
    ** mostly relevant for smaller process counts and data volumes.
    */
    double time_threshold=0.001;

    int incr=1, mode=1;
    int P_a, P_a_prev;

    /* The aggregator selection algorithm is based on the formulas described
    ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in
    ** Collective I/O operations', Proceedings of the 17th IEEE/ACM Symposium
    ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical
    ** Approaches to Performance Evaluation, Modeling and Simulation, 2017.
    **
    ** The current implementation is based on the 1-D and 2-D models derived for the even
    ** file partitioning strategy in the paper. Note that the formulas currently only model
    ** the communication aspect of collective I/O operations. There are two extensions in this
    ** implementation:
    **
    ** 1. Since the resulting formula has an asymptotic behavior w.r.t. the
    ** no. of aggregators, this version determines the no. of aggregators to
    ** be used iteratively and stops increasing the no. of aggregators if the
    ** benefit of adding further aggregators falls below a certain threshold
    ** value relative to the last number tested. The aggressiveness of cutting off
    ** the increase in the number of aggregators is controlled by the new mca
    ** parameter mca_io_ompio_aggregator_cutoff_threshold. Lower values for
    ** this parameter will lead to a higher number of aggregators (useful e.g.
    ** for PVFS2 and GPFS file systems), while higher values will lead to a
    ** lower no. of aggregators (useful for regular UNIX or NFS file systems).
    **
    ** 2. The algorithm further caps the maximum no. of aggregators used to not exceed
    ** (no. of processes / mca_io_ompio_max_aggregators_ratio), i.e. a higher value
    ** for mca_io_ompio_max_aggregators_ratio will decrease the maximum number of aggregators
    ** allowed for the given no. of processes.
    */
    dtime_threshold = (double) OMPIO_MCA_GET(fh, aggregators_cutoff_threshold) / 100.0;

    /* Determine whether to use the formula for 1-D or 2-D data decomposition. Anything
    ** that is not 1-D is assumed to be 2-D in this version.
    */
    mode = ( fh->f_cc_size == fh->f_view_size ) ? 1 : 2;

    /* Determine the increment size when searching for the optimal
    ** no. of aggregators.
    */
    if ( fh->f_size < 16 ) {
        incr = 2;
    }
    else if (fh->f_size < 128 ) {
        incr = 4;
    }
    else if ( fh->f_size < 4096 ) {
        incr = 16;
    }
    else {
        incr = 32;
    }
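    /* Example (for illustration only): with 256 processes the increment is 16,
    ** so the loop below evaluates the cost model for P_a = 1, 16, 32, ..., 256
    ** and stops early once either the relative improvement between two
    ** consecutive candidates drops below dtime_threshold, or the absolute
    ** improvement drops below time_threshold.
    */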

    P_a = 1;
    time_prev = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
    P_a_prev = P_a;
    for ( P_a = incr; P_a <= fh->f_size; P_a += incr ) {
        time = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
        dtime_abs = (time_prev - time);
        dtime = dtime_abs / time_prev;
        dtime_diff = ( P_a == incr ) ? dtime : (dtime_prev - dtime);
#ifdef OMPIO_DEBUG
        if ( 0 == fh->f_rank  ){
            printf(" d_p = %ld P_a = %d time = %lf dtime = %lf dtime_abs =%lf dtime_diff=%lf\n",
                   fh->f_view_size, P_a, time, dtime, dtime_abs, dtime_diff );
        }
#endif
        if ( dtime_diff < dtime_threshold ) {
            /* The relative improvement compared to the last number
            ** of aggregators was below a certain threshold. This is typically
            ** the dominating factor for large data volumes and larger process
            ** counts.
            */
#ifdef OMPIO_DEBUG
            if ( 0 == fh->f_rank ) {
                printf("dtime_diff below threshold\n");
            }
#endif
            break;
        }
        if ( dtime_abs < time_threshold ) {
            /* The absolute improvement compared to the last number
            ** of aggregators was below a given threshold. This is typically
            ** important for small data volumes and smaller process counts.
            */
#ifdef OMPIO_DEBUG
            if ( 0 == fh->f_rank ) {
                printf("dtime_abs below threshold\n");
            }
#endif
            break;
        }
        time_prev = time;
        dtime_prev = dtime;
        P_a_prev = P_a;
    }
    num_groups = P_a_prev;
#ifdef OMPIO_DEBUG
    printf(" For P=%d d_p=%ld b_c=%d threshold=%f chosen P_a = %d \n",
           fh->f_size, fh->f_view_size, fh->f_bytes_per_agg, dtime_threshold, P_a_prev);
#endif

    /* Cap the maximum number of aggregators. */
    if ( num_groups > (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio))) {
        num_groups = (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio));
    }
    if ( 1 >= num_groups ) {
        num_groups = 1;
    }
    group_size = fh->f_size / num_groups;
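    /* Distribute the processes over the groups in consecutive blocks of
    ** group_size ranks; the last group absorbs the remainder. Example
    ** (illustrative): 10 processes and 3 groups yield groups of 3, 3 and 4
    ** processes holding ranks {0,1,2}, {3,4,5} and {6,...,9}.
    */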

    for ( k=0, p=0; p<num_groups; p++ ) {
        if ( p == (num_groups - 1) ) {
            contg_groups[p].procs_per_contg_group = fh->f_size - total_procs;
        }
        else {
            contg_groups[p].procs_per_contg_group = group_size;
            total_procs +=group_size;
        }
        for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
            contg_groups[p].procs_in_contg_group[g] = k;
            k++;
        }
    }

    *num_groups_out = num_groups;
    return OMPI_SUCCESS;
}

int mca_common_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
                                      int *num_groups,
                                      mca_common_ompio_contg *contg_groups)
{

    int k = 0;
    int p = 0;
    int g = 0;
    int ret = OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
    OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;

    //Store start offset, length and corresponding rank in an array
    if(NULL == fh->f_decoded_iov){
      start_offset_len[0] = 0;
      start_offset_len[1] = 0;
    }
    else{
       start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
       start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
    }
    start_offset_len[2] = fh->f_rank;

    start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == start_offsets_lens) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    //Allgather start offsets across processes in a group on the aggregator
    ret = fh->f_comm->c_coll->coll_allgather (start_offset_len,
                                             3,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets_lens,
                                             3,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }


    //Calculate contiguous chunk sizes and contiguous subgroups
    for( k = 0 ; k < fh->f_size; k++){
        end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
        contg_groups[k].contg_chunk_size = 0;
    }
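    /* Walk through the ranks in order and build contiguous groups: rank k is
    ** appended to the current group if its start offset matches the end
    ** offset of rank k-1 (i.e. their file accesses line up back to back);
    ** otherwise a new group is started. Example (illustrative): start/length
    ** pairs of (0,100), (100,100), (300,100) produce two groups, {0,1} and {2}.
    */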
    k = 0;
    while( k < fh->f_size){
        if( k == 0){
            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
            g++;
            contg_groups[p].procs_per_contg_group = g;
            k++;
        }
        else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
            g++;
            contg_groups[p].procs_per_contg_group = g;
            k++;
        }
        else{
            p++;
            g = 0;
            contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
            contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
            g++;
            contg_groups[p].procs_per_contg_group = g;
            k++;
        }
    }

    *num_groups = p+1;
    ret = OMPI_SUCCESS;

exit:
    if (NULL != start_offsets_lens) {
        free (start_offsets_lens);
    }
    if (NULL != end_offsets) {
        free(end_offsets);
    }

    return ret;
}

int mca_common_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh,
                                         int *num_groups,
                                         mca_common_ompio_contg *contg_groups)
{
    int k = 0;
    int g=0;
    int ret = OMPI_SUCCESS, tmp_rank = 0;
    int *coords_tmp = NULL;

    mca_common_ompio_cart_topo_components cart_topo;
    memset (&cart_topo, 0, sizeof(mca_common_ompio_cart_topo_components));

    ret = ompio_fh->f_comm->c_topo->topo.cart.cartdim_get(ompio_fh->f_comm, &cart_topo.ndims);
    if (OMPI_SUCCESS != ret  ) {
        goto exit;
    }

    if (cart_topo.ndims < 2 ) {
        /* We shouldn't be here, this routine only works for more than 1 dimension */
        ret = MPI_ERR_INTERN;
        goto exit;
    }

    cart_topo.dims = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.dims) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    cart_topo.periods = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.periods) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    cart_topo.coords = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == cart_topo.coords) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    coords_tmp  = (int*)malloc (cart_topo.ndims * sizeof(int));
    if (NULL == coords_tmp) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ret = ompio_fh->f_comm->c_topo->topo.cart.cart_get(ompio_fh->f_comm,
                                                       cart_topo.ndims,
                                                       cart_topo.dims,
                                                       cart_topo.periods,
                                                       cart_topo.coords);
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_get \n");
        goto exit;
    }

    *num_groups = cart_topo.dims[0];  //number of rows

    for(k = 0; k < cart_topo.dims[0]; k++){
        int done = 0;
        int index = cart_topo.ndims-1;

        memset ( coords_tmp, 0, cart_topo.ndims * sizeof(int));
        contg_groups[k].procs_per_contg_group = (ompio_fh->f_size / cart_topo.dims[0]);
        coords_tmp[0] = k;

        ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
        if ( OMPI_SUCCESS != ret ) {
            opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_rank\n");
            goto exit;
        }
        contg_groups[k].procs_in_contg_group[0] = tmp_rank;

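        /* Enumerate the remaining members of row k in an odometer-like
        ** fashion: the row coordinate coords_tmp[0] = k stays fixed while the
        ** coordinates are incremented starting from the last dimension,
        ** carrying over into the next lower dimension whenever a dimension
        ** wraps around. Each resulting coordinate tuple is translated back
        ** into a rank via cart_rank().
        */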
        for ( g=1; g< contg_groups[k].procs_per_contg_group; g++ ) {
            done = 0;
            index = cart_topo.ndims-1;

            while ( ! done ) {
                coords_tmp[index]++;
                if ( coords_tmp[index] ==cart_topo.dims[index] ) {
                    coords_tmp[index]=0;
                    index--;
                }
                else {
                    done = 1;
                }
                if ( index == 0 ) {
                    done = 1;
                }
            }

           ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
           if ( OMPI_SUCCESS != ret ) {
             opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_rank\n");
             goto exit;
           }
           contg_groups[k].procs_in_contg_group[g] = tmp_rank;
        }
    }


exit:
    if (NULL != cart_topo.dims) {
       free (cart_topo.dims);
       cart_topo.dims = NULL;
    }
    if (NULL != cart_topo.periods) {
       free (cart_topo.periods);
       cart_topo.periods = NULL;
    }
    if (NULL != cart_topo.coords) {
       free (cart_topo.coords);
       cart_topo.coords = NULL;
    }
    if (NULL != coords_tmp) {
       free (coords_tmp);
       coords_tmp = NULL;
    }

    return ret;
}



int mca_common_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
                                           int num_groups,
                                           mca_common_ompio_contg *contg_groups)
{

    int z = 0;
    int y = 0;

    fh->f_init_num_aggrs = num_groups;
    if (NULL != fh->f_init_aggr_list) {
        free(fh->f_init_aggr_list);
    }
    fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
    if (NULL == fh->f_init_aggr_list) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for( z = 0 ;z < num_groups; z++){
        for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
            if ( fh->f_rank == contg_groups[z].procs_in_contg_group[y] ) {
                fh->f_init_procs_per_group = contg_groups[z].procs_per_contg_group;
                if (NULL != fh->f_init_procs_in_group) {
                    free(fh->f_init_procs_in_group);
                }
                fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
                if (NULL == fh->f_init_procs_in_group) {
                    opal_output (1, "OUT OF MEMORY\n");
                    return OMPI_ERR_OUT_OF_RESOURCE;
                }
                memcpy ( fh->f_init_procs_in_group, contg_groups[z].procs_in_contg_group,
                         contg_groups[z].procs_per_contg_group * sizeof (int));

            }
        }
    }

    for( z = 0 ;z < num_groups; z++){
        fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
    }


   return OMPI_SUCCESS;
}

/*****************************************************************************************************/
/*****************************************************************************************************/
/*****************************************************************************************************/
/*
** This function is called by the collective I/O operations to determine the final number
** of aggregators.
*/

int mca_common_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
                                       int num_aggregators,
                                       size_t bytes_per_proc)
{
    int j,procs_per_group = 0;
    int ret=OMPI_SUCCESS;

    /* If only one process is used, there is no need to do aggregator selection! */
    if (fh->f_size == 1){
        num_aggregators = 1;
    }

    fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;

    if (-1 == num_aggregators) {
        if ( SIMPLE        == OMPIO_MCA_GET(fh, grouping_option) ||
             NO_REFINEMENT == OMPIO_MCA_GET(fh,grouping_option) ||
             SIMPLE_PLUS   == OMPIO_MCA_GET(fh,grouping_option) ) {
            fh->f_aggregator_index = 0;
            fh->f_final_num_aggrs  = fh->f_init_num_aggrs;
            fh->f_procs_per_group  = fh->f_init_procs_per_group;

            fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
            if (NULL == fh->f_procs_in_group) {
                opal_output (1, "OUT OF MEMORY\n");
                return OMPI_ERR_OUT_OF_RESOURCE;
            }

            for (j=0 ; j<fh->f_procs_per_group ; j++) {
                fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
            }
        }
        else {
            ret = mca_common_ompio_create_groups(fh,bytes_per_proc);
        }
        return ret;
    }

    /* Forced number of aggregators:
    ** calculate the offset at which each group of processes will start.
    */
    if ( num_aggregators > fh->f_size ) {
        num_aggregators = fh->f_size;
    }
    procs_per_group = ceil ((float)fh->f_size/num_aggregators);

    /* calculate the number of processes in the local group */
    if (fh->f_size/procs_per_group != fh->f_rank/procs_per_group) {
        fh->f_procs_per_group = procs_per_group;
    }
    else {
        fh->f_procs_per_group = fh->f_size%procs_per_group;
    }
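    /* Example (illustrative): 10 processes and a requested 3 aggregators give
    ** procs_per_group = ceil(10/3) = 4, i.e. ranks 0-3 and 4-7 form full
    ** groups while the trailing group only contains ranks 8-9
    ** (f_size % procs_per_group = 2 processes).
    */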

    fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
    if (NULL == fh->f_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for (j=0 ; j<fh->f_procs_per_group ; j++) {
        fh->f_procs_in_group[j] = (fh->f_rank/procs_per_group) * procs_per_group + j;
    }

    fh->f_aggregator_index = 0;
    fh->f_final_num_aggrs  = num_aggregators;

    return OMPI_SUCCESS;
}



int mca_common_ompio_create_groups(mca_io_ompio_file_t *fh,
                               size_t bytes_per_proc)
{

    int is_aggregator = 0;
    int final_aggr = 0;
    int final_num_aggrs = 0;
    int ret = OMPI_SUCCESS, ompio_grouping_flag = 0;

    int *decision_list = NULL;

    OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
    OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE bytes_per_group = 0;
    OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group = NULL;

    ret = mca_common_ompio_prepare_to_group(fh,
                                        &start_offsets_lens,
                                        &end_offsets,
                                        &aggr_bytes_per_group,
                                        &bytes_per_group,
                                        &decision_list,
                                        bytes_per_proc,
                                        &is_aggregator,
                                        &ompio_grouping_flag);
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_create_groups: error in mca_common_ompio_prepare_to_group\n");
        goto exit;
    }

    switch(ompio_grouping_flag){

        case OMPIO_SPLIT:
            ret = mca_common_ompio_split_initial_groups(fh,
                                                    start_offsets_lens,
                                                    end_offsets,
                                                    bytes_per_group);
        break;

        case OMPIO_MERGE:
            ret = mca_common_ompio_merge_initial_groups(fh,
                                                    aggr_bytes_per_group,
                                                    decision_list,
                                                    is_aggregator);
            break;

        case  OMPIO_RETAIN:

            ret = mca_common_ompio_retain_initial_groups(fh);

        break;


    }
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_create_groups: error in subroutine called within switch statement\n");
        goto exit;
    }

    //Set aggregator index
    fh->f_aggregator_index = 0;

    //Calculate final number of aggregators
    if(fh->f_rank == fh->f_procs_in_group[fh->f_aggregator_index]){
        final_aggr = 1;
    }
    ret = fh->f_comm->c_coll->coll_allreduce (&final_aggr,
                                             &final_num_aggrs,
                                             1,
                                             MPI_INT,
                                             MPI_SUM,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allreduce_module);
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_create_groups: error in allreduce\n");
    }

    //Set final number of aggregators in file handle
    fh->f_final_num_aggrs = final_num_aggrs;

exit:

    if (NULL != start_offsets_lens) {
        free (start_offsets_lens);
    }
    if (NULL != end_offsets) {
        free (end_offsets);
    }
    if(NULL != aggr_bytes_per_group){
      free(aggr_bytes_per_group);
    }
    if( NULL != decision_list){
      free(decision_list);
    }


   return ret;
}

int mca_common_ompio_merge_initial_groups(mca_io_ompio_file_t *fh,
                                      OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group,
                                      int *decision_list,
                                      int is_aggregator){

    OMPI_MPI_OFFSET_TYPE sum_bytes = 0;
    MPI_Request *sendreqs = NULL;

    int start = 0;
    int end = 0;
    int i = 0;
    int j = 0;
    int r  = 0;

    int merge_pair_flag = 4;
    int first_merge_flag = 4;
    int *merge_aggrs = NULL;
    int is_new_aggregator= 0;
    int ret = OMPI_SUCCESS;

    if(is_aggregator){
        i = 0;
        sum_bytes = 0;
        //go through the decision list
        //Find the aggregators that could merge

        while(i < fh->f_init_num_aggrs){
            while(1){
                if( i >= fh->f_init_num_aggrs){
                    break;
                }
                else if((decision_list[i] == OMPIO_MERGE) &&
                        (sum_bytes <= OMPIO_MCA_GET(fh, bytes_per_agg))){
                    sum_bytes = sum_bytes + aggr_bytes_per_group[i];
                    decision_list[i] = merge_pair_flag;
                    i++;
                }
                else if((decision_list[i] == OMPIO_MERGE) &&
                        (sum_bytes >= OMPIO_MCA_GET(fh, bytes_per_agg)) ){
                   if(decision_list[i+1] == OMPIO_MERGE){
                       merge_pair_flag++;
                       decision_list[i] = merge_pair_flag;
                       sum_bytes = aggr_bytes_per_group[i];
                       i++;
                   }
                   else{
                       decision_list[i] = merge_pair_flag;
                       i++;
                   }
                }
                else{
                    i++;
                    if(decision_list[i] == OMPIO_MERGE)
                       merge_pair_flag++;
                    sum_bytes = 0;
                    break;
                }
            }
        }
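
        /* At this point the decision list has (presumably) been rewritten such
        ** that each run of consecutive entries carrying the same flag value
        ** >= first_merge_flag marks a set of adjacent groups whose combined
        ** data volume stays close to bytes_per_agg; the second pass below
        ** turns every such run into a single merged group.
        */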

        //Now go through the new edited decision list and
        //make lists of aggregators to merge and the number
        //of groups to be merged.
        i = 0;
        j = 0;

        while(i < fh->f_init_num_aggrs){
           if(decision_list[i] >= first_merge_flag){
               start = i;
               while((decision_list[i] >= first_merge_flag) &&
                      (i < fh->f_init_num_aggrs-1)){
                   if(decision_list[i+1] == decision_list[i]){
                       i++;
                   }
                   else{
                       break;
                   }
                   end = i;
               }
               merge_aggrs = (int *)malloc((end - start + 1) * sizeof(int));
               if (NULL == merge_aggrs) {
                  opal_output (1, "OUT OF MEMORY\n");
                  return OMPI_ERR_OUT_OF_RESOURCE;
               }
               j = 0;
               for( j = 0 ; j < end - start + 1; j++){
                   merge_aggrs[j] = fh->f_init_aggr_list[start+j];
               }
               if(fh->f_rank == merge_aggrs[0])
                  is_new_aggregator = 1;

               for( j = 0 ; j < end-start+1 ;j++){
                  if(fh->f_rank == merge_aggrs[j]){
                      ret = mca_common_ompio_merge_groups(fh, merge_aggrs,
                                                      end-start+1);
                      if ( OMPI_SUCCESS != ret ) {
                          opal_output (1, "mca_common_ompio_merge_initial_groups: error in mca_common_ompio_merge_groups\n");
                          free ( merge_aggrs );
                          return ret;
                      }
                  }
               }
               if(NULL != merge_aggrs){
                   free(merge_aggrs);
                   merge_aggrs = NULL;
               }

           }
           i++;
        }

    }//end old aggregators

    //New aggregators communicate new grouping info to the groups
    if(is_new_aggregator){
       sendreqs = (MPI_Request *)malloc ( 2 *fh->f_procs_per_group * sizeof(MPI_Request));
       if (NULL == sendreqs) {
          return OMPI_ERR_OUT_OF_RESOURCE;
       }
       //Communicate grouping info
       for( j = 0 ; j < fh->f_procs_per_group; j++){
           if (fh->f_procs_in_group[j] == fh->f_rank ) {
               continue;
           }
           //new aggregator sends new procs_per_group to all its members
           ret = MCA_PML_CALL(isend(&fh->f_procs_per_group,
                                    1,
                                    MPI_INT,
                                    fh->f_procs_in_group[j],
                                    OMPIO_PROCS_PER_GROUP_TAG,
                                    MCA_PML_BASE_SEND_STANDARD,
                                    fh->f_comm,
                                    sendreqs + r++));
           if ( OMPI_SUCCESS != ret ) {
               opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend\n");
               goto exit;
           }
           //new aggregator sends distribution of process to all its new members
           ret = MCA_PML_CALL(isend(fh->f_procs_in_group,
                                    fh->f_procs_per_group,
                                    MPI_INT,
                                    fh->f_procs_in_group[j],
                                    OMPIO_PROCS_IN_GROUP_TAG,
                                    MCA_PML_BASE_SEND_STANDARD,
                                    fh->f_comm,
                                    sendreqs + r++));
           if ( OMPI_SUCCESS != ret ) {
               opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend 2\n");
               goto exit;
           }

       }
    }
    else {
        //All non aggregators
        //All processes receive initial process distribution from aggregators
        ret = MCA_PML_CALL(recv(&fh->f_procs_per_group,
                                1,
                                MPI_INT,
                                MPI_ANY_SOURCE,
                                OMPIO_PROCS_PER_GROUP_TAG,
                                fh->f_comm,
                                MPI_STATUS_IGNORE));
        if ( OMPI_SUCCESS != ret ) {
            opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv\n");
            return ret;
        }

        fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
        if (NULL == fh->f_procs_in_group) {
            opal_output (1, "OUT OF MEMORY\n");
            return OMPI_ERR_OUT_OF_RESOURCE;
        }

        ret = MCA_PML_CALL(recv(fh->f_procs_in_group,
                                fh->f_procs_per_group,
                                MPI_INT,
                                MPI_ANY_SOURCE,
                                OMPIO_PROCS_IN_GROUP_TAG,
                                fh->f_comm,
                                MPI_STATUS_IGNORE));
        if ( OMPI_SUCCESS != ret ) {
            opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv 2\n");
            return ret;
        }

    }

    if(is_new_aggregator) {
        ret = ompi_request_wait_all (r, sendreqs, MPI_STATUSES_IGNORE);
    }

exit:
    if (NULL != sendreqs) {
        free(sendreqs);
    }

    return ret;
}

int mca_common_ompio_split_initial_groups(mca_io_ompio_file_t *fh,
                                      OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
                                      OMPI_MPI_OFFSET_TYPE *end_offsets,
                                      OMPI_MPI_OFFSET_TYPE bytes_per_group){


    int size_new_group = 0;
    int size_old_group = 0;
    int size_last_group = 0;
    int size_smallest_group = 0;
    int num_groups = 0;
    int ret = OMPI_SUCCESS;

    OMPI_MPI_OFFSET_TYPE max_cci = 0;
    OMPI_MPI_OFFSET_TYPE min_cci = 0;

    // integer round up
    size_new_group = (int)(OMPIO_MCA_GET(fh, bytes_per_agg) / bytes_per_group + (OMPIO_MCA_GET(fh, bytes_per_agg) % bytes_per_group ? 1u : 0u));
    size_old_group = fh->f_init_procs_per_group;

    ret = mca_common_ompio_split_a_group(fh,
                                     start_offsets_lens,
                                     end_offsets,
                                     size_new_group,
                                     &max_cci,
                                     &min_cci,
                                     &num_groups,
                                     &size_smallest_group);
    if (OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group\n");
        return ret;
    }


    switch( OMPIO_MCA_GET(fh,grouping_option)){
        case DATA_VOLUME:
            //Just use the size as returned by split group
            size_last_group = size_smallest_group;
        break;

        case UNIFORM_DISTRIBUTION:
            if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
                //uneven split, need to call split again
                if( size_old_group % num_groups == 0 ){
                   //most even distribution possible
                   size_new_group = size_old_group / num_groups;
                   size_last_group = size_new_group;
                }
                else{
                    //merge the last small group with the previous group
                    size_last_group = size_new_group + size_smallest_group;
                }
            }
            else{
                 //Considered uniform
                 size_last_group = size_smallest_group;
            }
        break;

        case CONTIGUITY:

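            /* Bisection-style search (as implemented below): as long as the
            ** largest contiguous chunk of a prospective group stays below
            ** OMPIO_CONTG_THRESHOLD and the group is still smaller than the
            ** original group, grow size_new_group towards size_old_group
            ** (halfway each iteration) and re-evaluate the split.
            */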
            while(1){
                 if((max_cci < OMPIO_CONTG_THRESHOLD) &&
                    (size_new_group < size_old_group)){

                    size_new_group = (size_new_group + size_old_group ) / 2;
                    ret = mca_common_ompio_split_a_group(fh,
                                                     start_offsets_lens,
                                                     end_offsets,
                                                     size_new_group,
                                                     &max_cci,
                                                     &min_cci,
                                                     &num_groups,
                                                     &size_smallest_group);
                    if (OMPI_SUCCESS != ret ) {
                        opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 2\n");
                        return ret;
                    }
                 }
                 else{
                     break;
                 }
            }
            size_last_group = size_smallest_group;
        break;

        case OPTIMIZE_GROUPING:
            //This case is a combination of data volume, contiguity and uniform distribution
            while(1){
                 if((max_cci < OMPIO_CONTG_THRESHOLD) &&
                    (size_new_group < size_old_group)){  //can be a better condition
                 //monitor the previous iteration
                 //break if it has not changed.
                     size_new_group = size_new_group + size_old_group;
                     // integer round up
                     size_new_group = size_new_group / 2 + (size_new_group % 2 ? 1 : 0);
                     ret = mca_common_ompio_split_a_group(fh,
                                                      start_offsets_lens,
                                                      end_offsets,
                                                      size_new_group,
                                                      &max_cci,
                                                      &min_cci,
                                                      &num_groups,
                                                      &size_smallest_group);
                    if (OMPI_SUCCESS != ret ) {
                        opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 3\n");
                        return ret;
                    }
                 }
                 else{
                     break;
                 }
            }

           if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
               //uneven split, need to call split again
               if( size_old_group % num_groups == 0 ){
                   //most even distribution possible
                   size_new_group = size_old_group / num_groups;
                   size_last_group = size_new_group;
               }
               else{
                    //merge the last small group with the previous group
                    size_last_group = size_new_group + size_smallest_group;
               }
           }
           else{
               //Considered uniform
               size_last_group = size_smallest_group;
           }

        break;
    }

    ret = mca_common_ompio_finalize_split(fh, size_new_group, size_last_group);

    return ret;
}


int mca_common_ompio_retain_initial_groups(mca_io_ompio_file_t *fh){

    int i = 0;

    fh->f_procs_per_group = fh->f_init_procs_per_group;
    fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
    if (NULL == fh->f_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }
    for( i = 0 ; i < fh->f_procs_per_group; i++){
        fh->f_procs_in_group[i] = fh->f_init_procs_in_group[i];
    }


    return OMPI_SUCCESS;
}

int mca_common_ompio_merge_groups(mca_io_ompio_file_t *fh,
                              int *merge_aggrs,
                              int num_merge_aggrs)
{
    int i = 0;
    int *sizes_old_group;
    int ret;
    int *displs = NULL;

    sizes_old_group = (int*)malloc(num_merge_aggrs * sizeof(int));
    if (NULL == sizes_old_group) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }


    displs = (int*)malloc(num_merge_aggrs * sizeof(int));
    if (NULL == displs) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }


    //merge_aggrs[0] is considered the new aggregator
    //New aggregator collects group sizes of the groups to be merged
    ret = ompi_fcoll_base_coll_allgather_array (&fh->f_init_procs_per_group,
                                           1,
                                           MPI_INT,
                                           sizes_old_group,
                                           1,
                                           MPI_INT,
                                           0,
                                           merge_aggrs,
                                           num_merge_aggrs,
                                           fh->f_comm);

    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }
    fh->f_procs_per_group = 0;


    for( i = 0; i < num_merge_aggrs; i++){
        fh->f_procs_per_group = fh->f_procs_per_group + sizes_old_group[i];
    }

    displs[0] = 0;
    for(i = 1; i < num_merge_aggrs; i++){
          displs[i] = displs[i-1] + sizes_old_group[i-1];
    }

    fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
    if (NULL == fh->f_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    //New aggregator also collects the grouping distribution
    //This is the actual merge
    //use allgatherv array
    ret = ompi_fcoll_base_coll_allgatherv_array (fh->f_init_procs_in_group,
                                            fh->f_init_procs_per_group,
                                            MPI_INT,
                                            fh->f_procs_in_group,
                                            sizes_old_group,
                                            displs,
                                            MPI_INT,
                                            0,
                                            merge_aggrs,
                                            num_merge_aggrs,
                                            fh->f_comm);

exit:
    if (NULL != displs) {
        free (displs);
    }
    if (NULL != sizes_old_group) {
        free (sizes_old_group);
    }

    return ret;

}




int mca_common_ompio_split_a_group(mca_io_ompio_file_t *fh,
                             OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
                             OMPI_MPI_OFFSET_TYPE *end_offsets,
                             int size_new_group,
                             OMPI_MPI_OFFSET_TYPE *max_cci,
                             OMPI_MPI_OFFSET_TYPE *min_cci,
                             int *num_groups,
                             int *size_smallest_group)
{

    OMPI_MPI_OFFSET_TYPE *cci = NULL;
    *num_groups = fh->f_init_procs_per_group / size_new_group;
    *size_smallest_group = size_new_group;
    int i = 0;
    int k = 0;
    int flag = 0; //all groups same size
    int size = 0;

    if( fh->f_init_procs_per_group % size_new_group != 0 ){
        *num_groups = *num_groups + 1;
        *size_smallest_group = fh->f_init_procs_per_group % size_new_group;
        flag = 1;
    }

    cci = (OMPI_MPI_OFFSET_TYPE*)malloc(*num_groups * sizeof( OMPI_MPI_OFFSET_TYPE ));
    if (NULL == cci) {
        opal_output(1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    //check contiguity within new groups
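    /* For every prospective group, cci[i] starts with the length of the
    ** group's first process and then accumulates the lengths of all further
    ** processes whose data is contiguous with the data of the preceding
    ** process (start offset equals the previous end offset). A large cci
    ** therefore indicates a mostly contiguous access pattern in that group.
    */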
    size = size_new_group;
    for( i = 0; i < *num_groups; i++){
         cci[i] = start_offsets_lens[3*size_new_group*i  + 1];
         //if it is the last group check if it is the smallest group
         if( (i == *num_groups-1) && flag == 1){
             size = *size_smallest_group;
         }
         for( k = 0; k < size-1; k++){
             if( end_offsets[size_new_group* i + k] == start_offsets_lens[3*size_new_group*i + 3*(k+1)] ){
                 cci[i] += start_offsets_lens[3*size_new_group*i + 3*(k + 1) + 1];
             }
         }
     }

     //get min and max cci
     *min_cci = cci[0];
     *max_cci = cci[0];
     for( i = 1 ; i < *num_groups; i++){
         if(cci[i] > *max_cci){
             *max_cci = cci[i];
         }
         else if(cci[i] < *min_cci){
             *min_cci = cci[i];
         }
     }

     free (cci);
     return OMPI_SUCCESS;
}

int mca_common_ompio_finalize_split(mca_io_ompio_file_t *fh,
                                  int size_new_group,
                                  int size_last_group)
{
   //based on new group and last group finalize f_procs_per_group and f_procs_in_group

    int i = 0;
    int j = 0;
    int k = 0;

    for( i = 0; i < fh->f_init_procs_per_group ; i++){

        if( fh->f_rank == fh->f_init_procs_in_group[i]){
             if( i >= fh->f_init_procs_per_group - size_last_group ){
                 fh->f_procs_per_group = size_last_group;
             }
             else{
                 fh->f_procs_per_group = size_new_group;
             }
        }
    }


    fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
    if (NULL == fh->f_procs_in_group) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for( i = 0; i < fh->f_init_procs_per_group ; i++){
        if( fh->f_rank == fh->f_init_procs_in_group[i]){
            if( i >= fh->f_init_procs_per_group - size_last_group ){
               //distribution of last group
               for( j = 0; j < fh->f_procs_per_group; j++){
                   fh->f_procs_in_group[j] = fh->f_init_procs_in_group[fh->f_init_procs_per_group - size_last_group + j];
               }
            }
            else{
                 //distribute all other groups
                 for( j = 0 ; j < fh->f_init_procs_per_group; j = j + size_new_group){
                     if(i >= j && i < j+size_new_group  ){
                         for( k = 0; k < fh->f_procs_per_group ; k++){
                            fh->f_procs_in_group[k] = fh->f_init_procs_in_group[j+k];
                         }
                     }
                 }
            }

        }
    }

    return OMPI_SUCCESS;
}

int mca_common_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
                                  OMPI_MPI_OFFSET_TYPE **start_offsets_lens,
                                  OMPI_MPI_OFFSET_TYPE **end_offsets, // need it?
                                  OMPI_MPI_OFFSET_TYPE **aggr_bytes_per_group,
                                  OMPI_MPI_OFFSET_TYPE *bytes_per_group,
                                  int **decision_list,
                                  size_t bytes_per_proc,
                                  int *is_aggregator,
                                  int *ompio_grouping_flag)
{

    OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
    OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group_tmp = NULL;
    OMPI_MPI_OFFSET_TYPE *start_offsets_lens_tmp = NULL;
    OMPI_MPI_OFFSET_TYPE *end_offsets_tmp = NULL;
    int *decision_list_tmp = NULL;

    int i = 0;
    int j = 0;
    int k = 0;
    int merge_count = 0;
    int split_count = 0; //not req?
    int retain_as_is_count = 0; //not req?
    int ret=OMPI_SUCCESS;

    //Store start offset and length in an array //also add bytes per process
    if(NULL == fh->f_decoded_iov){
         start_offset_len[0] = 0;
         start_offset_len[1] = 0;
    }
    else{
         start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
         start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
    }
    start_offset_len[2] = bytes_per_proc;
    start_offsets_lens_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == start_offsets_lens_tmp) {
        opal_output (1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    //Gather start offsets across processes in a group on the aggregator
    ret = ompi_fcoll_base_coll_allgather_array (start_offset_len,
                                           3,
                                           OMPI_OFFSET_DATATYPE,
                                           start_offsets_lens_tmp,
                                           3,
                                           OMPI_OFFSET_DATATYPE,
                                           0,
                                           fh->f_init_procs_in_group,
                                           fh->f_init_procs_per_group,
                                           fh->f_comm);
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_prepare_to_group: error in ompi_fcoll_base_coll_allgather_array\n");
        goto exit;
    }
    end_offsets_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets_tmp) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    for( k = 0 ; k < fh->f_init_procs_per_group; k++){
        end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
    }
    //Every process has the total bytes written in its group
    for(j = 0; j < fh->f_init_procs_per_group; j++){
        *bytes_per_group = *bytes_per_group + start_offsets_lens_tmp[3*j+2];
    }

    *start_offsets_lens = &start_offsets_lens_tmp[0];
    *end_offsets = &end_offsets_tmp[0];


    for( j = 0 ; j < fh->f_init_num_aggrs ; j++){
        if(fh->f_rank == fh->f_init_aggr_list[j])
           *is_aggregator = 1;
    }
    //Decide which groups are going in for a merge or a split
    //Merge only if the groups are consecutive
    if(*is_aggregator == 1){
       aggr_bytes_per_group_tmp = (OMPI_MPI_OFFSET_TYPE*)malloc (fh->f_init_num_aggrs * sizeof(OMPI_MPI_OFFSET_TYPE));
       if (NULL == aggr_bytes_per_group_tmp) {
          opal_output (1, "OUT OF MEMORY\n");
          ret = OMPI_ERR_OUT_OF_RESOURCE;
          goto exit;
       }
    decision_list_tmp = (int* )malloc (fh->f_init_num_aggrs * sizeof(int));
    if (NULL == decision_list_tmp) {
        opal_output (1, "OUT OF MEMORY\n");
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }
    //Communicate bytes per group between all aggregators
    ret = ompi_fcoll_base_coll_allgather_array (bytes_per_group,
                                           1,
                                           OMPI_OFFSET_DATATYPE,
                                           aggr_bytes_per_group_tmp,
                                           1,
                                           OMPI_OFFSET_DATATYPE,
                                           0,
                                           fh->f_init_aggr_list,
                                           fh->f_init_num_aggrs,
                                           fh->f_comm);
    if ( OMPI_SUCCESS != ret ) {
        opal_output (1, "mca_common_ompio_prepare_to_group: error in ompi_fcoll_base_coll_allgather_array 2\n");
        free(decision_list_tmp);
        goto exit;
    }

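    /* Classify each initial group: groups whose aggregated data volume
    ** exceeds bytes_per_agg become candidates for a split, groups below it
    ** become candidates for a merge, and groups matching it exactly are
    ** retained as is.
    */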
    for( i = 0; i < fh->f_init_num_aggrs; i++){
       if((size_t)(aggr_bytes_per_group_tmp[i])>
          (size_t)OMPIO_MCA_GET(fh, bytes_per_agg)){
          decision_list_tmp[i] = OMPIO_SPLIT;
          split_count++;
       }
       else if((size_t)(aggr_bytes_per_group_tmp[i])<
               (size_t)OMPIO_MCA_GET(fh,bytes_per_agg)){
            decision_list_tmp[i] = OMPIO_MERGE;
            merge_count++;
       }
       else{
           decision_list_tmp[i] = OMPIO_RETAIN;
           retain_as_is_count++;
       }
    }

    *aggr_bytes_per_group = &aggr_bytes_per_group_tmp[0];
    //Go through the decision list to see if non consecutive
    //processes intend to merge; if yes, retain the original grouping
    for( i = 0; i < fh->f_init_num_aggrs ; i++){
        if(decision_list_tmp[i] == OMPIO_MERGE){
            if( (i == 0) &&
                (decision_list_tmp[i+1] != OMPIO_MERGE)){ //first group
                    decision_list_tmp[i] = OMPIO_RETAIN;
            }
            else if( (i == fh->f_init_num_aggrs-1) &&
                     (decision_list_tmp[i-1] != OMPIO_MERGE)){

                decision_list_tmp[i] = OMPIO_RETAIN;
            }
            else if(!((decision_list_tmp[i-1] == OMPIO_MERGE) ||
                      (decision_list_tmp[i+1] == OMPIO_MERGE))){

                 decision_list_tmp[i] = OMPIO_RETAIN;
            }
        }
    }

    //Set the flag as per the decision list
    for( i = 0 ; i < fh->f_init_num_aggrs; i++){
        if((decision_list_tmp[i] == OMPIO_MERGE)&&
           (fh->f_rank == fh->f_init_aggr_list[i]))
           *ompio_grouping_flag = OMPIO_MERGE;

        if((decision_list_tmp[i] == OMPIO_SPLIT)&&
           (fh->f_rank == fh->f_init_aggr_list[i]))
           *ompio_grouping_flag = OMPIO_SPLIT;

        if((decision_list_tmp[i] == OMPIO_RETAIN)&&
           (fh->f_rank == fh->f_init_aggr_list[i]))
           *ompio_grouping_flag = OMPIO_RETAIN;
    }

    //print decision list of aggregators
    /*printf("RANK%d  : Printing decision list   : \n",fh->f_rank);
    for( i = 0; i < fh->f_init_num_aggrs; i++){
        if(decision_list_tmp[i] == OMPIO_MERGE)
            printf("MERGE,");
        else if(decision_list_tmp[i] == OMPIO_SPLIT)
            printf("SPLIT, ");
        else if(decision_list_tmp[i] == OMPIO_RETAIN)
            printf("RETAIN, " );
    }
    printf("\n\n");
    */
    *decision_list = &decision_list_tmp[0];
    }
    //Communicate the flag to all group members
    ret = ompi_fcoll_base_coll_bcast_array (ompio_grouping_flag,
                                       1,
                                       MPI_INT,
                                       0,
                                       fh->f_init_procs_in_group,
                                       fh->f_init_procs_per_group,
                                       fh->f_comm);

exit:
    /* Do not free aggr_bytes_per_group_tmp,
    ** start_offsets_lens_tmp, and end_offsets_tmp
    ** here. The memory is released in the layer above.
    */


    return ret;
}


/*
** This is the actual formula of the cost function from the paper.
** One change made here is to use floating point values for
** all parameters, since the ceil() function sometimes leads to
** unexpected jumps in the execution time. Using float leads to
** more consistent predictions for the no. of aggregators.
*/
static double cost_calc (int P, int P_a, size_t d_p, size_t b_c, int dim )
{
    float  n_as=1.0, m_s=1.0, n_s=1.0;
    float  n_ar=1.0;
    double t_send, t_recv, t_tot;

    /* LogGP parameters based on DDR InfiniBand values */
    double L=.00000184;
    double o=.00000149;
    double g=.0000119;
    double G=.00000000067;

    long file_domain = (P * d_p) / P_a;
    float n_r = (float)file_domain/(float) b_c;
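    /* Rough reading of the model below (see the cited paper for details):
    ** each process performs n_s send steps of messages with m_s elements to
    ** n_as aggregators, and each aggregator performs n_r receive steps from
    ** n_ar senders; both phases are costed with the LogGP parameters above
    ** (L: latency, o: overhead, g: gap per message, G: gap per byte) and the
    ** two resulting times are added up.
    */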

    switch (dim) {
        case DIM1:
        {
            if( d_p > b_c ){
                //printf("case 1\n");
                n_ar = 1;
                n_as = 1;
                m_s = b_c;
                n_s = (float)d_p/(float)b_c;
            }
            else {
                n_ar = (float)b_c/(float)d_p;
                n_as = 1;
                m_s = d_p;
                n_s = 1;
            }
            break;
        }
        case DIM2:
        {
            int P_x, P_y, c;

            P_x = P_y = (int) sqrt(P);
            c = (float) P_a / (float)P_x;

            n_ar = (float) P_y;
            n_as = (float) c;
            if ( d_p > (P_a*b_c/P )) {
                m_s = fmin(b_c / P_y, d_p);
            }
            else {
                m_s = fmin(d_p * P_x / P_a, d_p);
            }
            break;
        }
        default :
            printf("stop putting random values\n");
            break;
    }

    n_s = (float) d_p / (float)(n_as * m_s);

    if( m_s < 33554432) {
        g = .00000108;
    }
    t_send = n_s * (L + 2 * o + (n_as -1) * g + (m_s - 1) * n_as * G);
    t_recv=  n_r * (L + 2 * o + (n_ar -1) * g + (m_s - 1) * n_ar * G);
    t_tot = t_send + t_recv;

    return t_tot;
}
