/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2014 University of Houston. All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2017-2018 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "fcoll_two_phase.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "opal/mca/base/base.h"
#include <math.h>
#include "ompi/mca/pml/pml.h"
#include <unistd.h>
#define DEBUG 0

/* Two-phase collective read implementation, ported from ROMIO to the OMPIO
 * infrastructure.  The algorithm is essentially the same as in ROMIO's
 * two-phase code and is based on the ROMIO code base.
 */
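
/*
 * Outline of the algorithm (phase terminology as in ROMIO):
 *
 *   Phase 1 (I/O):  the file range touched by the collective call is
 *                   partitioned into contiguous "file domains", one per
 *                   aggregator process.  Each aggregator reads its domain
 *                   from disk in cycles of f_bytes_per_agg bytes into a
 *                   staging buffer.
 *   Phase 2 (comm): each aggregator scatters the pieces of its staging
 *                   buffer to the processes that requested them
 *                   (two_phase_exchange_data), and the receivers unpack
 *                   the data into the user buffer
 *                   (two_phase_fill_user_buffer).
 *
 * As a usage sketch (assuming a standard Open MPI installation), this code
 * path is exercised by a collective read when the component is selected at
 * run time, e.g.:
 *
 *   mpirun --mca io ompio --mca fcoll two_phase -n 4 ./reader
 *
 * where ./reader is a hypothetical program calling MPI_File_read_all() on
 * a shared file.
 */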

/* Data structure to support specifying the flat-list. */
typedef struct flat_list_node {
    MPI_Datatype type;
    int count;
    OMPI_MPI_OFFSET_TYPE *blocklens;
    OMPI_MPI_OFFSET_TYPE *indices;
    struct flat_list_node *next;
} Flatlist_node;

/* local function declarations */
static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int num_io_procs, int *aggregator_list);

static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf,
                                   struct iovec *offset_length,
                                   int *send_size, int *start_pos,
                                   int *recv_size,
                                   int *count,
                                   int *partial_send, int *recd_from_proc,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter,
                                   size_t *buf_idx, MPI_Aint buftype_extent,
                                   int striping_unit, int num_io_procs,
                                   int *aggregator_list);


static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit,
                                       int num_io_procs, int *aggregator_list);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif

int
mca_fcoll_two_phase_file_read_all (ompio_file_t *fh,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t *status)
{

    int ret = OMPI_SUCCESS, i = 0, j = 0, interleave_count = 0, striping_unit = 0;
    MPI_Aint recv_buf_addr = 0;
    uint32_t iov_count = 0, ti = 0;
    struct iovec *decoded_iov = NULL, *temp_iov = NULL, *iov = NULL;
    size_t max_data = 0;
    long long_max_data = 0, long_total_bytes = 0;
    int domain_size = 0, *count_my_req_per_proc = NULL, count_my_req_procs = 0;
    int count_other_req_procs;
    size_t *buf_indices = NULL;
    int *aggregator_list = NULL, local_count = 0, local_size = 0;
    int two_phase_num_io_procs = 1;
    OMPI_MPI_OFFSET_TYPE start_offset = 0, end_offset = 0, fd_size = 0;
    OMPI_MPI_OFFSET_TYPE *start_offsets = NULL, *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE *fd_start = NULL, *fd_end = NULL, min_st_offset = 0;
    Flatlist_node *flat_buf = NULL;
    mca_common_ompio_access_array_t *my_req = NULL, *others_req = NULL;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    mca_common_ompio_print_entry nentry;
#endif
//    if (opal_datatype_is_predefined(&datatype->super)) {
//        fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY;
//    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
        ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
                                                datatype,
                                                count,
                                                buf,
                                                &max_data,
                                                fh->f_mem_convertor,
                                                &temp_iov,
                                                &iov_count);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }

        recv_buf_addr = (size_t)(buf);
        decoded_iov = (struct iovec *) calloc (iov_count, sizeof(struct iovec));
        if (NULL == decoded_iov){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        /* store the displacements relative to the start of the user buffer */
        for (ti = 0; ti < iov_count; ti++){
            decoded_iov[ti].iov_base = (IOVBASE_TYPE *)
                ((ptrdiff_t)temp_iov[ti].iov_base - recv_buf_addr);
            decoded_iov[ti].iov_len = temp_iov[ti].iov_len;
#if DEBUG
            printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
                   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
                   ti, decoded_iov[ti].iov_len);
#endif
        }
    }
    else{
        max_data = count * datatype->super.size;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = max_data;
    }

    two_phase_num_io_procs = fh->f_get_mca_parameter_value ( "num_aggregators", strlen ("num_aggregators"));
    if ( OMPI_ERR_MAX == two_phase_num_io_procs ) {
        ret = OMPI_ERROR;
        goto exit;
    }
    if (-1 == two_phase_num_io_procs){
        ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *)fh,
                                                     two_phase_num_io_procs,
                                                     max_data);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }

        two_phase_num_io_procs = fh->f_num_aggrs;
    }

    if (two_phase_num_io_procs > fh->f_size){
        two_phase_num_io_procs = fh->f_size;
    }

    aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
    if (NULL == aggregator_list){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

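    /*
     * Choose the aggregator ranks: with a map-by node launch the first
     * two_phase_num_io_procs ranks are used directly (presumably already
     * landing on distinct nodes); otherwise the aggregators are spread
     * evenly across the ranks of the communicator.
     */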
    if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
        for (i = 0; i < two_phase_num_io_procs; i++){
            aggregator_list[i] = i;
        }
    }
    else {
        for (i = 0; i < two_phase_num_io_procs; i++){
            aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
        }
    }

    ret = fh->f_generate_current_file_view ((struct ompio_file_t *)fh,
                                            max_data,
                                            &iov,
                                            &local_count);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }

    long_max_data = (long) max_data;
    ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
                                              &long_total_bytes,
                                              1,
                                              MPI_LONG,
                                              MPI_SUM,
                                              fh->f_comm,
                                              fh->f_comm->c_coll->coll_allreduce_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        /* This data structure translates between OMPIO and ROMIO. It is a
           little hacky, but it helps to re-use ROMIO's code for handling
           non-contiguous file-types: the flattened datatype of OMPIO lives
           in decoded_iov and is translated here into flat_buf. */

        flat_buf = (Flatlist_node *)calloc(1, sizeof(Flatlist_node));
        if ( NULL == flat_buf ){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        flat_buf->type = datatype;
        flat_buf->next = NULL;
        flat_buf->count = 0;
        flat_buf->indices = NULL;
        flat_buf->blocklens = NULL;

        if ( 0 < count ) {
            local_size = OMPIO_MAX(1, iov_count/count);
        }
        else {
            local_size = 0;
        }

        if ( 0 < local_size ) {
            flat_buf->indices =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->indices){
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            flat_buf->blocklens =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if ( NULL == flat_buf->blocklens ){
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        flat_buf->count = local_size;
        for (j = 0; j < local_size; ++j) {
            flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
            flat_buf->blocklens[j] = decoded_iov[j].iov_len;
        }

#if DEBUG
        printf("flat_buf count: %d\n",
               flat_buf->count);
        for (i = 0; i < flat_buf->count; i++){
            printf("%d: blocklen[%d] : %lld, indices[%d]: %lld\n",
                   fh->f_rank, i, flat_buf->blocklens[i], i, flat_buf->indices[i]);
        }
#endif
    }

#if DEBUG
    printf("%d: total_bytes:%ld, local_count: %d\n",
           fh->f_rank, long_total_bytes, local_count);
    for (i = 0; i < local_count; i++) {
        printf("%d: fcoll:two_phase:read_all:OFFSET:%ld,LENGTH:%ld\n",
               fh->f_rank,
               (size_t)iov[i].iov_base,
               (size_t)iov[i].iov_len);
    }
#endif

    start_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[0].iov_base;
    if ( 0 < local_count ) {
        end_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_base +
            (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_len - 1;
    }
    else {
        end_offset = 0;
    }
#if DEBUG
    printf("%d: START OFFSET:%ld, END OFFSET:%ld\n",
           fh->f_rank,
           (size_t)start_offset,
           (size_t)end_offset);
#endif

    start_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if ( NULL == start_offsets ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    end_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    /* make the start and end offsets of every process known globally */
    ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             end_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++){
        printf("%d: start[%d]:%ld,end[%d]:%ld\n",
               fh->f_rank, i,
               (size_t)start_offsets[i], i,
               (size_t)end_offsets[i]);
    }
#endif

    /* count how many processes have accesses interleaved with the range of
       their predecessor */
    for (i = 1; i < fh->f_size; i++){
        if ((start_offsets[i] < end_offsets[i-1]) &&
            (start_offsets[i] <= end_offsets[i])){
            interleave_count++;
        }
    }

#if DEBUG
    printf("%d: interleave_count:%d\n",
           fh->f_rank, interleave_count);
#endif
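
    /* Partition the overall file range accessed by this collective call
       into one contiguous "file domain" per aggregator; fd_start[i] and
       fd_end[i] delimit the domain owned by aggregator i. */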
    ret = mca_fcoll_two_phase_domain_partition(fh,
                                               start_offsets,
                                               end_offsets,
                                               &min_st_offset,
                                               &fd_start,
                                               &fd_end,
                                               domain_size,
                                               &fd_size,
                                               striping_unit,
                                               two_phase_num_io_procs);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }

#if DEBUG
    for (i = 0; i < two_phase_num_io_procs; i++){
        printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
               i, fd_start[i], i, fd_end[i], local_count);
    }
#endif
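
    /* First handshake: work out which parts of my request fall into which
       aggregator's file domain (my_req), then let every aggregator learn
       the requests that all other processes have inside its domain
       (others_req). */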
    ret = mca_fcoll_two_phase_calc_my_requests (fh,
                                                iov,
                                                local_count,
                                                min_st_offset,
                                                fd_start,
                                                fd_end,
                                                fd_size,
                                                &count_my_req_procs,
                                                &count_my_req_per_proc,
                                                &my_req,
                                                &buf_indices,
                                                striping_unit,
                                                two_phase_num_io_procs,
                                                aggregator_list);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

    ret = mca_fcoll_two_phase_calc_others_requests(fh,
                                                   count_my_req_procs,
                                                   count_my_req_per_proc,
                                                   my_req,
                                                   &count_other_req_procs,
                                                   &others_req);
    if (OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG
    printf("%d count_other_req_procs : %d\n",
           fh->f_rank,
           count_other_req_procs);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rexch = MPI_Wtime();
#endif
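
    /* Main engine: aggregators read their file domains cycle by cycle and
       ship each piece of data to the process that requested it. */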
    ret = two_phase_read_and_exch(fh,
                                  buf,
                                  datatype,
                                  others_req,
                                  iov,
                                  local_count,
                                  min_st_offset,
                                  fd_size,
                                  fd_start,
                                  fd_end,
                                  flat_buf,
                                  buf_indices,
                                  striping_unit,
                                  two_phase_num_io_procs,
                                  aggregator_list);
    if (OMPI_SUCCESS != ret){
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rexch = MPI_Wtime();
    read_exch += (end_rexch - start_rexch);
    nentry.time[0] = read_time;
    nentry.time[1] = rcomm_time;
    nentry.time[2] = read_exch;
    if (isread_aggregator(fh->f_rank,
                          two_phase_num_io_procs,
                          aggregator_list)){
        nentry.aggregator = 1;
    }
    else{
        nentry.aggregator = 0;
    }
    nentry.nprocs_for_coll = two_phase_num_io_procs;

    if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){
        mca_common_ompio_register_print_entry(fh->f_coll_read_time,
                                              nentry);
    }
#endif

exit:
    if (flat_buf != NULL){
        if (flat_buf->blocklens != NULL){
            free (flat_buf->blocklens);
        }
        if (flat_buf->indices != NULL){
            free (flat_buf->indices);
        }
        free (flat_buf);
    }

    free (start_offsets);
    free (end_offsets);
    free (aggregator_list);
    free (fd_start);
    free (decoded_iov);
    free (buf_indices);
    free (count_my_req_per_proc);
    free (my_req);
    free (others_req);
    free (fd_end);
    /* the iovecs returned by decode_datatype and
       generate_current_file_view are owned by the caller */
    free (temp_iov);
    free (iov);

    return ret;
}



static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int two_phase_num_io_procs,
                                   int *aggregator_list)
{

    int ret = OMPI_SUCCESS, i = 0, j = 0, ntimes = 0, max_ntimes = 0;
    int m = 0;
    int *curr_offlen_ptr = NULL, *count = NULL, *send_size = NULL, *recv_size = NULL;
    int *partial_send = NULL, *start_pos = NULL, req_len = 0, flag = 0;
    int *recd_from_proc = NULL;
    MPI_Aint buftype_extent = 0;
    size_t byte_size = 0;
    OMPI_MPI_OFFSET_TYPE st_loc = -1, end_loc = -1, off = 0, done = 0, for_next_iter = 0;
    OMPI_MPI_OFFSET_TYPE size = 0, req_off = 0, real_size = 0, real_off = 0, len = 0;
    OMPI_MPI_OFFSET_TYPE for_curr_iter = 0;
    char *read_buf = NULL, *tmp_buf = NULL;
    MPI_Datatype byte = MPI_BYTE;
    int two_phase_cycle_buffer_size = 0;

    opal_datatype_type_size(&byte->super,
                            &byte_size);

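    /* Determine the byte range [st_loc, end_loc] that this process has to
       read on behalf of the other processes; both stay at -1 if nobody
       requested anything from us. */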
    for (i = 0; i < fh->f_size; i++){
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i = 0; i < fh->f_size; i++){
        for (j = 0; j < others_req[i].count; j++){
            st_loc = OMPIO_MIN(st_loc, others_req[i].offsets[j]);
            end_loc = OMPIO_MAX(end_loc, (others_req[i].offsets[j] +
                                          others_req[i].lens[j] - 1));
        }
    }

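    /* number of cycles of two_phase_cycle_buffer_size (f_bytes_per_agg)
       bytes needed to cover [st_loc, end_loc] */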
    two_phase_cycle_buffer_size = fh->f_bytes_per_agg;
    ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
                   two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)){
        ntimes = 0;
    }

    fh->f_comm->c_coll->coll_allreduce (&ntimes,
                                        &max_ntimes,
                                        1,
                                        MPI_INT,
                                        MPI_MAX,
                                        fh->f_comm,
                                        fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes){
        read_buf = (char *) calloc (two_phase_cycle_buffer_size, sizeof(char));
        if ( NULL == read_buf ){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

    curr_offlen_ptr = (int *)calloc (fh->f_size,
                                     sizeof(int));
    if (NULL == curr_offlen_ptr){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    count = (int *)calloc (fh->f_size,
                           sizeof(int));
    if (NULL == count){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    partial_send = (int *)calloc(fh->f_size, sizeof(int));
    if ( NULL == partial_send ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == send_size){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recv_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == recv_size){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recd_from_proc = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == recd_from_proc){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    start_pos = (int *) calloc(fh->f_size, sizeof(int));
    if ( NULL == start_pos ){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    done = 0;
    off = st_loc;
    for_curr_iter = for_next_iter = 0;

    ompi_datatype_type_extent(datatype, &buftype_extent);
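
    /* One loop iteration per cycle: [real_off, real_off + real_size) is the
       file range staged in read_buf during this cycle, including the
       for_curr_iter bytes carried over from the previous cycle; count[i] and
       send_size[i] describe what has to be shipped to process i. */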
    for (m = 0; m < ntimes; m++) {

        size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size, end_loc-st_loc+1-done);
        real_off = off - for_curr_iter;
        real_size = size + for_curr_iter;

        for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
        for_next_iter = 0;

        for (i = 0; i < fh->f_size; i++) {
            if (others_req[i].count) {
                start_pos[i] = curr_offlen_ptr[i];
                for (j = curr_offlen_ptr[i]; j < others_req[i].count; j++) {
                    if (partial_send[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] +
                            partial_send[i];
                        req_len = others_req[i].lens[j] -
                            partial_send[i];
                        partial_send[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < real_off + real_size) {
                        count[i]++;
                        PMPI_Get_address(read_buf+req_off-real_off,
                                         &(others_req[i].mem_ptrs[j]));

                        send_size[i] += (int)(OMPIO_MIN(real_off + real_size - req_off,
                                                        (OMPI_MPI_OFFSET_TYPE)req_len));

                        if (real_off+real_size-req_off < (OMPI_MPI_OFFSET_TYPE)req_len) {
                            partial_send[i] = (int) (real_off + real_size - req_off);
                            if ((j+1 < others_req[i].count) &&
                                (others_req[i].offsets[j+1] <
                                 real_off+real_size)) {
                                /* the next off-len pair also begins within
                                   this cycle's range; remember how far the
                                   range extends past it so the overlap can
                                   be carried into the next iteration (the
                                   case illustrated in the figure in ROMIO's
                                   code). */
                                for_next_iter = OMPIO_MAX(for_next_iter,
                                                          real_off + real_size - others_req[i].offsets[j+1]);
                                /* max because it must cover requests
                                   from different processes */
                            }
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }
        flag = 0;
        for (i = 0; i < fh->f_size; i++)
            if (count[i]) flag = 1;

        if (flag) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_read_time = MPI_Wtime();
#endif

            len = size * byte_size;
            fh->f_io_array = (mca_common_ompio_io_array_t *)calloc
                (1, sizeof(mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            fh->f_io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)off;
            fh->f_io_array[0].length = len;
            fh->f_io_array[0].memory_address = read_buf+for_curr_iter;
            fh->f_num_of_io_entries = 1;

            if (fh->f_num_of_io_entries){
                if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
                    opal_output(1, "READ FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }

#if 0
            int ii;
            printf("%d: len/4 : %lld\n",
                   fh->f_rank,
                   len/4);
            for (ii = 0; ii < len/4; ii++){
                printf("%d: read_buf[%d]: %d\n",
                       fh->f_rank,
                       ii,
                       ((int *)read_buf)[ii]);
            }
#endif
            fh->f_num_of_io_entries = 0;
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_read_time = MPI_Wtime();
            read_time += (end_read_time - start_read_time);
#endif
        }

        for_curr_iter = for_next_iter;

        for (i = 0; i < fh->f_size; i++){
            recv_size[i] = 0;
        }
        ret = two_phase_exchange_data(fh, buf, offset_len,
                                      send_size, start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit,
                                      two_phase_num_io_procs, aggregator_list);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }

        if (for_next_iter){
            tmp_buf = (char *) calloc (for_next_iter, sizeof(char));
            if (NULL == tmp_buf){
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(tmp_buf,
                   read_buf+real_size-for_next_iter,
                   for_next_iter);
            free(read_buf);
            read_buf = (char *)malloc(for_next_iter+two_phase_cycle_buffer_size);
            if (NULL == read_buf){
                free(tmp_buf);
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(read_buf, tmp_buf, for_next_iter);
            free(tmp_buf);
        }

        off += size;
        done += size;
    }

    for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
    /* processes that finished early keep participating in the exchange
       phases of aggregators that still have cycles left */
    for (m = ntimes; m < max_ntimes; m++) {
        ret = two_phase_exchange_data(fh, buf, offset_len, send_size,
                                      start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit,
                                      two_phase_num_io_procs, aggregator_list);
        if (OMPI_SUCCESS != ret){
            goto exit;
        }
    }

exit:
    free (read_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_send);
    free (send_size);
    free (recv_size);
    free (recd_from_proc);
    free (start_pos);

    return ret;
}

static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf, struct iovec *offset_len,
                                   int *send_size, int *start_pos, int *recv_size,
                                   int *count, int *partial_send,
                                   int *recd_from_proc, int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter, size_t *buf_idx,
                                   MPI_Aint buftype_extent, int striping_unit,
                                   int two_phase_num_io_procs, int *aggregator_list)
{

    int i = 0, j = 0, k = 0, tmp = 0, nprocs_recv = 0, nprocs_send = 0;
    int ret = OMPI_SUCCESS;
    char **recv_buf = NULL;
    MPI_Request *requests = NULL;
    MPI_Datatype send_type;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif
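
    /* let every process know how many bytes it will receive from each peer
       in this cycle */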
    ret = fh->f_comm->c_coll->coll_alltoall (send_size,
                                             1,
                                             MPI_INT,
                                             recv_size,
                                             1,
                                             MPI_INT,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_alltoall_module);
    if ( OMPI_SUCCESS != ret ){
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++){
        printf("%d: RS[%d]: %d\n", fh->f_rank,
               i,
               recv_size[i]);
    }
#endif

    nprocs_recv = 0;
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i = 0; i < fh->f_size; i++)
        if (send_size[i]) nprocs_send++;

    requests = (MPI_Request *)
        malloc((nprocs_send+nprocs_recv+1) * sizeof(MPI_Request));
    if (NULL == requests){
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
        /* contiguous user buffer: receive straight into it, using the
           per-peer displacements recorded in buf_idx */
        j = 0;
        for (i = 0; i < fh->f_size; i++){
            if (recv_size[i]){
                ret = MCA_PML_CALL(irecv(((char *) buf)+ buf_idx[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if ( OMPI_SUCCESS != ret ){
                    goto exit;
                }
                j++;
                buf_idx[i] += recv_size[i];
            }
        }
    }
    else{
        /* non-contiguous user buffer: receive into intermediate buffers
           and unpack later in two_phase_fill_user_buffer() */
        recv_buf = (char **) calloc (fh->f_size, sizeof(char *));
        if (NULL == recv_buf){
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (i = 0; i < fh->f_size; i++){
            if (recv_size[i]){
                recv_buf[i] = (char *) malloc (recv_size[i] * sizeof(char));
                if (NULL == recv_buf[i]){
                    ret = OMPI_ERR_OUT_OF_RESOURCE;
                    goto exit;
                }
            }
        }
        j = 0;
        for (i = 0; i < fh->f_size; i++)
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(recv_buf[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if ( OMPI_SUCCESS != ret ){
                    goto exit;
                }
                j++;
            }
    }

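    /* post the sends: the regions of read_buf destined for process i are
       described by an hindexed datatype built from the mem_ptrs recorded in
       others_req and are sent in place from MPI_BOTTOM */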
    j = 0;
    for (i = 0; i < fh->f_size; i++){
        if (send_size[i]){
            if (partial_send[i]){
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }

            ompi_datatype_create_hindexed(count[i],
                                          &(others_req[i].lens[start_pos[i]]),
                                          &(others_req[i].mem_ptrs[start_pos[i]]),
                                          MPI_BYTE,
                                          &send_type);
            ompi_datatype_commit(&send_type);

            ret = MCA_PML_CALL(isend(MPI_BOTTOM,
                                     1,
                                     send_type,
                                     i,
                                     fh->f_rank+i+100*iter,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     fh->f_comm,
                                     requests+nprocs_recv+j));
            ompi_datatype_destroy(&send_type);

            if (partial_send[i]) others_req[i].lens[k] = tmp;
            if ( OMPI_SUCCESS != ret ){
                goto exit;
            }
            j++;
        }
    }

    if (nprocs_recv) {
        ret = ompi_request_wait_all(nprocs_recv,
                                    requests,
                                    MPI_STATUSES_IGNORE);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
            two_phase_fill_user_buffer(fh, buf, flat_buf,
                                       recv_buf, offset_len,
                                       (unsigned *)recv_size, requests,
                                       recd_from_proc, contig_access_count,
                                       min_st_offset, fd_size, fd_start, fd_end,
                                       buftype_extent, striping_unit,
                                       two_phase_num_io_procs, aggregator_list);
        }
    }

    ret = ompi_request_wait_all(nprocs_send,
                                requests+nprocs_recv,
                                MPI_STATUSES_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += (end_rcomm_time - start_rcomm_time);
#endif

exit:
    if (recv_buf) {
        for (i = 0; i < fh->f_size; i++){
            free(recv_buf[i]);
        }
        free(recv_buf);
    }

    free(requests);

    return ret;
}

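/* Advance the current position in the (flattened) user buffer by buf_incr
   bytes without copying any data, wrapping to the next contiguous block
   and, if necessary, to the next instance of the user datatype. */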
#define TWO_PHASE_BUF_INCR                                              \
{                                                                       \
    while (buf_incr) {                                                  \
        size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes *                      \
                (OMPI_MPI_OFFSET_TYPE)buftype_extent;                   \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
}


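/* Copy "size" bytes from the receive buffer of process p into the user
   buffer, walking the flattened datatype block by block, then skip any
   remaining buf_incr bytes. */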
#define TWO_PHASE_BUF_COPY                                              \
{                                                                       \
    while (size) {                                                      \
        size_in_buf = OMPIO_MIN(size, flat_buf_sz);                     \
        memcpy(((char *) buf) + user_buf_idx,                           \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf);           \
        recv_buf_idx[p] += size_in_buf;                                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes *                      \
                (OMPI_MPI_OFFSET_TYPE)buftype_extent;                   \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        size -= size_in_buf;                                            \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
    TWO_PHASE_BUF_INCR                                                  \
}


static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit, int two_phase_num_io_procs,
                                       int *aggregator_list)
{

    int i = 0, p = 0, flat_buf_idx = 0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz = 0, size_in_buf = 0, buf_incr = 0, size = 0;
    int n_buftypes = 0;
    OMPI_MPI_OFFSET_TYPE off = 0, len = 0, rem_len = 0, user_buf_idx = 0;
    unsigned *curr_from_proc = NULL, *done_from_proc = NULL, *recv_buf_idx = NULL;

    curr_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    done_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    recv_buf_idx = (unsigned *) malloc (fh->f_size * sizeof(unsigned));

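    /* curr_from_proc[p] = amount of data received from process p that has
                           been accounted for so far in this pass
       done_from_proc[p] = amount of data from process p already filled into
                           the user buffer in previous iterations
       recv_buf_idx[p]   = current read position within recv_buf[p] */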
    for (i = 0; i < fh->f_size; i++) {
        recv_buf_idx[i] = curr_from_proc[i] = 0;
        done_from_proc[i] = recd_from_proc[i];
    }

    flat_buf_idx = 0;
    n_buftypes = 0;

    if ( flat_buf->count > 0 ) {
        user_buf_idx = flat_buf->indices[0];
        flat_buf_sz = flat_buf->blocklens[0];
    }

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz  = size of current contiguous component in
                      flattened buf */

    for (i = 0; i < contig_access_count; i++) {

        off     = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_length[i].iov_base;
        rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

        /* this request may span the file domains of more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: len is modified by mca_fcoll_two_phase_calc_aggregator()
             * (the OMPIO counterpart of ROMIO's ADIOI_Calc_aggregator()) to
             * be no longer than the single file-domain region that
             * aggregator "p" is responsible for.
             */
            p = mca_fcoll_two_phase_calc_aggregator(fh,
                                                    off,
                                                    min_st_offset,
                                                    &len,
                                                    fd_size,
                                                    fd_start,
                                                    fd_end,
                                                    striping_unit,
                                                    two_phase_num_io_procs,
                                                    aggregator_list);

            if (recv_buf_idx[p] < recv_size[p]) {
                if (curr_from_proc[p]+len > done_from_proc[p]) {
                    if (done_from_proc[p] > curr_from_proc[p]) {
                        size = OMPIO_MIN(curr_from_proc[p] + len -
                                         done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
                        buf_incr = done_from_proc[p] - curr_from_proc[p];
                        TWO_PHASE_BUF_INCR
                        buf_incr = curr_from_proc[p]+len-done_from_proc[p];
                        curr_from_proc[p] = done_from_proc[p] + size;
                        TWO_PHASE_BUF_COPY
                    }
                    else {
                        size = OMPIO_MIN(len, recv_size[p]-recv_buf_idx[p]);
                        buf_incr = len;
                        curr_from_proc[p] += (unsigned) size;
                        TWO_PHASE_BUF_COPY
                    }
                }
                else {
                    curr_from_proc[p] += (unsigned) len;
                    buf_incr = len;
                    TWO_PHASE_BUF_INCR
                }
            }
            else {
                buf_incr = len;
                TWO_PHASE_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];

    free(curr_from_proc);
    free(done_from_proc);
    free(recv_buf_idx);
}

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list)
{
    int i = 0;

    for (i = 0; i < nprocs_for_coll; i++){
        if (aggregator_list[i] == rank)
            return 1;
    }
    return 0;
}
#endif