1 /* -*- Mode: C; c-basic-offset:4 ; -*- */
2 /*
3 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4 * University Research and Technology
5 * Corporation. All rights reserved.
6 * Copyright (c) 2004-2017 The University of Tennessee and The University
7 * of Tennessee Research Foundation. All rights
8 * reserved.
9 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10 * University of Stuttgart. All rights reserved.
11 * Copyright (c) 2004-2005 The Regents of the University of California.
12 * All rights reserved.
13 * Copyright (c) 2008-2017 University of Houston. All rights reserved.
14 * Copyright (c) 2011-2015 Cisco Systems, Inc. All rights reserved.
15 * Copyright (c) 2012-2013 Inria. All rights reserved.
16 * Copyright (c) 2015-2017 Research Organization for Information Science
17 * and Technology (RIST). All rights reserved.
18 * Copyright (c) 2017 IBM Corporation. All rights reserved.
19 * $COPYRIGHT$
20 *
21 * Additional copyrights may follow
22 *
23 * $HEADER$
24 */
25
26 #include "ompi_config.h"
27
28 #include "ompi/runtime/params.h"
29 #include "ompi/communicator/communicator.h"
30 #include "ompi/mca/pml/pml.h"
31 #include "ompi/mca/topo/topo.h"
32 #include "ompi/mca/fcoll/base/fcoll_base_coll_array.h"
33 #include "opal/datatype/opal_convertor.h"
34 #include "opal/datatype/opal_datatype.h"
35 #include "ompi/datatype/ompi_datatype.h"
36 #include "ompi/info/info.h"
37 #include "ompi/request/request.h"
38
39 #include <math.h>
40 #include <unistd.h>
41
42 #include "common_ompio_aggregators.h"
43
44 /*
45 ** This file contains all the functionality related to determining the number of aggregators
46 ** and the list of aggregators.
47 **
48 ** The first group of functions determines the number of aggregators based on various characteristics:
49 **
50 ** 1. simple_grouping: a heuristic based on a cost model
51 ** 2. fview_based_grouping: analyzes the fileview to detect regular patterns
52 ** 3. cart_based_grouping: uses a Cartesian communicator to derive certain (probable) properties
53 ** of the access pattern
54 */
55
56
57 static double cost_calc (int P, int P_agg, size_t Data_proc, size_t coll_buffer, int dim );
58 #define DIM1 1
59 #define DIM2 2
60
61 int mca_common_ompio_simple_grouping(mca_io_ompio_file_t *fh,
62 int *num_groups_out,
63 mca_common_ompio_contg *contg_groups)
64 {
65 int group_size = 0;
66 int k=0, p=0, g=0;
67 int total_procs = 0;
68 int num_groups=1;
69
70 double time=0.0, time_prev=0.0, dtime=0.0, dtime_abs=0.0, dtime_diff=0.0, dtime_prev=0.0;
71 double dtime_threshold=0.0;
72
73 /* This is the threshold for absolute improvement. It is not
74 ** exposed as an MCA parameter to avoid overwhelming users. It is
75 ** mostly relevant for smaller process counts and data volumes.
76 */
77 double time_threshold=0.001;
78
79 int incr=1, mode=1;
80 int P_a, P_a_prev;
81
82 /* The aggregator selection algorithm is based on the formulas described
83 ** in: Shweta Jha, Edgar Gabriel, 'Performance Models for Communication in
84 ** Collective I/O operations', Proceedings of the 17th IEEE/ACM Symposium
85 ** on Cluster, Cloud and Grid Computing, Workshop on Theoretical
86 ** Approaches to Performance Evaluation, Modeling and Simulation, 2017.
87 **
88 ** The current implementation is based on the 1-D and 2-D models derived for the even
89 ** file partitioning strategy in the paper. Note that the formulas currently only model
90 ** the communication aspect of collective I/O operations. There are two extensions in this
91 ** implementation:
92 **
93 ** 1. Since the resulting formula has an asymptotic behavior w.r.t. the
94 ** no. of aggregators, this version determines the no. of aggregators to
95 ** be used iteratively and stops increasing the no. of aggregators if the
96 ** benefit of adding aggregators falls below a certain threshold
97 ** value relative to the last number tested. The aggressiveness of cutting off
98 ** the increase in the number of aggregators is controlled by the new mca
99 ** parameter mca_io_ompio_aggregators_cutoff_threshold. Lower values for
100 ** this parameter will lead to a higher number of aggregators (useful e.g.
101 ** for PVFS2 and GPFS file systems), while higher values will lead to a
102 ** lower no. of aggregators (useful for regular UNIX or NFS file systems).
103 **
104 ** 2. The algorithm further caps the maximum no. of aggregators used so as not to exceed
105 ** (no. of processes / mca_io_ompio_max_aggregators_ratio), i.e. a higher value
106 ** for mca_io_ompio_max_aggregators_ratio will decrease the maximum number of aggregators
107 ** allowed for the given no. of processes.
108 */
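/* Illustrative sketch of the search loop below (the numbers are an example, not
** measured data): with e.g. 64 processes the increment is 4, so the cost model
** is evaluated for P_a = 1, 4, 8, 12, ... aggregators. The loop stops as soon as
** either the relative improvement over the previously tested value drops below
** dtime_threshold, or the absolute improvement drops below time_threshold, and
** the previously accepted value P_a_prev is used as the number of groups.
*/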
109 dtime_threshold = (double) OMPIO_MCA_GET(fh, aggregators_cutoff_threshold) / 100.0;
110
111 /* Determine whether to use the formula for 1-D or 2-D data decomposition. Anything
112 ** that is not 1-D is assumed to be 2-D in this version
113 */
114 mode = ( fh->f_cc_size == fh->f_view_size ) ? 1 : 2;
115
116 /* Determine the increment size when searching the optimal
117 ** no. of aggregators
118 */
119 if ( fh->f_size < 16 ) {
120 incr = 2;
121 }
122 else if (fh->f_size < 128 ) {
123 incr = 4;
124 }
125 else if ( fh->f_size < 4096 ) {
126 incr = 16;
127 }
128 else {
129 incr = 32;
130 }
131
132 P_a = 1;
133 time_prev = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
134 P_a_prev = P_a;
135 for ( P_a = incr; P_a <= fh->f_size; P_a += incr ) {
136 time = cost_calc ( fh->f_size, P_a, fh->f_view_size, (size_t) fh->f_bytes_per_agg, mode );
137 dtime_abs = (time_prev - time);
138 dtime = dtime_abs / time_prev;
139 dtime_diff = ( P_a == incr ) ? dtime : (dtime_prev - dtime);
140 #ifdef OMPIO_DEBUG
141 if ( 0 == fh->f_rank ){
142 printf(" d_p = %ld P_a = %d time = %lf dtime = %lf dtime_abs =%lf dtime_diff=%lf\n",
143 fh->f_view_size, P_a, time, dtime, dtime_abs, dtime_diff );
144 }
145 #endif
146 if ( dtime_diff < dtime_threshold ) {
147 /* The relative improvement compared to the last number
148 ** of aggregators was below a certain threshold. This is typically
149 ** the dominating factor for large data volumes and larger process
150 ** counts
151 */
152 #ifdef OMPIO_DEBUG
153 if ( 0 == fh->f_rank ) {
154 printf("dtime_diff below threshold\n");
155 }
156 #endif
157 break;
158 }
159 if ( dtime_abs < time_threshold ) {
160 /* The absolute improvement compared to the last number
161 ** of aggregators was below a given threshold. This is typically
162 ** important for small data volumes and smaller process counts
163 */
164 #ifdef OMPIO_DEBUG
165 if ( 0 == fh->f_rank ) {
166 printf("dtime_abs below threshold\n");
167 }
168 #endif
169 break;
170 }
171 time_prev = time;
172 dtime_prev = dtime;
173 P_a_prev = P_a;
174 }
175 num_groups = P_a_prev;
176 #ifdef OMPIO_DEBUG
177 printf(" For P=%d d_p=%ld b_c=%d threshold=%f chosen P_a = %d \n",
178 fh->f_size, fh->f_view_size, fh->f_bytes_per_agg, dtime_threshold, P_a_prev);
179 #endif
180
181 /* Cap the maximum number of aggregators.*/
182 if ( num_groups > (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio))) {
183 num_groups = (fh->f_size/OMPIO_MCA_GET(fh, max_aggregators_ratio));
184 }
185 if ( 1 >= num_groups ) {
186 num_groups = 1;
187 }
188 group_size = fh->f_size / num_groups;
189
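/* Example (hypothetical numbers): with f_size = 10 and num_groups = 3 the
** group_size is 10/3 = 3, giving the contiguous groups {0,1,2}, {3,4,5} and
** {6,7,8,9}; the last group absorbs the remainder of the integer division.
*/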
190 for ( k=0, p=0; p<num_groups; p++ ) {
191 if ( p == (num_groups - 1) ) {
192 contg_groups[p].procs_per_contg_group = fh->f_size - total_procs;
193 }
194 else {
195 contg_groups[p].procs_per_contg_group = group_size;
196 total_procs +=group_size;
197 }
198 for ( g=0; g<contg_groups[p].procs_per_contg_group; g++ ) {
199 contg_groups[p].procs_in_contg_group[g] = k;
200 k++;
201 }
202 }
203
204 *num_groups_out = num_groups;
205 return OMPI_SUCCESS;
206 }
207
208 int mca_common_ompio_fview_based_grouping(mca_io_ompio_file_t *fh,
209 int *num_groups,
210 mca_common_ompio_contg *contg_groups)
211 {
212
213 int k = 0;
214 int p = 0;
215 int g = 0;
216 int ret = OMPI_SUCCESS;
217 OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
218 OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
219 OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
220
221 //Store start offset,length and corresponding rank in an array
222 if(NULL == fh->f_decoded_iov){
223 start_offset_len[0] = 0;
224 start_offset_len[1] = 0;
225 }
226 else{
227 start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
228 start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
229 }
230 start_offset_len[2] = fh->f_rank;
231
232 start_offsets_lens = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
233 if (NULL == start_offsets_lens) {
234 opal_output (1, "OUT OF MEMORY\n");
235 ret = OMPI_ERR_OUT_OF_RESOURCE;
236 goto exit;
237 }
238 end_offsets = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_size * sizeof(OMPI_MPI_OFFSET_TYPE));
239 if (NULL == end_offsets) {
240 opal_output (1, "OUT OF MEMORY\n");
241 ret = OMPI_ERR_OUT_OF_RESOURCE;
242 goto exit;
243 }
244
245 //Allgather start offsets across processes in a group on aggregator
246 ret = fh->f_comm->c_coll->coll_allgather (start_offset_len,
247 3,
248 OMPI_OFFSET_DATATYPE,
249 start_offsets_lens,
250 3,
251 OMPI_OFFSET_DATATYPE,
252 fh->f_comm,
253 fh->f_comm->c_coll->coll_allgather_module);
254 if ( OMPI_SUCCESS != ret ) {
255 goto exit;
256 }
257
258
259 //Calculate contg chunk size and contg subgroups
260 for( k = 0 ; k < fh->f_size; k++){
261 end_offsets[k] = start_offsets_lens[3*k] + start_offsets_lens[3*k+1];
262 contg_groups[k].contg_chunk_size = 0;
263 }
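/* Example (illustrative offsets): if rank 0 starts at offset 0 with length 100
** and rank 1 starts at offset 100, rank 1's start offset matches rank 0's end
** offset, so both ranks land in the same contiguous group; any mismatch between
** end_offsets[k-1] and the next start offset opens a new group p+1.
*/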
264 k = 0;
265 while( k < fh->f_size){
266 if( k == 0){
267 contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
268 contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
269 g++;
270 contg_groups[p].procs_per_contg_group = g;
271 k++;
272 }
273 else if( start_offsets_lens[3*k] == end_offsets[k - 1] ){
274 contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
275 contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
276 g++;
277 contg_groups[p].procs_per_contg_group = g;
278 k++;
279 }
280 else{
281 p++;
282 g = 0;
283 contg_groups[p].contg_chunk_size += start_offsets_lens[3*k+1];
284 contg_groups[p].procs_in_contg_group[g] = start_offsets_lens[3*k + 2];
285 g++;
286 contg_groups[p].procs_per_contg_group = g;
287 k++;
288 }
289 }
290
291 *num_groups = p+1;
292 ret = OMPI_SUCCESS;
293
294 exit:
295 if (NULL != start_offsets_lens) {
296 free (start_offsets_lens);
297 }
298 if (NULL != end_offsets) {
299 free(end_offsets);
300 }
301
302 return ret;
303 }
304
305 int mca_common_ompio_cart_based_grouping(mca_io_ompio_file_t *ompio_fh,
306 int *num_groups,
307 mca_common_ompio_contg *contg_groups)
308 {
309 int k = 0;
310 int g=0;
311 int ret = OMPI_SUCCESS, tmp_rank = 0;
312 int *coords_tmp = NULL;
313
314 mca_common_ompio_cart_topo_components cart_topo;
315 memset (&cart_topo, 0, sizeof(mca_common_ompio_cart_topo_components));
316
317 ret = ompio_fh->f_comm->c_topo->topo.cart.cartdim_get(ompio_fh->f_comm, &cart_topo.ndims);
318 if (OMPI_SUCCESS != ret ) {
319 goto exit;
320 }
321
322 if (cart_topo.ndims < 2 ) {
323 /* We shouldn't be here, this routine only works for more than 1 dimension */
324 ret = MPI_ERR_INTERN;
325 goto exit;
326 }
327
328 cart_topo.dims = (int*)malloc (cart_topo.ndims * sizeof(int));
329 if (NULL == cart_topo.dims) {
330 opal_output (1, "OUT OF MEMORY\n");
331 ret = OMPI_ERR_OUT_OF_RESOURCE;
332 goto exit;
333 }
334 cart_topo.periods = (int*)malloc (cart_topo.ndims * sizeof(int));
335 if (NULL == cart_topo.periods) {
336 opal_output (1, "OUT OF MEMORY\n");
337 ret = OMPI_ERR_OUT_OF_RESOURCE;
338 goto exit;
339 }
340 cart_topo.coords = (int*)malloc (cart_topo.ndims * sizeof(int));
341 if (NULL == cart_topo.coords) {
342 opal_output (1, "OUT OF MEMORY\n");
343 ret = OMPI_ERR_OUT_OF_RESOURCE;
344 goto exit;
345 }
346
347 coords_tmp = (int*)malloc (cart_topo.ndims * sizeof(int));
348 if (NULL == coords_tmp) {
349 opal_output (1, "OUT OF MEMORY\n");
350 ret = OMPI_ERR_OUT_OF_RESOURCE;
351 goto exit;
352 }
353
354 ret = ompio_fh->f_comm->c_topo->topo.cart.cart_get(ompio_fh->f_comm,
355 cart_topo.ndims,
356 cart_topo.dims,
357 cart_topo.periods,
358 cart_topo.coords);
359 if ( OMPI_SUCCESS != ret ) {
360 opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_get \n");
361 goto exit;
362 }
363
364 *num_groups = cart_topo.dims[0]; //number of rows
365
366 for(k = 0; k < cart_topo.dims[0]; k++){
367 int done = 0;
368 int index = cart_topo.ndims-1;
369
370 memset ( coords_tmp, 0, cart_topo.ndims * sizeof(int));
371 contg_groups[k].procs_per_contg_group = (ompio_fh->f_size / cart_topo.dims[0]);
372 coords_tmp[0] = k;
373
374 ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
375 if ( OMPI_SUCCESS != ret ) {
376 opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_rank\n");
377 goto exit;
378 }
379 contg_groups[k].procs_in_contg_group[0] = tmp_rank;
380
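/* The loop below enumerates the remaining members of row k: coords_tmp is
** advanced through the non-leading dimensions like an odometer (increment the
** last coordinate, carry over into the previous dimension on overflow), while
** coords_tmp[0] stays fixed at k. Each resulting coordinate tuple is translated
** back into a rank via cart_rank and appended to the group.
*/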
381 for ( g=1; g< contg_groups[k].procs_per_contg_group; g++ ) {
382 done = 0;
383 index = cart_topo.ndims-1;
384
385 while ( ! done ) {
386 coords_tmp[index]++;
387 if ( coords_tmp[index] ==cart_topo.dims[index] ) {
388 coords_tmp[index]=0;
389 index--;
390 }
391 else {
392 done = 1;
393 }
394 if ( index == 0 ) {
395 done = 1;
396 }
397 }
398
399 ret = ompio_fh->f_comm->c_topo->topo.cart.cart_rank (ompio_fh->f_comm,coords_tmp,&tmp_rank);
400 if ( OMPI_SUCCESS != ret ) {
401 opal_output (1, "mca_common_ompio_cart_based_grouping: Error in cart_rank\n");
402 goto exit;
403 }
404 contg_groups[k].procs_in_contg_group[g] = tmp_rank;
405 }
406 }
407
408
409 exit:
410 if (NULL != cart_topo.dims) {
411 free (cart_topo.dims);
412 cart_topo.dims = NULL;
413 }
414 if (NULL != cart_topo.periods) {
415 free (cart_topo.periods);
416 cart_topo.periods = NULL;
417 }
418 if (NULL != cart_topo.coords) {
419 free (cart_topo.coords);
420 cart_topo.coords = NULL;
421 }
422 if (NULL != coords_tmp) {
423 free (coords_tmp);
424 coords_tmp = NULL;
425 }
426
427 return ret;
428 }
429
430
431
432 int mca_common_ompio_finalize_initial_grouping(mca_io_ompio_file_t *fh,
433 int num_groups,
434 mca_common_ompio_contg *contg_groups)
435 {
436
437 int z = 0;
438 int y = 0;
439
440 fh->f_init_num_aggrs = num_groups;
441 if (NULL != fh->f_init_aggr_list) {
442 free(fh->f_init_aggr_list);
443 }
444 fh->f_init_aggr_list = (int*)malloc (fh->f_init_num_aggrs * sizeof(int));
445 if (NULL == fh->f_init_aggr_list) {
446 opal_output (1, "OUT OF MEMORY\n");
447 return OMPI_ERR_OUT_OF_RESOURCE;
448 }
449
450 for( z = 0 ;z < num_groups; z++){
451 for( y = 0; y < contg_groups[z].procs_per_contg_group; y++){
452 if ( fh->f_rank == contg_groups[z].procs_in_contg_group[y] ) {
453 fh->f_init_procs_per_group = contg_groups[z].procs_per_contg_group;
454 if (NULL != fh->f_init_procs_in_group) {
455 free(fh->f_init_procs_in_group);
456 }
457 fh->f_init_procs_in_group = (int*)malloc (fh->f_init_procs_per_group * sizeof(int));
458 if (NULL == fh->f_init_procs_in_group) {
459 opal_output (1, "OUT OF MEMORY\n");
460 return OMPI_ERR_OUT_OF_RESOURCE;
461 }
462 memcpy ( fh->f_init_procs_in_group, contg_groups[z].procs_in_contg_group,
463 contg_groups[z].procs_per_contg_group * sizeof (int));
464
465 }
466 }
467 }
468
469 for( z = 0 ;z < num_groups; z++){
470 fh->f_init_aggr_list[z] = contg_groups[z].procs_in_contg_group[0];
471 }
472
473
474 return OMPI_SUCCESS;
475 }
476
477 /*****************************************************************************************************/
478 /*****************************************************************************************************/
479 /*****************************************************************************************************/
480 /*
481 ** This function is called by the collective I/O operations to determine the final number
482 ** of aggregators.
483 */
484
485 int mca_common_ompio_set_aggregator_props (struct mca_io_ompio_file_t *fh,
486 int num_aggregators,
487 size_t bytes_per_proc)
488 {
489 int j,procs_per_group = 0;
490 int ret=OMPI_SUCCESS;
491
492 /*If only one process used, no need to do aggregator selection!*/
493 if (fh->f_size == 1){
494 num_aggregators = 1;
495 }
496
497 fh->f_flags |= OMPIO_AGGREGATOR_IS_SET;
498
499 if (-1 == num_aggregators) {
500 if ( SIMPLE == OMPIO_MCA_GET(fh, grouping_option) ||
501 NO_REFINEMENT == OMPIO_MCA_GET(fh,grouping_option) ||
502 SIMPLE_PLUS == OMPIO_MCA_GET(fh,grouping_option) ) {
503 fh->f_aggregator_index = 0;
504 fh->f_final_num_aggrs = fh->f_init_num_aggrs;
505 fh->f_procs_per_group = fh->f_init_procs_per_group;
506
507 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
508 if (NULL == fh->f_procs_in_group) {
509 opal_output (1, "OUT OF MEMORY\n");
510 return OMPI_ERR_OUT_OF_RESOURCE;
511 }
512
513 for (j=0 ; j<fh->f_procs_per_group ; j++) {
514 fh->f_procs_in_group[j] = fh->f_init_procs_in_group[j];
515 }
516 }
517 else {
518 ret = mca_common_ompio_create_groups(fh,bytes_per_proc);
519 }
520 return ret;
521 }
522
523 /* Forced number of aggregators
524 ** calculate the offset at which each group of processes will start
525 */
526 if ( num_aggregators > fh->f_size ) {
527 num_aggregators = fh->f_size;
528 }
529 procs_per_group = ceil ((float)fh->f_size/num_aggregators);
530
531 /* calculate the number of processes in the local group */
532 if (fh->f_size/procs_per_group != fh->f_rank/procs_per_group) {
533 fh->f_procs_per_group = procs_per_group;
534 }
535 else {
536 fh->f_procs_per_group = fh->f_size%procs_per_group;
537 }
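/* Worked example (hypothetical values): with f_size = 10 and a forced
** num_aggregators = 4, procs_per_group = ceil(10/4) = 3, giving groups
** {0,1,2}, {3,4,5}, {6,7,8} and {9}. For ranks 0-8 the test above selects the
** full group size of 3; for rank 9 (9/3 == 10/3 == 3) it selects the remainder
** 10 % 3 = 1, i.e. the smaller last group.
*/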
538
539 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
540 if (NULL == fh->f_procs_in_group) {
541 opal_output (1, "OUT OF MEMORY\n");
542 return OMPI_ERR_OUT_OF_RESOURCE;
543 }
544
545 for (j=0 ; j<fh->f_procs_per_group ; j++) {
546 fh->f_procs_in_group[j] = (fh->f_rank/procs_per_group) * procs_per_group + j;
547 }
548
549 fh->f_aggregator_index = 0;
550 fh->f_final_num_aggrs = num_aggregators;
551
552 return OMPI_SUCCESS;
553 }
554
555
556
557 int mca_common_ompio_create_groups(mca_io_ompio_file_t *fh,
558 size_t bytes_per_proc)
559 {
560
561 int is_aggregator = 0;
562 int final_aggr = 0;
563 int final_num_aggrs = 0;
564 int ret = OMPI_SUCCESS, ompio_grouping_flag = 0;
565
566 int *decision_list = NULL;
567
568 OMPI_MPI_OFFSET_TYPE *start_offsets_lens = NULL;
569 OMPI_MPI_OFFSET_TYPE *end_offsets = NULL;
570 OMPI_MPI_OFFSET_TYPE bytes_per_group = 0;
571 OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group = NULL;
572
573 ret = mca_common_ompio_prepare_to_group(fh,
574 &start_offsets_lens,
575 &end_offsets,
576 &aggr_bytes_per_group,
577 &bytes_per_group,
578 &decision_list,
579 bytes_per_proc,
580 &is_aggregator,
581 &ompio_grouping_flag);
582 if ( OMPI_SUCCESS != ret ) {
583 opal_output (1, "mca_common_ompio_create_groups: error in mca_common_ompio_prepare_to_group\n");
584 goto exit;
585 }
586
587 switch(ompio_grouping_flag){
588
589 case OMPIO_SPLIT:
590 ret = mca_common_ompio_split_initial_groups(fh,
591 start_offsets_lens,
592 end_offsets,
593 bytes_per_group);
594 break;
595
596 case OMPIO_MERGE:
597 ret = mca_common_ompio_merge_initial_groups(fh,
598 aggr_bytes_per_group,
599 decision_list,
600 is_aggregator);
601 break;
602
603 case OMPIO_RETAIN:
604
605 ret = mca_common_ompio_retain_initial_groups(fh);
606
607 break;
608
609
610 }
611 if ( OMPI_SUCCESS != ret ) {
612 opal_output (1, "mca_common_ompio_create_groups: error in subroutine called within switch statement\n");
613 goto exit;
614 }
615
616 //Set aggregator index
617 fh->f_aggregator_index = 0;
618
619 //Calculate final number of aggregators
620 if(fh->f_rank == fh->f_procs_in_group[fh->f_aggregator_index]){
621 final_aggr = 1;
622 }
623 ret = fh->f_comm->c_coll->coll_allreduce (&final_aggr,
624 &final_num_aggrs,
625 1,
626 MPI_INT,
627 MPI_SUM,
628 fh->f_comm,
629 fh->f_comm->c_coll->coll_allreduce_module);
630 if ( OMPI_SUCCESS != ret ) {
631 opal_output (1, "mca_common_ompio_create_groups: error in allreduce\n");
632 }
633
634 //Set final number of aggregators in file handle
635 fh->f_final_num_aggrs = final_num_aggrs;
636
637 exit:
638
639 if (NULL != start_offsets_lens) {
640 free (start_offsets_lens);
641 }
642 if (NULL != end_offsets) {
643 free (end_offsets);
644 }
645 if(NULL != aggr_bytes_per_group){
646 free(aggr_bytes_per_group);
647 }
648 if( NULL != decision_list){
649 free(decision_list);
650 }
651
652
653 return ret;
654 }
655
656 int mca_common_ompio_merge_initial_groups(mca_io_ompio_file_t *fh,
657 OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group,
658 int *decision_list,
659 int is_aggregator){
660
661 OMPI_MPI_OFFSET_TYPE sum_bytes = 0;
662 MPI_Request *sendreqs = NULL;
663
664 int start = 0;
665 int end = 0;
666 int i = 0;
667 int j = 0;
668 int r = 0;
669
670 int merge_pair_flag = 4;
671 int first_merge_flag = 4;
672 int *merge_aggrs = NULL;
673 int is_new_aggregator= 0;
674 int ret = OMPI_SUCCESS;
675
676 if(is_aggregator){
677 i = 0;
678 sum_bytes = 0;
679 //go through the decision list
680 //Find the aggregators that could merge
681
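/* The walk below tags consecutive groups that want to merge: a group whose
** decision is OMPIO_MERGE is stamped with the current merge_pair_flag while the
** accumulated sum_bytes stays within bytes_per_agg; once the budget is exceeded
** and the following group also wants to merge, merge_pair_flag is incremented
** and a new run starts. Each flag value >= first_merge_flag therefore identifies
** one run of groups that is merged further below under the first aggregator
** (merge_aggrs[0]) of that run.
*/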
682 while(i < fh->f_init_num_aggrs){
683 while(1){
684 if( i >= fh->f_init_num_aggrs){
685 break;
686 }
687 else if((decision_list[i] == OMPIO_MERGE) &&
688 (sum_bytes <= OMPIO_MCA_GET(fh, bytes_per_agg))){
689 sum_bytes = sum_bytes + aggr_bytes_per_group[i];
690 decision_list[i] = merge_pair_flag;
691 i++;
692 }
693 else if((decision_list[i] == OMPIO_MERGE) &&
694 (sum_bytes >= OMPIO_MCA_GET(fh, bytes_per_agg)) ){
695 if(decision_list[i+1] == OMPIO_MERGE){
696 merge_pair_flag++;
697 decision_list[i] = merge_pair_flag;
698 sum_bytes = aggr_bytes_per_group[i];
699 i++;
700 }
701 else{
702 decision_list[i] = merge_pair_flag;
703 i++;
704 }
705 }
706 else{
707 i++;
708 if(decision_list[i] == OMPIO_MERGE)
709 merge_pair_flag++;
710 sum_bytes = 0;
711 break;
712 }
713 }
714 }
715
716 //Now go through the new edited decision list and
717 //make lists of aggregators to merge and number
718 //of groups to be merged.
719 i = 0;
720 j = 0;
721
722 while(i < fh->f_init_num_aggrs){
723 if(decision_list[i] >= first_merge_flag){
724 start = i;
725 while((decision_list[i] >= first_merge_flag) &&
726 (i < fh->f_init_num_aggrs-1)){
727 if(decision_list[i+1] == decision_list[i]){
728 i++;
729 }
730 else{
731 break;
732 }
733 end = i;
734 }
735 merge_aggrs = (int *)malloc((end - start + 1) * sizeof(int));
736 if (NULL == merge_aggrs) {
737 opal_output (1, "OUT OF MEMORY\n");
738 return OMPI_ERR_OUT_OF_RESOURCE;
739 }
740 j = 0;
741 for( j = 0 ; j < end - start + 1; j++){
742 merge_aggrs[j] = fh->f_init_aggr_list[start+j];
743 }
744 if(fh->f_rank == merge_aggrs[0])
745 is_new_aggregator = 1;
746
747 for( j = 0 ; j < end-start+1 ;j++){
748 if(fh->f_rank == merge_aggrs[j]){
749 ret = mca_common_ompio_merge_groups(fh, merge_aggrs,
750 end-start+1);
751 if ( OMPI_SUCCESS != ret ) {
752 opal_output (1, "mca_common_ompio_merge_initial_groups: error in mca_common_ompio_merge_groups\n");
753 free ( merge_aggrs );
754 return ret;
755 }
756 }
757 }
758 if(NULL != merge_aggrs){
759 free(merge_aggrs);
760 merge_aggrs = NULL;
761 }
762
763 }
764 i++;
765 }
766
767 }//end old aggregators
768
769 //New aggregators communicate new grouping info to the groups
770 if(is_new_aggregator){
771 sendreqs = (MPI_Request *)malloc ( 2 *fh->f_procs_per_group * sizeof(MPI_Request));
772 if (NULL == sendreqs) {
773 return OMPI_ERR_OUT_OF_RESOURCE;
774 }
775 //Communicate grouping info
776 for( j = 0 ; j < fh->f_procs_per_group; j++){
777 if (fh->f_procs_in_group[j] == fh->f_rank ) {
778 continue;
779 }
780 //new aggregator sends new procs_per_group to all its members
781 ret = MCA_PML_CALL(isend(&fh->f_procs_per_group,
782 1,
783 MPI_INT,
784 fh->f_procs_in_group[j],
785 OMPIO_PROCS_PER_GROUP_TAG,
786 MCA_PML_BASE_SEND_STANDARD,
787 fh->f_comm,
788 sendreqs + r++));
789 if ( OMPI_SUCCESS != ret ) {
790 opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend\n");
791 goto exit;
792 }
793 //new aggregator sends distribution of process to all its new members
794 ret = MCA_PML_CALL(isend(fh->f_procs_in_group,
795 fh->f_procs_per_group,
796 MPI_INT,
797 fh->f_procs_in_group[j],
798 OMPIO_PROCS_IN_GROUP_TAG,
799 MCA_PML_BASE_SEND_STANDARD,
800 fh->f_comm,
801 sendreqs + r++));
802 if ( OMPI_SUCCESS != ret ) {
803 opal_output (1, "mca_common_ompio_merge_initial_groups: error in Isend 2\n");
804 goto exit;
805 }
806
807 }
808 }
809 else {
810 //All non aggregators
811 //All processes receive initial process distribution from aggregators
812 ret = MCA_PML_CALL(recv(&fh->f_procs_per_group,
813 1,
814 MPI_INT,
815 MPI_ANY_SOURCE,
816 OMPIO_PROCS_PER_GROUP_TAG,
817 fh->f_comm,
818 MPI_STATUS_IGNORE));
819 if ( OMPI_SUCCESS != ret ) {
820 opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv\n");
821 return ret;
822 }
823
824 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
825 if (NULL == fh->f_procs_in_group) {
826 opal_output (1, "OUT OF MEMORY\n");
827 return OMPI_ERR_OUT_OF_RESOURCE;
828 }
829
830 ret = MCA_PML_CALL(recv(fh->f_procs_in_group,
831 fh->f_procs_per_group,
832 MPI_INT,
833 MPI_ANY_SOURCE,
834 OMPIO_PROCS_IN_GROUP_TAG,
835 fh->f_comm,
836 MPI_STATUS_IGNORE));
837 if ( OMPI_SUCCESS != ret ) {
838 opal_output (1, "mca_common_ompio_merge_initial_groups: error in Recv 2\n");
839 return ret;
840 }
841
842 }
843
844 if(is_new_aggregator) {
845 ret = ompi_request_wait_all (r, sendreqs, MPI_STATUSES_IGNORE);
846 }
847
848 exit:
849 if (NULL != sendreqs) {
850 free(sendreqs);
851 }
852
853 return ret;
854 }
855
856 int mca_common_ompio_split_initial_groups(mca_io_ompio_file_t *fh,
857 OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
858 OMPI_MPI_OFFSET_TYPE *end_offsets,
859 OMPI_MPI_OFFSET_TYPE bytes_per_group){
860
861
862 int size_new_group = 0;
863 int size_old_group = 0;
864 int size_last_group = 0;
865 int size_smallest_group = 0;
866 int num_groups = 0;
867 int ret = OMPI_SUCCESS;
868
869 OMPI_MPI_OFFSET_TYPE max_cci = 0;
870 OMPI_MPI_OFFSET_TYPE min_cci = 0;
871
872 // integer round up
873 size_new_group = (int)(OMPIO_MCA_GET(fh, bytes_per_agg) / bytes_per_group + (OMPIO_MCA_GET(fh, bytes_per_agg) % bytes_per_group ? 1u : 0u));
874 size_old_group = fh->f_init_procs_per_group;
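/* Example of the round-up above (purely illustrative numbers): with
** bytes_per_agg = 32MB and bytes_per_group = 12MB this yields 32/12 rounded up
** = 3; size_new_group is then used as the target number of processes per new
** group when the old group is split in mca_common_ompio_split_a_group.
*/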
875
876 ret = mca_common_ompio_split_a_group(fh,
877 start_offsets_lens,
878 end_offsets,
879 size_new_group,
880 &max_cci,
881 &min_cci,
882 &num_groups,
883 &size_smallest_group);
884 if (OMPI_SUCCESS != ret ) {
885 opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group\n");
886 return ret;
887 }
888
889
890 switch( OMPIO_MCA_GET(fh,grouping_option)){
891 case DATA_VOLUME:
892 //Just use size as returned by split group
893 size_last_group = size_smallest_group;
894 break;
895
896 case UNIFORM_DISTRIBUTION:
897 if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
898 //uneven split need to call split again
899 if( size_old_group % num_groups == 0 ){
900 //most even distribution possible
901 size_new_group = size_old_group / num_groups;
902 size_last_group = size_new_group;
903 }
904 else{
905 //merge the last small group with the previous group
906 size_last_group = size_new_group + size_smallest_group;
907 }
908 }
909 else{
910 //Considered uniform
911 size_last_group = size_smallest_group;
912 }
913 break;
914
915 case CONTIGUITY:
916
917 while(1){
918 if((max_cci < OMPIO_CONTG_THRESHOLD) &&
919 (size_new_group < size_old_group)){
920
921 size_new_group = (size_new_group + size_old_group ) / 2;
922 ret = mca_common_ompio_split_a_group(fh,
923 start_offsets_lens,
924 end_offsets,
925 size_new_group,
926 &max_cci,
927 &min_cci,
928 &num_groups,
929 &size_smallest_group);
930 if (OMPI_SUCCESS != ret ) {
931 opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 2\n");
932 return ret;
933 }
934 }
935 else{
936 break;
937 }
938 }
939 size_last_group = size_smallest_group;
940 break;
941
942 case OPTIMIZE_GROUPING:
943 //This case is a combination of Data volume, contiguity and uniform distribution
944 while(1){
945 if((max_cci < OMPIO_CONTG_THRESHOLD) &&
946 (size_new_group < size_old_group)){ //can be a better condition
947 //monitor the previous iteration
948 //break if it has not changed.
949 size_new_group = size_new_group + size_old_group;
950 // integer round up
951 size_new_group = size_new_group / 2 + (size_new_group % 2 ? 1 : 0);
952 ret = mca_common_ompio_split_a_group(fh,
953 start_offsets_lens,
954 end_offsets,
955 size_new_group,
956 &max_cci,
957 &min_cci,
958 &num_groups,
959 &size_smallest_group);
960 if (OMPI_SUCCESS != ret ) {
961 opal_output (1, "mca_common_ompio_split_initial_groups: error in mca_common_ompio_split_a_group 3\n");
962 return ret;
963 }
964 }
965 else{
966 break;
967 }
968 }
969
970 if(size_smallest_group <= OMPIO_UNIFORM_DIST_THRESHOLD * size_new_group){
971 //uneven split need to call split again
972 if( size_old_group % num_groups == 0 ){
973 //most even distribution possible
974 size_new_group = size_old_group / num_groups;
975 size_last_group = size_new_group;
976 }
977 else{
978 //merge the last small group with the previous group
979 size_last_group = size_new_group + size_smallest_group;
980 }
981 }
982 else{
983 //Considered uniform
984 size_last_group = size_smallest_group;
985 }
986
987 break;
988 }
989
990 ret = mca_common_ompio_finalize_split(fh, size_new_group, size_last_group);
991
992 return ret;
993 }
994
995
996 int mca_common_ompio_retain_initial_groups(mca_io_ompio_file_t *fh){
997
998 int i = 0;
999
1000 fh->f_procs_per_group = fh->f_init_procs_per_group;
1001 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1002 if (NULL == fh->f_procs_in_group) {
1003 opal_output (1, "OUT OF MEMORY\n");
1004 return OMPI_ERR_OUT_OF_RESOURCE;
1005 }
1006 for( i = 0 ; i < fh->f_procs_per_group; i++){
1007 fh->f_procs_in_group[i] = fh->f_init_procs_in_group[i];
1008 }
1009
1010
1011 return OMPI_SUCCESS;
1012 }
1013
1014 int mca_common_ompio_merge_groups(mca_io_ompio_file_t *fh,
1015 int *merge_aggrs,
1016 int num_merge_aggrs)
1017 {
1018 int i = 0;
1019 int *sizes_old_group;
1020 int ret;
1021 int *displs = NULL;
1022
1023 sizes_old_group = (int*)malloc(num_merge_aggrs * sizeof(int));
1024 if (NULL == sizes_old_group) {
1025 opal_output (1, "OUT OF MEMORY\n");
1026 ret = OMPI_ERR_OUT_OF_RESOURCE;
1027 goto exit;
1028 }
1029
1030
1031 displs = (int*)malloc(num_merge_aggrs * sizeof(int));
1032 if (NULL == displs) {
1033 opal_output (1, "OUT OF MEMORY\n");
1034 ret = OMPI_ERR_OUT_OF_RESOURCE;
1035 goto exit;
1036 }
1037
1038
1039 //merge_aggrs[0] is considered the new aggregator
1040 //New aggregator collects group sizes of the groups to be merged
1041 ret = ompi_fcoll_base_coll_allgather_array (&fh->f_init_procs_per_group,
1042 1,
1043 MPI_INT,
1044 sizes_old_group,
1045 1,
1046 MPI_INT,
1047 0,
1048 merge_aggrs,
1049 num_merge_aggrs,
1050 fh->f_comm);
1051
1052 if ( OMPI_SUCCESS != ret ) {
1053 goto exit;
1054 }
1055 fh->f_procs_per_group = 0;
1056
1057
1058 for( i = 0; i < num_merge_aggrs; i++){
1059 fh->f_procs_per_group = fh->f_procs_per_group + sizes_old_group[i];
1060 }
1061
1062 displs[0] = 0;
1063 for(i = 1; i < num_merge_aggrs; i++){
1064 displs[i] = displs[i-1] + sizes_old_group[i-1];
1065 }
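/* Example (hypothetical group sizes): if the merged groups had sizes {3, 2, 4},
** the displacements become {0, 3, 5} and f_procs_per_group = 9, so the
** allgatherv below places each old group's member list at the correct offset in
** the new f_procs_in_group array.
*/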
1066
1067 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1068 if (NULL == fh->f_procs_in_group) {
1069 opal_output (1, "OUT OF MEMORY\n");
1070 ret = OMPI_ERR_OUT_OF_RESOURCE;
1071 goto exit;
1072 }
1073
1074 //New aggregator also collects the grouping distribution
1075 //This is the actual merge
1076 //use allgatherv array
1077 ret = ompi_fcoll_base_coll_allgatherv_array (fh->f_init_procs_in_group,
1078 fh->f_init_procs_per_group,
1079 MPI_INT,
1080 fh->f_procs_in_group,
1081 sizes_old_group,
1082 displs,
1083 MPI_INT,
1084 0,
1085 merge_aggrs,
1086 num_merge_aggrs,
1087 fh->f_comm);
1088
1089 exit:
1090 if (NULL != displs) {
1091 free (displs);
1092 }
1093 if (NULL != sizes_old_group) {
1094 free (sizes_old_group);
1095 }
1096
1097 return ret;
1098
1099 }
1100
1101
1102
1103 int mca_common_ompio_split_a_group(mca_io_ompio_file_t *fh,
1104 OMPI_MPI_OFFSET_TYPE *start_offsets_lens,
1105 OMPI_MPI_OFFSET_TYPE *end_offsets,
1106 int size_new_group,
1107 OMPI_MPI_OFFSET_TYPE *max_cci,
1108 OMPI_MPI_OFFSET_TYPE *min_cci,
1109 int *num_groups,
1110 int *size_smallest_group)
1111 {
1112
1113 OMPI_MPI_OFFSET_TYPE *cci = NULL;
1114 *num_groups = fh->f_init_procs_per_group / size_new_group;
1115 *size_smallest_group = size_new_group;
1116 int i = 0;
1117 int k = 0;
1118 int flag = 0; //all groups same size
1119 int size = 0;
1120
1121 if( fh->f_init_procs_per_group % size_new_group != 0 ){
1122 *num_groups = *num_groups + 1;
1123 *size_smallest_group = fh->f_init_procs_per_group % size_new_group;
1124 flag = 1;
1125 }
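/* Example (hypothetical sizes): splitting an old group of 10 processes with
** size_new_group = 4 yields num_groups = 3, where the last group only holds
** the remaining 10 % 4 = 2 processes; flag marks this uneven split.
*/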
1126
1127 cci = (OMPI_MPI_OFFSET_TYPE*)malloc(*num_groups * sizeof( OMPI_MPI_OFFSET_TYPE ));
1128 if (NULL == cci) {
1129 opal_output(1, "OUT OF MEMORY\n");
1130 return OMPI_ERR_OUT_OF_RESOURCE;
1131 }
1132
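/* cci[i] accumulates, for prospective new group i, the first member's length
** plus the length of every member whose start offset exactly continues the
** previous member's end offset, i.e. it is a measure of how much of the group's
** data forms contiguous regions in the file. max_cci is compared against
** OMPIO_CONTG_THRESHOLD in mca_common_ompio_split_initial_groups.
*/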
1133 //check contiguity within new groups
1134 size = size_new_group;
1135 for( i = 0; i < *num_groups; i++){
1136 cci[i] = start_offsets_lens[3*size_new_group*i + 1];
1137 //if it is the last group check if it is the smallest group
1138 if( (i == *num_groups-1) && flag == 1){
1139 size = *size_smallest_group;
1140 }
1141 for( k = 0; k < size-1; k++){
1142 if( end_offsets[size_new_group* i + k] == start_offsets_lens[3*size_new_group*i + 3*(k+1)] ){
1143 cci[i] += start_offsets_lens[3*size_new_group*i + 3*(k + 1) + 1];
1144 }
1145 }
1146 }
1147
1148 //get min and max cci
1149 *min_cci = cci[0];
1150 *max_cci = cci[0];
1151 for( i = 1 ; i < *num_groups; i++){
1152 if(cci[i] > *max_cci){
1153 *max_cci = cci[i];
1154 }
1155 else if(cci[i] < *min_cci){
1156 *min_cci = cci[i];
1157 }
1158 }
1159
1160 free (cci);
1161 return OMPI_SUCCESS;
1162 }
1163
1164 int mca_common_ompio_finalize_split(mca_io_ompio_file_t *fh,
1165 int size_new_group,
1166 int size_last_group)
1167 {
1168 //based on new group and last group finalize f_procs_per_group and f_procs_in_group
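/* Worked example (hypothetical split): an initial group of 10 processes split
** with size_new_group = 4 and size_last_group = 2 produces new groups made of
** the init-group positions {0..3}, {4..7} and {8,9}. A rank sitting at position
** 8 or 9 (i.e. i >= 10 - 2) gets f_procs_per_group = 2 and copies the last two
** entries of f_init_procs_in_group; every other rank gets f_procs_per_group = 4
** and copies the block of 4 entries that contains its own position.
*/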
1169
1170 int i = 0;
1171 int j = 0;
1172 int k = 0;
1173
1174 for( i = 0; i < fh->f_init_procs_per_group ; i++){
1175
1176 if( fh->f_rank == fh->f_init_procs_in_group[i]){
1177 if( i >= fh->f_init_procs_per_group - size_last_group ){
1178 fh->f_procs_per_group = size_last_group;
1179 }
1180 else{
1181 fh->f_procs_per_group = size_new_group;
1182 }
1183 }
1184 }
1185
1186
1187 fh->f_procs_in_group = (int*)malloc (fh->f_procs_per_group * sizeof(int));
1188 if (NULL == fh->f_procs_in_group) {
1189 opal_output (1, "OUT OF MEMORY\n");
1190 return OMPI_ERR_OUT_OF_RESOURCE;
1191 }
1192
1193 for( i = 0; i < fh->f_init_procs_per_group ; i++){
1194 if( fh->f_rank == fh->f_init_procs_in_group[i]){
1195 if( i >= fh->f_init_procs_per_group - size_last_group ){
1196 //distribution of last group
1197 for( j = 0; j < fh->f_procs_per_group; j++){
1198 fh->f_procs_in_group[j] = fh->f_init_procs_in_group[fh->f_init_procs_per_group - size_last_group + j];
1199 }
1200 }
1201 else{
1202 //distribute all other groups
1203 for( j = 0 ; j < fh->f_init_procs_per_group; j = j + size_new_group){
1204 if(i >= j && i < j+size_new_group ){
1205 for( k = 0; k < fh->f_procs_per_group ; k++){
1206 fh->f_procs_in_group[k] = fh->f_init_procs_in_group[j+k];
1207 }
1208 }
1209 }
1210 }
1211
1212 }
1213 }
1214
1215 return OMPI_SUCCESS;
1216 }
1217
1218 int mca_common_ompio_prepare_to_group(mca_io_ompio_file_t *fh,
1219 OMPI_MPI_OFFSET_TYPE **start_offsets_lens,
1220 OMPI_MPI_OFFSET_TYPE **end_offsets, // need it?
1221 OMPI_MPI_OFFSET_TYPE **aggr_bytes_per_group,
1222 OMPI_MPI_OFFSET_TYPE *bytes_per_group,
1223 int **decision_list,
1224 size_t bytes_per_proc,
1225 int *is_aggregator,
1226 int *ompio_grouping_flag)
1227 {
1228
1229 OMPI_MPI_OFFSET_TYPE start_offset_len[3] = {0};
1230 OMPI_MPI_OFFSET_TYPE *aggr_bytes_per_group_tmp = NULL;
1231 OMPI_MPI_OFFSET_TYPE *start_offsets_lens_tmp = NULL;
1232 OMPI_MPI_OFFSET_TYPE *end_offsets_tmp = NULL;
1233 int *decision_list_tmp = NULL;
1234
1235 int i = 0;
1236 int j = 0;
1237 int k = 0;
1238 int merge_count = 0;
1239 int split_count = 0; //not req?
1240 int retain_as_is_count = 0; //not req?
1241 int ret=OMPI_SUCCESS;
1242
1243 //Store start offset and length in an array //also add bytes per process
1244 if(NULL == fh->f_decoded_iov){
1245 start_offset_len[0] = 0;
1246 start_offset_len[1] = 0;
1247 }
1248 else{
1249 start_offset_len[0] = (OMPI_MPI_OFFSET_TYPE) fh->f_decoded_iov[0].iov_base;
1250 start_offset_len[1] = fh->f_decoded_iov[0].iov_len;
1251 }
1252 start_offset_len[2] = bytes_per_proc;
1253 start_offsets_lens_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (3 * fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
1254 if (NULL == start_offsets_lens_tmp) {
1255 opal_output (1, "OUT OF MEMORY\n");
1256 return OMPI_ERR_OUT_OF_RESOURCE;
1257 }
1258
1259 //Gather start offsets across processes in a group on aggregator
1260 ret = ompi_fcoll_base_coll_allgather_array (start_offset_len,
1261 3,
1262 OMPI_OFFSET_DATATYPE,
1263 start_offsets_lens_tmp,
1264 3,
1265 OMPI_OFFSET_DATATYPE,
1266 0,
1267 fh->f_init_procs_in_group,
1268 fh->f_init_procs_per_group,
1269 fh->f_comm);
1270 if ( OMPI_SUCCESS != ret ) {
1271 opal_output (1, "mca_common_ompio_prepare_to_grou[: error in ompi_fcoll_base_coll_allgather_array\n");
1272 goto exit;
1273 }
1274 end_offsets_tmp = (OMPI_MPI_OFFSET_TYPE* )malloc (fh->f_init_procs_per_group * sizeof(OMPI_MPI_OFFSET_TYPE));
1275 if (NULL == end_offsets_tmp) {
1276 opal_output (1, "OUT OF MEMORY\n");
ret = OMPI_ERR_OUT_OF_RESOURCE;
1277 goto exit;
1278 }
1279 for( k = 0 ; k < fh->f_init_procs_per_group; k++){
1280 end_offsets_tmp[k] = start_offsets_lens_tmp[3*k] + start_offsets_lens_tmp[3*k+1];
1281 }
1282 //Every process has the total bytes written in its group
1283 for(j = 0; j < fh->f_init_procs_per_group; j++){
1284 *bytes_per_group = *bytes_per_group + start_offsets_lens_tmp[3*j+2];
1285 }
1286
1287 *start_offsets_lens = &start_offsets_lens_tmp[0];
1288 *end_offsets = &end_offsets_tmp[0];
1289
1290
1291 for( j = 0 ; j < fh->f_init_num_aggrs ; j++){
1292 if(fh->f_rank == fh->f_init_aggr_list[j])
1293 *is_aggregator = 1;
1294 }
1295 //Decide groups going in for a merge or a split
1296 //Merge only if the groups are consecutive
1297 if(*is_aggregator == 1){
1298 aggr_bytes_per_group_tmp = (OMPI_MPI_OFFSET_TYPE*)malloc (fh->f_init_num_aggrs * sizeof(OMPI_MPI_OFFSET_TYPE));
1299 if (NULL == aggr_bytes_per_group_tmp) {
1300 opal_output (1, "OUT OF MEMORY\n");
1301 ret = OMPI_ERR_OUT_OF_RESOURCE;
1302 goto exit;
1303 }
1304 decision_list_tmp = (int* )malloc (fh->f_init_num_aggrs * sizeof(int));
1305 if (NULL == decision_list_tmp) {
1306 opal_output (1, "OUT OF MEMORY\n");
1307 ret = OMPI_ERR_OUT_OF_RESOURCE;
1308 goto exit;
1309 }
1310 //Communicate bytes per group between all aggregators
1311 ret = ompi_fcoll_base_coll_allgather_array (bytes_per_group,
1312 1,
1313 OMPI_OFFSET_DATATYPE,
1314 aggr_bytes_per_group_tmp,
1315 1,
1316 OMPI_OFFSET_DATATYPE,
1317 0,
1318 fh->f_init_aggr_list,
1319 fh->f_init_num_aggrs,
1320 fh->f_comm);
1321 if ( OMPI_SUCCESS != ret ) {
1322 opal_output (1, "mca_common_ompio_prepare_to_grou[: error in ompi_fcoll_base_coll_allgather_array 2\n");
1323 free(decision_list_tmp);
1324 goto exit;
1325 }
1326
1327 for( i = 0; i < fh->f_init_num_aggrs; i++){
1328 if((size_t)(aggr_bytes_per_group_tmp[i])>
1329 (size_t)OMPIO_MCA_GET(fh, bytes_per_agg)){
1330 decision_list_tmp[i] = OMPIO_SPLIT;
1331 split_count++;
1332 }
1333 else if((size_t)(aggr_bytes_per_group_tmp[i])<
1334 (size_t)OMPIO_MCA_GET(fh,bytes_per_agg)){
1335 decision_list_tmp[i] = OMPIO_MERGE;
1336 merge_count++;
1337 }
1338 else{
1339 decision_list_tmp[i] = OMPIO_RETAIN;
1340 retain_as_is_count++;
1341 }
1342 }
1343
1344 *aggr_bytes_per_group = &aggr_bytes_per_group_tmp[0];
1345 //Go through the decision list to see if non consecutive
1346 //processes intend to merge, if yes retain original grouping
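/* Example (illustrative decisions): with an initial decision list of
** {MERGE, SPLIT, MERGE, MERGE} the first group has no mergeable neighbour
** (its successor wants to split), so it is reset to OMPIO_RETAIN; the last two
** groups keep OMPIO_MERGE because they are consecutive, giving
** {RETAIN, SPLIT, MERGE, MERGE}.
*/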
1347 for( i = 0; i < fh->f_init_num_aggrs ; i++){
1348 if(decision_list_tmp[i] == OMPIO_MERGE){
1349 if( (i == 0) &&
1350 (decision_list_tmp[i+1] != OMPIO_MERGE)){ //first group
1351 decision_list_tmp[i] = OMPIO_RETAIN;
1352 }
1353 else if( (i == fh->f_init_num_aggrs-1) &&
1354 (decision_list_tmp[i-1] != OMPIO_MERGE)){
1355
1356 decision_list_tmp[i] = OMPIO_RETAIN;
1357 }
1358 else if(!((decision_list_tmp[i-1] == OMPIO_MERGE) ||
1359 (decision_list_tmp[i+1] == OMPIO_MERGE))){
1360
1361 decision_list_tmp[i] = OMPIO_RETAIN;
1362 }
1363 }
1364 }
1365
1366 //Set the flag as per the decision list
1367 for( i = 0 ; i < fh->f_init_num_aggrs; i++){
1368 if((decision_list_tmp[i] == OMPIO_MERGE)&&
1369 (fh->f_rank == fh->f_init_aggr_list[i]))
1370 *ompio_grouping_flag = OMPIO_MERGE;
1371
1372 if((decision_list_tmp[i] == OMPIO_SPLIT)&&
1373 (fh->f_rank == fh->f_init_aggr_list[i]))
1374 *ompio_grouping_flag = OMPIO_SPLIT;
1375
1376 if((decision_list_tmp[i] == OMPIO_RETAIN)&&
1377 (fh->f_rank == fh->f_init_aggr_list[i]))
1378 *ompio_grouping_flag = OMPIO_RETAIN;
1379 }
1380
1381 //print decision list of aggregators
1382 /*printf("RANK%d : Printing decsion list : \n",fh->f_rank);
1383 for( i = 0; i < fh->f_init_num_aggrs; i++){
1384 if(decision_list_tmp[i] == OMPIO_MERGE)
1385 printf("MERGE,");
1386 else if(decision_list_tmp[i] == OMPIO_SPLIT)
1387 printf("SPLIT, ");
1388 else if(decision_list_tmp[i] == OMPIO_RETAIN)
1389 printf("RETAIN, " );
1390 }
1391 printf("\n\n");
1392 */
1393 *decision_list = &decision_list_tmp[0];
1394 }
1395 //Communicate flag to all group members
1396 ret = ompi_fcoll_base_coll_bcast_array (ompio_grouping_flag,
1397 1,
1398 MPI_INT,
1399 0,
1400 fh->f_init_procs_in_group,
1401 fh->f_init_procs_per_group,
1402 fh->f_comm);
1403
1404 exit:
1405 /* Do not free aggr_bytes_per_group_tmp,
1406 ** start_offsets_lens_tmp, and end_offsets_tmp
1407 ** here. The memory is released in the layer above.
1408 */
1409
1410
1411 return ret;
1412 }
1413
1414 /*
1415 ** This is the actual formula of the cost function from the paper.
1416 ** One change made here is to use floating point values for
1417 ** all parameters, since the ceil() function sometimes leads to
1418 ** unexpected jumps in the execution time. Using float leads to
1419 ** more consistent predictions for the no. of aggregators.
1420 */
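/* Sketch of the model evaluated below (symbols as used in this function; the
** exact derivation is in the cited paper): with LogGP parameters L, o, g, G the
** estimated time is
**
**   t_send = n_s * ( L + 2*o + (n_as - 1)*g + (m_s - 1)*n_as*G )
**   t_recv = n_r * ( L + 2*o + (n_ar - 1)*g + (m_s - 1)*n_ar*G )
**   t_tot  = t_send + t_recv
**
** where P is the number of processes, P_a the number of aggregators being
** tried, d_p the data per process (the local view size at the call sites), b_c
** the collective buffer size (bytes_per_agg), and n_s/n_as/m_s/n_r/n_ar the
** per-phase message counts and sizes derived from them in the 1-D/2-D cases.
*/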
1421 static double cost_calc (int P, int P_a, size_t d_p, size_t b_c, int dim )
1422 {
1423 float n_as=1.0, m_s=1.0, n_s=1.0;
1424 float n_ar=1.0;
1425 double t_send, t_recv, t_tot;
1426
1427 /* LogGP parameters based on DDR InfiniBand values */
1428 double L=.00000184;
1429 double o=.00000149;
1430 double g=.0000119;
1431 double G=.00000000067;
1432
1433 long file_domain = (P * d_p) / P_a;
1434 float n_r = (float)file_domain/(float) b_c;
1435
1436 switch (dim) {
1437 case DIM1:
1438 {
1439 if( d_p > b_c ){
1440 //printf("case 1\n");
1441 n_ar = 1;
1442 n_as = 1;
1443 m_s = b_c;
1444 n_s = (float)d_p/(float)b_c;
1445 }
1446 else {
1447 n_ar = (float)b_c/(float)d_p;
1448 n_as = 1;
1449 m_s = d_p;
1450 n_s = 1;
1451 }
1452 break;
1453 }
1454 case DIM2:
1455 {
1456 int P_x, P_y, c;
1457
1458 P_x = P_y = (int) sqrt(P);
1459 c = (float) P_a / (float)P_x;
1460
1461 n_ar = (float) P_y;
1462 n_as = (float) c;
1463 if ( d_p > (P_a*b_c/P )) {
1464 m_s = fmin(b_c / P_y, d_p);
1465 }
1466 else {
1467 m_s = fmin(d_p * P_x / P_a, d_p);
1468 }
1469 break;
1470 }
1471 default :
1472 printf("stop putting random values\n");
1473 break;
1474 }
1475
1476 n_s = (float) d_p / (float)(n_as * m_s);
1477
1478 if( m_s < 33554432) {
1479 g = .00000108;
1480 }
1481 t_send = n_s * (L + 2 * o + (n_as -1) * g + (m_s - 1) * n_as * G);
1482 t_recv = n_r * (L + 2 * o + (n_ar -1) * g + (m_s - 1) * n_ar * G);
1483 t_tot = t_send + t_recv;
1484
1485 return t_tot;
1486 }
1487
1488