1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
2 /*
3  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
4  *                         University Research and Technology
5  *                         Corporation.  All rights reserved.
6  * Copyright (c) 2004-2017 The University of Tennessee and The University
7  *                         of Tennessee Research Foundation.  All rights
8  *                         reserved.
9  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
10  *                         University of Stuttgart.  All rights reserved.
11  * Copyright (c) 2004-2005 The Regents of the University of California.
12  *                         All rights reserved.
13  * Copyright (c) 2008-2014 University of Houston. All rights reserved.
14  * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
15  * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
16  *                         reserved.
17  * Copyright (c) 2017      Research Organization for Information Science
18  *                         and Technology (RIST). All rights reserved.
19  * $COPYRIGHT$
20  *
21  * Additional copyrights may follow
22  *
23  * $HEADER$
24 */
25 
26 #include "ompi_config.h"
27 #include "fcoll_two_phase.h"
28 #include "mpi.h"
29 #include "ompi/constants.h"
30 #include "ompi/communicator/communicator.h"
31 #include "ompi/mca/fcoll/fcoll.h"
32 #include "ompi/mca/io/ompio/io_ompio.h"
33 #include "ompi/mca/io/io.h"
34 #include "opal/mca/base/base.h"
35 #include "math.h"
36 #include "ompi/mca/pml/pml.h"
37 #include <unistd.h>
38 
39 #define DEBUG 0
40 
41 /* Two Phase implementation from ROMIO ported to OMPIO infrastructure
42  * This is pretty much the same as ROMIO's two_phase and based on ROMIO's code
43  * base
44  */
45 
46 
47 /* Datastructure to support specifying the flat-list. */
/* Datastructure to support specifying the flat-list.
 * Mirrors ROMIO's "flattened datatype" representation: the user's memory
 * datatype reduced to parallel arrays of byte displacements and block
 * lengths, so the exchange/copy code can walk the buffer linearly. */
typedef struct flat_list_node {
    MPI_Datatype type;                 /* datatype this flat list was derived from */
    int count;                         /* number of (indices[i], blocklens[i]) pairs */
    OMPI_MPI_OFFSET_TYPE *blocklens;   /* length in bytes of each contiguous block */
    OMPI_MPI_OFFSET_TYPE *indices;     /* byte displacement of each block from buffer start */
    struct flat_list_node *next;       /* chain link; always NULL in this file (single node) */
}Flatlist_node;
55 
/* local function declarations  */

/* Driver for the read side of two-phase collective I/O: aggregators read
 * file data in cycles of at most "bytes per aggregator" and redistribute
 * each cycle's data to the requesting ranks. */
static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
				   void *buf,
				   MPI_Datatype datatype,
				   mca_io_ompio_access_array_t *others_req,
				   struct iovec *offset_len,
				   int contig_access_count,
				   OMPI_MPI_OFFSET_TYPE min_st_offset,
				   OMPI_MPI_OFFSET_TYPE fd_size,
				   OMPI_MPI_OFFSET_TYPE *fd_start,
				   OMPI_MPI_OFFSET_TYPE *fd_end,
				   Flatlist_node *flat_buf,
				   size_t *buf_idx, int striping_unit,
				   int num_io_procs, int *aggregator_list);

/* One communication cycle: alltoall of sizes, then nonblocking
 * sends from aggregators / receives into the user (or staging) buffer. */
static int  two_phase_exchange_data(mca_io_ompio_file_t *fh,
				    void *buf,
				    struct iovec *offset_length,
				    int *send_size, int *start_pos,
				    int *recv_size,
				    int *count,
				    int *partial_send, int *recd_from_proc,
				    int contig_access_count,
				    OMPI_MPI_OFFSET_TYPE min_st_offset,
				    OMPI_MPI_OFFSET_TYPE fd_size,
				    OMPI_MPI_OFFSET_TYPE *fd_start,
				    OMPI_MPI_OFFSET_TYPE *fd_end,
				    Flatlist_node *flat_buf,
				    mca_io_ompio_access_array_t *others_req,
				    int iter,
				    size_t *buf_idx, MPI_Aint buftype_extent,
				    int striping_unit, int num_io_procs,
				    int *aggregator_list);


/* Scatter data received into per-sender staging buffers out into the
 * (non-contiguous) user buffer, following the flattened datatype. */
static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
				       void *buf,
				       Flatlist_node *flat_buf,
				       char **recv_buf,
				       struct iovec *offset_length,
				       unsigned *recv_size,
				       MPI_Request *requests,
				       int *recd_from_proc,
				       int contig_access_count,
				       OMPI_MPI_OFFSET_TYPE min_st_offset,
				       OMPI_MPI_OFFSET_TYPE fd_size,
				       OMPI_MPI_OFFSET_TYPE *fd_start,
				       OMPI_MPI_OFFSET_TYPE *fd_end,
				       MPI_Aint buftype_extent,
				       int striping_unit,
				       int num_io_procs, int *aggregator_list);
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
/* True if 'rank' appears in aggregator_list (used only for timing stats). */
static int isread_aggregator(int rank,
			     int nprocs_for_coll,
			     int *aggregator_list);

#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
/* Accumulated wall-clock timers for the optional time-breakdown report. */
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif
119 
120 
121 int
mca_fcoll_two_phase_file_read_all(mca_io_ompio_file_t * fh,void * buf,int count,struct ompi_datatype_t * datatype,ompi_status_public_t * status)122 mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
123 				   void *buf,
124 				   int count,
125 				   struct ompi_datatype_t *datatype,
126 				   ompi_status_public_t *status)
127 {
128 
129     int ret = OMPI_SUCCESS, i = 0, j = 0, interleave_count = 0, striping_unit = 0;
130     MPI_Aint recv_buf_addr = 0;
131     uint32_t iov_count = 0, ti = 0;
132     struct iovec *decoded_iov = NULL, *temp_iov = NULL, *iov = NULL;
133     size_t max_data = 0;
134     long long_max_data = 0, long_total_bytes = 0;
135     int domain_size=0, *count_my_req_per_proc=NULL, count_my_req_procs = 0;
136     int count_other_req_procs;
137     size_t *buf_indices=NULL;
138     int *aggregator_list = NULL, local_count = 0, local_size = 0;
139     int two_phase_num_io_procs=1;
140     OMPI_MPI_OFFSET_TYPE start_offset = 0, end_offset = 0, fd_size = 0;
141     OMPI_MPI_OFFSET_TYPE *start_offsets=NULL, *end_offsets=NULL;
142     OMPI_MPI_OFFSET_TYPE *fd_start=NULL, *fd_end=NULL, min_st_offset = 0;
143     Flatlist_node *flat_buf=NULL;
144     mca_io_ompio_access_array_t *my_req=NULL, *others_req=NULL;
145 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
146     mca_common_ompio_print_entry nentry;
147 #endif
148 //    if (opal_datatype_is_predefined(&datatype->super)) {
149 //	fh->f_flags = fh->f_flags |  OMPIO_CONTIGUOUS_MEMORY;
150 //    }
151 
152     if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
153 	ret =   fh->f_decode_datatype ((struct mca_io_ompio_file_t *)fh,
154 				       datatype,
155 				       count,
156 				       buf,
157 				       &max_data,
158 				       &temp_iov,
159 				       &iov_count);
160 	if (OMPI_SUCCESS != ret ){
161 	    goto exit;
162 	}
163 
164 	recv_buf_addr = (size_t)(buf);
165 	decoded_iov  = (struct iovec *) calloc
166 	    (iov_count, sizeof(struct iovec));
167 
168 	for (ti = 0; ti < iov_count; ti++){
169 
170 	    decoded_iov[ti].iov_base = (IOVBASE_TYPE *)
171 		((ptrdiff_t)temp_iov[ti].iov_base - recv_buf_addr);
172 	    decoded_iov[ti].iov_len = temp_iov[ti].iov_len;
173 #if DEBUG
174 	    printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
175 		   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
176 		   ti, decoded_iov[ti].iov_len);
177 #endif
178 	}
179 
180     }
181     else{
182 	max_data = count * datatype->super.size;
183     }
184 
185     if ( MPI_STATUS_IGNORE != status ) {
186 	status->_ucount = max_data;
187     }
188 
189     fh->f_get_num_aggregators (&two_phase_num_io_procs);
190     if (-1 == two_phase_num_io_procs ){
191 	ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
192 					  two_phase_num_io_procs,
193 					  max_data);
194 	if (OMPI_SUCCESS != ret){
195             goto exit;
196 	}
197 
198 	two_phase_num_io_procs = fh->f_final_num_aggrs;
199 
200     }
201 
202     if (two_phase_num_io_procs > fh->f_size){
203         two_phase_num_io_procs = fh->f_size;
204     }
205 
206     aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
207     if (NULL == aggregator_list){
208 	ret = OMPI_ERR_OUT_OF_RESOURCE;
209 	goto exit;
210     }
211 
212     if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
213         for (i =0; i< two_phase_num_io_procs; i++){
214             aggregator_list[i] = i;
215         }
216     }
217     else {
218         for (i =0; i< two_phase_num_io_procs; i++){
219             aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
220         }
221     }
222 
223     ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *)fh,
224 					    max_data,
225 					    &iov,
226 					    &local_count);
227 
228     if (OMPI_SUCCESS != ret){
229 	goto exit;
230     }
231 
232     long_max_data = (long) max_data;
233     ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
234 					     &long_total_bytes,
235 					     1,
236 					     MPI_LONG,
237 					     MPI_SUM,
238 					     fh->f_comm,
239 					     fh->f_comm->c_coll->coll_allreduce_module);
240 
241     if ( OMPI_SUCCESS != ret ) {
242 	goto exit;
243     }
244 
245     if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
246 
247 	/* This datastructre translates between OMPIO->ROMIO its a little hacky!*/
248 	/* But helps to re-use romio's code for handling non-contiguous file-type*/
249 	/*Flattened datatype for ompio is in decoded_iov it translated into
250 	  flatbuf*/
251 
252 	flat_buf = (Flatlist_node *)calloc(1, sizeof(Flatlist_node));
253 	if ( NULL == flat_buf ){
254 	    ret = OMPI_ERR_OUT_OF_RESOURCE;
255 	    goto exit;
256 	}
257 
258 	flat_buf->type = datatype;
259 	flat_buf->next = NULL;
260 	flat_buf->count = 0;
261 	flat_buf->indices = NULL;
262 	flat_buf->blocklens = NULL;
263 
264 	if ( 0 < count ) {
265 	    local_size = OMPIO_MAX(1,iov_count/count);
266 	}
267 	else {
268 	    local_size = 0;
269 	}
270 
271 
272 	if ( 0 < local_size ) {
273 	    flat_buf->indices =
274 		(OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
275 					       sizeof(OMPI_MPI_OFFSET_TYPE));
276 	    if (NULL == flat_buf->indices){
277 		ret = OMPI_ERR_OUT_OF_RESOURCE;
278 		goto exit;
279 	    }
280 
281 	    flat_buf->blocklens =
282 		(OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
283 					       sizeof(OMPI_MPI_OFFSET_TYPE));
284 	    if ( NULL == flat_buf->blocklens ){
285 		ret = OMPI_ERR_OUT_OF_RESOURCE;
286 		goto exit;
287 	    }
288 	}
289 	flat_buf->count = local_size;
290         for (j = 0 ; j < local_size ; ++j) {
291 	    flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
292 	    flat_buf->blocklens[j] = decoded_iov[j].iov_len;
293 	}
294 
295 #if DEBUG
296 	printf("flat_buf count: %d\n",
297 	       flat_buf->count);
298 	for(i=0;i<flat_buf->count;i++){
299 	    printf("%d: blocklen[%d] : %lld, indices[%d]: %lld\n",
300 		   fh->f_rank, i, flat_buf->blocklens[i], i ,flat_buf->indices[i]);
301 	}
302 #endif
303     }
304 
305 #if DEBUG
306     printf("%d: total_bytes:%ld, local_count: %d\n",
307 	   fh->f_rank, long_total_bytes, local_count);
308     for (i=0 ; i<local_count ; i++) {
309 	printf("%d: fcoll:two_phase:read_all:OFFSET:%ld,LENGTH:%ld\n",
310 	       fh->f_rank,
311 	       (size_t)iov[i].iov_base,
312 	       (size_t)iov[i].iov_len);
313     }
314 #endif
315 
316     start_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[0].iov_base;
317     if ( 0 < local_count ) {
318 	end_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_base +
319 	    (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_len - 1;
320     }
321     else {
322 	end_offset = 0;
323     }
324 #if DEBUG
325     printf("%d: START OFFSET:%ld, END OFFSET:%ld\n",
326 	   fh->f_rank,
327 	   (size_t)start_offset,
328 	   (size_t)end_offset);
329 #endif
330 
331     start_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
332 	(fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
333 
334     if ( NULL == start_offsets ){
335 	ret = OMPI_ERR_OUT_OF_RESOURCE;
336 	goto exit;
337     }
338 
339     end_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
340 	(fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
341 
342     if (NULL == end_offsets){
343 	ret = OMPI_ERR_OUT_OF_RESOURCE;
344 	goto exit;
345     }
346 
347     ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
348 					    1,
349 					    OMPI_OFFSET_DATATYPE,
350 					    start_offsets,
351 					    1,
352 					    OMPI_OFFSET_DATATYPE,
353 					    fh->f_comm,
354 					    fh->f_comm->c_coll->coll_allgather_module);
355 
356     if ( OMPI_SUCCESS != ret ){
357 	goto exit;
358     }
359 
360     ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
361 					    1,
362 					    OMPI_OFFSET_DATATYPE,
363 					    end_offsets,
364 					    1,
365 					    OMPI_OFFSET_DATATYPE,
366 					    fh->f_comm,
367 					    fh->f_comm->c_coll->coll_allgather_module);
368 
369 
370     if ( OMPI_SUCCESS != ret ){
371 	goto exit;
372     }
373 
374 #if DEBUG
375     for (i=0;i<fh->f_size;i++){
376 	printf("%d: start[%d]:%ld,end[%d]:%ld\n",
377 	       fh->f_rank,i,
378 	       (size_t)start_offsets[i],i,
379 	       (size_t)end_offsets[i]);
380     }
381 #endif
382 
383     for (i=1; i<fh->f_size; i++){
384 	if ((start_offsets[i] < end_offsets[i-1]) &&
385 	    (start_offsets[i] <= end_offsets[i])){
386 	    interleave_count++;
387 	}
388     }
389 
390 #if DEBUG
391     printf("%d: interleave_count:%d\n",
392 	   fh->f_rank,interleave_count);
393 #endif
394 
395     ret = mca_fcoll_two_phase_domain_partition(fh,
396 					       start_offsets,
397 					       end_offsets,
398 					       &min_st_offset,
399 					       &fd_start,
400 					       &fd_end,
401 					       domain_size,
402 					       &fd_size,
403 					       striping_unit,
404 					       two_phase_num_io_procs);
405     if (OMPI_SUCCESS != ret){
406 	goto exit;
407     }
408 
409 #if DEBUG
410     for (i=0;i<two_phase_num_io_procs;i++){
411 	printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
412 	       i, fd_start[i], i, fd_end[i], local_count);
413     }
414 #endif
415 
416     ret = mca_fcoll_two_phase_calc_my_requests (fh,
417 						iov,
418 						local_count,
419 						min_st_offset,
420 						fd_start,
421 						fd_end,
422 						fd_size,
423 						&count_my_req_procs,
424 						&count_my_req_per_proc,
425 						&my_req,
426 						&buf_indices,
427 						striping_unit,
428 						two_phase_num_io_procs,
429 						aggregator_list);
430     if ( OMPI_SUCCESS != ret ){
431 	goto exit;
432     }
433 
434     ret = mca_fcoll_two_phase_calc_others_requests(fh,
435 						   count_my_req_procs,
436 						   count_my_req_per_proc,
437 						   my_req,
438 						   &count_other_req_procs,
439 						   &others_req);
440     if (OMPI_SUCCESS != ret ){
441 	goto exit;
442     }
443 
444 #if DEBUG
445     printf("%d count_other_req_procs : %d\n",
446 	   fh->f_rank,
447 	   count_other_req_procs);
448 #endif
449 
450 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
451     start_rexch = MPI_Wtime();
452 #endif
453 
454 
455     ret = two_phase_read_and_exch(fh,
456 				  buf,
457 				  datatype,
458 				  others_req,
459 				  iov,
460 				  local_count,
461 				  min_st_offset,
462 				  fd_size,
463 				  fd_start,
464 				  fd_end,
465 				  flat_buf,
466 				  buf_indices,
467 				  striping_unit,
468 				  two_phase_num_io_procs,
469 				  aggregator_list);
470 
471 
472     if (OMPI_SUCCESS != ret){
473 	goto exit;
474     }
475 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
476     end_rexch = MPI_Wtime();
477     read_exch += (end_rexch - start_rexch);
478     nentry.time[0] = read_time;
479     nentry.time[1] = rcomm_time;
480     nentry.time[2] = read_exch;
481     if (isread_aggregator(fh->f_rank,
482 			  two_phase_num_io_procs,
483 			  aggregator_list)){
484 	nentry.aggregator = 1;
485     }
486     else{
487 	nentry.aggregator = 0;
488     }
489     nentry.nprocs_for_coll = two_phase_num_io_procs;
490 
491 
492     if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)){
493 	mca_common_ompio_register_print_entry(fh->f_coll_read_time,
494                                               nentry);
495     }
496 #endif
497 
498 
499 exit:
500     if (flat_buf != NULL){
501 	if (flat_buf->blocklens != NULL){
502 	    free (flat_buf->blocklens);
503 	}
504 	if (flat_buf->indices != NULL){
505 	    free (flat_buf->indices);
506 	}
507         free (flat_buf);
508     }
509 
510     free (start_offsets);
511     free (end_offsets);
512     free (aggregator_list);
513     free (fd_start);
514     free (decoded_iov);
515     free (buf_indices);
516     free (count_my_req_per_proc);
517     free (my_req);
518     free (others_req);
519     free (fd_end);
520 
521     return ret;
522 }
523 
524 
525 
526 
/**
 * Read-and-exchange driver (ROMIO's ADIOI_GEN_ReadStridedColl inner loop).
 *
 * Each aggregator reads its file domain in 'ntimes' cycles of at most
 * two_phase_cycle_buffer_size bytes into read_buf, then every cycle the
 * data is redistributed via two_phase_exchange_data().  Data that was
 * read but belongs to the next cycle ("for_next_iter") is carried over
 * at the front of a re-grown read_buf.
 *
 * Non-aggregators may have ntimes == 0 but must still participate in the
 * exchange up to the global max_ntimes.
 *
 * @return OMPI_SUCCESS or an OMPI error code.
 *
 * Fixes vs. previous revision:
 *  - the return value of two_phase_exchange_data() is now propagated
 *    (errors were silently dropped at both call sites),
 *  - the carry-over tmp_buf/read_buf allocations are NULL-checked.
 */
static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
				   void *buf,
				   MPI_Datatype datatype,
				   mca_io_ompio_access_array_t *others_req,
				   struct iovec *offset_len,
				   int contig_access_count,
				   OMPI_MPI_OFFSET_TYPE min_st_offset,
				   OMPI_MPI_OFFSET_TYPE fd_size,
				   OMPI_MPI_OFFSET_TYPE *fd_start,
				   OMPI_MPI_OFFSET_TYPE *fd_end,
				   Flatlist_node *flat_buf,
				   size_t *buf_idx, int striping_unit,
				   int two_phase_num_io_procs,
				   int *aggregator_list){

    int ret=OMPI_SUCCESS, i = 0, j = 0, ntimes = 0, max_ntimes = 0;
    int m = 0;
    int *curr_offlen_ptr=NULL, *count=NULL, *send_size=NULL, *recv_size=NULL;
    int *partial_send=NULL, *start_pos=NULL, req_len=0, flag=0;
    int *recd_from_proc=NULL;
    MPI_Aint buftype_extent=0;
    size_t byte_size = 0;
    OMPI_MPI_OFFSET_TYPE st_loc=-1, end_loc=-1, off=0, done=0, for_next_iter=0;
    OMPI_MPI_OFFSET_TYPE size=0, req_off=0, real_size=0, real_off=0, len=0;
    OMPI_MPI_OFFSET_TYPE for_curr_iter=0;
    char *read_buf=NULL, *tmp_buf=NULL;
    MPI_Datatype byte = MPI_BYTE;
    int two_phase_cycle_buffer_size=0;

    opal_datatype_type_size(&byte->super,
			    &byte_size);

    /* Seed st_loc/end_loc from the first rank that requests anything of us;
     * stays -1/-1 if nobody does (this rank is not an aggregator or has
     * an empty file domain). */
    for (i = 0; i < fh->f_size; i++){
	if (others_req[i].count) {
	    st_loc = others_req[i].offsets[0];
	    end_loc = others_req[i].offsets[0];
	    break;
	}
    }

    /* Tighten to the min/max byte touched by any request. */
    for (i=0;i<fh->f_size;i++){
	for(j=0;j< others_req[i].count; j++){
	    st_loc =
		OMPIO_MIN(st_loc, others_req[i].offsets[j]);
	    end_loc =
		OMPIO_MAX(end_loc, (others_req[i].offsets[j] +
				    others_req[i].lens[j] - 1));
	}
    }

    fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size);
    ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
		   two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)){
	ntimes = 0;   /* nothing to read locally, but still exchange below */
    }

    /* Everybody must loop as often as the busiest aggregator. */
    fh->f_comm->c_coll->coll_allreduce (&ntimes,
				       &max_ntimes,
				       1,
				       MPI_INT,
				       MPI_MAX,
				       fh->f_comm,
				       fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes){
	read_buf = (char *) calloc (two_phase_cycle_buffer_size, sizeof(char));
	if ( NULL == read_buf ){
	    ret =  OMPI_ERR_OUT_OF_RESOURCE;
	    goto exit;
	}
    }

    curr_offlen_ptr = (int *)calloc (fh->f_size,
				     sizeof(int));
    if (NULL == curr_offlen_ptr){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    count = (int *)calloc (fh->f_size,
			   sizeof(int));
    if (NULL == count){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    partial_send = (int *)calloc(fh->f_size, sizeof(int));
    if ( NULL == partial_send ){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    send_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == send_size){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    recv_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == recv_size){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    recd_from_proc = (int *)calloc(fh->f_size,sizeof(int));
    if (NULL == recd_from_proc){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    start_pos = (int *) calloc(fh->f_size, sizeof(int));
    if ( NULL == start_pos ){
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    done = 0;
    off = st_loc;
    for_curr_iter = for_next_iter = 0;

    ompi_datatype_type_extent(datatype, &buftype_extent);

    for (m=0; m<ntimes; m++) {

	/* Bytes of fresh file data this cycle; real_off/real_size include
	 * the carry-over prefix from the previous cycle. */
	size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size, end_loc-st_loc+1-done);
	real_off = off - for_curr_iter;
	real_size = size + for_curr_iter;

	for (i=0; i<fh->f_size; i++) count[i] = send_size[i] = 0;
	for_next_iter = 0;

	/* Walk each rank's request list and figure out which requests
	 * (or request fragments) fall inside [real_off, real_off+real_size). */
	for (i=0; i<fh->f_size; i++) {
	    if (others_req[i].count) {
		start_pos[i] = curr_offlen_ptr[i];
		for (j=curr_offlen_ptr[i]; j<others_req[i].count;
		     j++) {
		    if (partial_send[i]) {
			/* this request may have been partially
			   satisfied in the previous iteration. */
			req_off = others_req[i].offsets[j] +
			    partial_send[i];
			req_len = others_req[i].lens[j] -
			    partial_send[i];
			partial_send[i] = 0;
			/* modify the off-len pair to reflect this change */
			others_req[i].offsets[j] = req_off;
			others_req[i].lens[j] = req_len;
		    }
		    else {
			req_off = others_req[i].offsets[j];
			req_len = others_req[i].lens[j];
		    }
		    if (req_off < real_off + real_size) {
			count[i]++;
			PMPI_Address(read_buf+req_off-real_off,
				     &(others_req[i].mem_ptrs[j]));

			send_size[i] += (int)(OMPIO_MIN(real_off + real_size - req_off,
							(OMPI_MPI_OFFSET_TYPE)req_len));

			if (real_off+real_size-req_off < (OMPI_MPI_OFFSET_TYPE)req_len) {
			    partial_send[i] = (int) (real_off + real_size - req_off);
			    if ((j+1 < others_req[i].count) &&
				(others_req[i].offsets[j+1] <
				 real_off+real_size)) {
				/* overlapping next request: remember how much
				   of this cycle's tail must be carried over */
				for_next_iter = OMPIO_MAX(for_next_iter,
							  real_off + real_size - others_req[i].offsets[j+1]);
				/* max because it must cover requests
				   from different processes */
			    }
			    break;
			}
		    }
		    else break;
		}
		curr_offlen_ptr[i] = j;
	    }
	}
	flag = 0;
	for (i=0; i<fh->f_size; i++)
	    if (count[i]) flag = 1;

	if (flag) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
	    start_read_time = MPI_Wtime();
#endif

	    /* Issue one contiguous read of this cycle's fresh bytes,
	     * appended after any carried-over prefix. */
	    len = size * byte_size;
	    fh->f_io_array = (mca_io_ompio_io_array_t *)calloc
		(1,sizeof(mca_io_ompio_io_array_t));
	    if (NULL == fh->f_io_array) {
		opal_output(1, "OUT OF MEMORY\n");
		ret = OMPI_ERR_OUT_OF_RESOURCE;
		goto exit;
	    }
	    fh->f_io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)off;
	    fh->f_io_array[0].length = len;
	    fh->f_io_array[0].memory_address =
		read_buf+for_curr_iter;
	    fh->f_num_of_io_entries = 1;

	    if (fh->f_num_of_io_entries){
		if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
		    opal_output(1, "READ FAILED\n");
		    ret = OMPI_ERROR;
		    goto exit;
		}
	    }

#if 0
	    int ii;
	    printf("%d: len/4 : %lld\n",
		   fh->f_rank,
		   len/4);
	    for (ii = 0; ii < len/4 ;ii++){
		printf("%d: read_buf[%d]: %ld\n",
		       fh->f_rank,
		       ii,
		       (int *)read_buf[ii]);
	    }
#endif
	    fh->f_num_of_io_entries = 0;
	    if (NULL != fh->f_io_array) {
		free (fh->f_io_array);
		fh->f_io_array = NULL;
	    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
	    end_read_time = MPI_Wtime();
	    read_time += (end_read_time - start_read_time);
#endif
	}

	for_curr_iter = for_next_iter;

	for (i=0; i< fh->f_size; i++){
	    recv_size[i]  = 0;
	}
	ret = two_phase_exchange_data(fh, buf, offset_len,
				      send_size, start_pos, recv_size, count,
				      partial_send, recd_from_proc,
				      contig_access_count,
				      min_st_offset, fd_size, fd_start, fd_end,
				      flat_buf, others_req, m, buf_idx,
				      buftype_extent, striping_unit, two_phase_num_io_procs,
				      aggregator_list);
	if (OMPI_SUCCESS != ret) {   /* previously ignored */
	    goto exit;
	}

	if (for_next_iter){
	    /* Preserve the tail that belongs to the next cycle at the
	     * front of a re-grown read buffer. */
	    tmp_buf = (char *) calloc (for_next_iter, sizeof(char));
	    if (NULL == tmp_buf) {   /* previously unchecked */
		ret = OMPI_ERR_OUT_OF_RESOURCE;
		goto exit;
	    }
	    memcpy(tmp_buf,
		   read_buf+real_size-for_next_iter,
		   for_next_iter);
	    free(read_buf);
	    read_buf = (char *)malloc(for_next_iter+two_phase_cycle_buffer_size);
	    if (NULL == read_buf) {  /* previously unchecked */
		free(tmp_buf);
		ret = OMPI_ERR_OUT_OF_RESOURCE;
		goto exit;
	    }
	    memcpy(read_buf, tmp_buf, for_next_iter);
	    free(tmp_buf);
	}

	off += size;
	done += size;
    }

    /* Keep participating (with empty sends) until the slowest aggregator
     * has finished all of its cycles. */
    for (i=0; i<fh->f_size; i++) count[i] = send_size[i] = 0;
    for (m=ntimes; m<max_ntimes; m++) {
	ret = two_phase_exchange_data(fh, buf, offset_len, send_size,
				      start_pos, recv_size, count,
				      partial_send, recd_from_proc,
				      contig_access_count,
				      min_st_offset, fd_size, fd_start, fd_end,
				      flat_buf, others_req, m, buf_idx,
				      buftype_extent, striping_unit, two_phase_num_io_procs,
				      aggregator_list);
	if (OMPI_SUCCESS != ret) {   /* previously ignored */
	    goto exit;
	}
    }

exit:
    free (read_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_send);
    free (send_size);
    free (recv_size);
    free (recd_from_proc);
    free (start_pos);

    return ret;
}
821 
/**
 * One redistribution cycle of the two-phase read.
 *
 * Step 1: alltoall of per-peer byte counts (send_size -> recv_size).
 * Step 2: post nonblocking receives — directly into the user buffer when
 *         it is contiguous (tracked via buf_idx), or into per-sender
 *         staging buffers otherwise.
 * Step 3: aggregators send their read_buf fragments with hindexed
 *         datatypes built over others_req[i].mem_ptrs (MPI_BOTTOM based).
 * Step 4: wait for receives, scatter staged data into a non-contiguous
 *         user buffer via two_phase_fill_user_buffer(), wait for sends.
 *
 * @return OMPI_SUCCESS or an OMPI error code.
 *
 * Fixes vs. previous revision:
 *  - the 'requests' allocation is NULL-checked,
 *  - the irecv failure path in the contiguous branch no longer leaks
 *    'requests' (goto exit instead of return),
 *  - per-sender staging buffers and the non-contiguous irecv return code
 *    are checked.
 */
static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
				   void *buf, struct iovec *offset_len,
				   int *send_size, int *start_pos, int *recv_size,
				   int *count, int *partial_send,
				   int *recd_from_proc, int contig_access_count,
				   OMPI_MPI_OFFSET_TYPE min_st_offset,
				   OMPI_MPI_OFFSET_TYPE fd_size,
				   OMPI_MPI_OFFSET_TYPE *fd_start,
				   OMPI_MPI_OFFSET_TYPE *fd_end,
				   Flatlist_node *flat_buf,
				   mca_io_ompio_access_array_t *others_req,
				   int iter, size_t *buf_idx,
				   MPI_Aint buftype_extent, int striping_unit,
				   int two_phase_num_io_procs, int *aggregator_list)
{

    int i=0, j=0, k=0, tmp=0, nprocs_recv=0, nprocs_send=0;
    int ret = OMPI_SUCCESS;
    char **recv_buf = NULL;
    MPI_Request *requests=NULL;
    MPI_Datatype send_type;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif

    /* Exchange the per-peer transfer sizes for this iteration. */
    ret = fh->f_comm->c_coll->coll_alltoall (send_size,
					    1,
					    MPI_INT,
					    recv_size,
					    1,
					    MPI_INT,
					    fh->f_comm,
					    fh->f_comm->c_coll->coll_alltoall_module);
    if ( OMPI_SUCCESS != ret ){
	goto exit;
    }

#if DEBUG
    for (i=0; i<fh->f_size; i++){
	printf("%d: RS[%d]: %d\n", fh->f_rank,
	       i,
	       recv_size[i]);
    }
#endif

    nprocs_recv = 0;
    for (i=0; i < fh->f_size; i++)
	if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i=0; i< fh->f_size; i++)
	if (send_size[i]) nprocs_send++;

    /* recv requests first, then send requests (+1 guards the zero case). */
    requests = (MPI_Request *)
	malloc((nprocs_send+nprocs_recv+1) *  sizeof(MPI_Request));
    if (NULL == requests){   /* previously unchecked */
	ret = OMPI_ERR_OUT_OF_RESOURCE;
	goto exit;
    }

    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
	/* Contiguous user buffer: receive straight into it, advancing
	 * buf_idx[i] across iterations. */
	j = 0;
	for (i=0; i < fh->f_size; i++){
	    if (recv_size[i]){
		ret = MCA_PML_CALL(irecv(((char *) buf)+ buf_idx[i],
					 recv_size[i],
					 MPI_BYTE,
					 i,
					 fh->f_rank+i+100*iter,
					 fh->f_comm,
					 requests+j));
		if ( OMPI_SUCCESS != ret ){
		    goto exit;   /* was 'return ret' — leaked requests */
		}
		j++;
		buf_idx[i] += recv_size[i];
	    }
	}
    }
    else{
	/* Non-contiguous: stage each sender's data, scattered into the
	 * user buffer after the receives complete. */
	recv_buf = (char **) calloc (fh->f_size, sizeof(char *));
	if (NULL == recv_buf){
	    ret = OMPI_ERR_OUT_OF_RESOURCE;
	    goto exit;
	}

	for (i=0; i < fh->f_size; i++){
	    if (recv_size[i]) {
		recv_buf[i] = (char *) malloc (recv_size[i] *  sizeof(char));
		if (NULL == recv_buf[i]){   /* previously unchecked */
		    ret = OMPI_ERR_OUT_OF_RESOURCE;
		    goto exit;
		}
	    }
	}
	j = 0;
	for(i=0; i<fh->f_size; i++)
	    if (recv_size[i]) {
		ret = MCA_PML_CALL(irecv(recv_buf[i],
					 recv_size[i],
					 MPI_BYTE,
					 i,
					 fh->f_rank+i+100*iter,
					 fh->f_comm,
					 requests+j));
		if ( OMPI_SUCCESS != ret ){   /* previously unchecked */
		    goto exit;
		}
		j++;
	    }
    }

    /* Post sends: an hindexed datatype over the absolute addresses stored
     * in others_req[i].mem_ptrs lets us send scattered read_buf pieces
     * with a single MPI_BOTTOM-based isend. */
    j = 0;
    for (i = 0; i< fh->f_size; i++){
	if (send_size[i]){
	    if (partial_send[i]){
		/* temporarily truncate the last entry to the partial length */
		k = start_pos[i] + count[i] - 1;
		tmp = others_req[i].lens[k];
		others_req[i].lens[k] = partial_send[i];
	    }

	    ompi_datatype_create_hindexed(count[i],
					  &(others_req[i].lens[start_pos[i]]),
					  &(others_req[i].mem_ptrs[start_pos[i]]),
					  MPI_BYTE,
					  &send_type);

	    ompi_datatype_commit(&send_type);

	    ret = MCA_PML_CALL(isend(MPI_BOTTOM,
				     1,
				     send_type,
				     i,
				     fh->f_rank+i+100*iter,
				     MCA_PML_BASE_SEND_STANDARD,
				     fh->f_comm,
				     requests+nprocs_recv+j));
	    /* safe to release our reference; the PML holds its own */
	    ompi_datatype_destroy(&send_type);

	    if (partial_send[i]) others_req[i].lens[k] = tmp;   /* restore */
	    j++;
	}
    }

    if (nprocs_recv) {

	ret = ompi_request_wait_all(nprocs_recv,
				    requests,
				    MPI_STATUS_IGNORE);
	if (OMPI_SUCCESS != ret) {
	    goto exit;
	}

	if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
	    /* scatter the staged data into the user buffer */
	    two_phase_fill_user_buffer(fh, buf, flat_buf,
				       recv_buf, offset_len,
				       (unsigned *)recv_size, requests,
				       recd_from_proc, contig_access_count,
				       min_st_offset, fd_size, fd_start, fd_end,
				       buftype_extent, striping_unit, two_phase_num_io_procs,
				       aggregator_list);
	}
    }

    ret = ompi_request_wait_all(nprocs_send,
				requests+nprocs_recv,
				MPI_STATUS_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += (end_rcomm_time - start_rcomm_time);
#endif

exit:
    if (recv_buf) {
	for (i=0; i< fh->f_size; i++){
	    free(recv_buf[i]);
	}
	free(recv_buf);
    }

    free(requests);

    return ret;
}
1010 
1011 
/* Advance the flattened-buffer cursor by 'buf_incr' bytes WITHOUT copying
 * any data (used to skip over regions received from other senders).
 * Operates on caller-scope locals: buf_incr, size_in_buf, flat_buf_sz,
 * flat_buf_idx, user_buf_idx, n_buftypes, flat_buf, buftype_extent.
 * When the current flat-list block is exhausted, steps to the next block,
 * wrapping to block 0 and bumping n_buftypes at the end of one datatype
 * instance (the cursor then restarts one full extent further along). */
#define TWO_PHASE_BUF_INCR			\
    {						\
	while (buf_incr) {				\
	    size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);	\
	    user_buf_idx += size_in_buf;			\
	    flat_buf_sz -= size_in_buf;				\
	    if (!flat_buf_sz) {					      \
		if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
		else {							\
		    flat_buf_idx = 0;					\
		    n_buftypes++;					\
		}							\
		user_buf_idx = flat_buf->indices[flat_buf_idx] +	\
		(OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}
1031 
1032 
/* Copy 'size' bytes from recv_buf[p] (at recv_buf_idx[p]) into the user
 * buffer, honoring the flattened datatype's block boundaries.  Uses the
 * same caller-scope cursor locals as TWO_PHASE_BUF_INCR, plus: size, buf,
 * p, recv_buf, recv_buf_idx.  Decrements buf_incr alongside size so the
 * trailing TWO_PHASE_BUF_INCR skips only whatever remains of the original
 * buf_incr span after the copy. */
#define TWO_PHASE_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = OMPIO_MIN(size, flat_buf_sz); \
	memcpy(((char *) buf) + user_buf_idx, \
	       &(recv_buf[p][recv_buf_idx[p]]), size_in_buf); \
	recv_buf_idx[p] += size_in_buf; \
	user_buf_idx += size_in_buf; \
	flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
           if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
	      (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    TWO_PHASE_BUF_INCR \
}
1057 
1058 
1059 
/*
 * Scatter the data received from the aggregators into the (possibly
 * non-contiguous) user buffer.
 *
 * For every contiguous file region in offset_length[] the owning
 * aggregator p is recomputed with mca_fcoll_two_phase_calc_aggregator()
 * (which also trims len to p's file domain).  The bytes of that region
 * which arrived in the current read cycle -- i.e. past done_from_proc[p]
 * and within recv_size[p] -- are copied out of recv_buf[p] into the
 * flattened user buffer described by flat_buf (TWO_PHASE_BUF_COPY);
 * all other bytes merely advance the user-buffer cursor
 * (TWO_PHASE_BUF_INCR).
 *
 * On return recd_from_proc[] holds the updated per-aggregator byte
 * counts so the next read cycle knows where to resume.  Ported from
 * ROMIO's ADIOI_Fill_user_buffer; `requests` is kept for signature
 * compatibility with the caller but is not used here.
 */
static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
				       void *buf,
				       Flatlist_node *flat_buf,
				       char **recv_buf,
				       struct iovec *offset_length,
				       unsigned *recv_size,
				       MPI_Request *requests,
				       int *recd_from_proc,
				       int contig_access_count,
				       OMPI_MPI_OFFSET_TYPE min_st_offset,
				       OMPI_MPI_OFFSET_TYPE fd_size,
				       OMPI_MPI_OFFSET_TYPE *fd_start,
				       OMPI_MPI_OFFSET_TYPE *fd_end,
				       MPI_Aint buftype_extent,
				       int striping_unit, int two_phase_num_io_procs,
				       int *aggregator_list){

    int i = 0, p = 0, flat_buf_idx = 0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz = 0, size_in_buf = 0, buf_incr = 0, size = 0;
    int n_buftypes = 0;
    OMPI_MPI_OFFSET_TYPE off=0, len=0, rem_len=0, user_buf_idx=0;
    unsigned *curr_from_proc=NULL, *done_from_proc=NULL, *recv_buf_idx=NULL;

    curr_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    done_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    recv_buf_idx   = (unsigned *) malloc (fh->f_size * sizeof(unsigned));

    /* Bail out rather than dereference NULL if any of the bookkeeping
     * arrays could not be allocated; recd_from_proc is left untouched
     * in that case. */
    if (NULL == curr_from_proc || NULL == done_from_proc ||
        NULL == recv_buf_idx) {
        free(curr_from_proc);
        free(done_from_proc);
        free(recv_buf_idx);
        return;
    }

    for (i=0; i < fh->f_size; i++) {
	recv_buf_idx[i] = curr_from_proc[i] = 0;
	done_from_proc[i] = recd_from_proc[i];
    }

    flat_buf_idx = 0;
    n_buftypes = 0;

    /* position the cursor at the first contiguous piece of the
     * flattened user datatype */
    if ( flat_buf->count > 0 ) {
	user_buf_idx = flat_buf->indices[0];
	flat_buf_sz = flat_buf->blocklens[0];
    }

    /* flat_buf_idx = current index into the flattened buftype,
       flat_buf_sz = bytes left in the current contiguous piece */

    for (i=0; i<contig_access_count; i++) {

	off     = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_length[i].iov_base;
	rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

	/* this request may span the file domains of more than one process */
	while (rem_len != 0) {
	    len = rem_len;
	    /* NOTE: len is trimmed by the aggregator calculation to be no
	     * longer than the single region that aggregator "p" is
	     * responsible for. */
	    p = mca_fcoll_two_phase_calc_aggregator(fh,
						    off,
						    min_st_offset,
						    &len,
						    fd_size,
						    fd_start,
						    fd_end,
						    striping_unit,
						    two_phase_num_io_procs,
						    aggregator_list);

	    if (recv_buf_idx[p] < recv_size[p]) {
		if (curr_from_proc[p]+len > done_from_proc[p]) {
		    if (done_from_proc[p] > curr_from_proc[p]) {
			/* part of this region was already delivered in an
			 * earlier cycle: skip that prefix, copy the rest */
			size = OMPIO_MIN(curr_from_proc[p] + len -
					 done_from_proc[p],
					 recv_size[p]-recv_buf_idx[p]);
			buf_incr = done_from_proc[p] - curr_from_proc[p];
			TWO_PHASE_BUF_INCR
			buf_incr = curr_from_proc[p]+len-done_from_proc[p];
			curr_from_proc[p] = done_from_proc[p] + size;
			TWO_PHASE_BUF_COPY
		    }
		    else {
			size = OMPIO_MIN(len,recv_size[p]-recv_buf_idx[p]);
			buf_incr = len;
			curr_from_proc[p] += (unsigned) size;
			TWO_PHASE_BUF_COPY
		    }
		}
		else {
		    /* entire region already delivered in a previous cycle */
		    curr_from_proc[p] += (unsigned) len;
		    buf_incr = len;
		    TWO_PHASE_BUF_INCR
		}
	    }
	    else {
		/* nothing (more) received from p this cycle: just advance */
		buf_incr = len;
		TWO_PHASE_BUF_INCR
	    }
	    off += len;
	    rem_len -= len;
	}
    }
    for (i=0; i < fh->f_size; i++) {
	if (recv_size[i]) {
	    recd_from_proc[i] = curr_from_proc[i];
	}
    }

    free(curr_from_proc);
    free(done_from_proc);
    free(recv_buf_idx);

}
1167 
1168 #if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
/*
 * Return 1 if `rank` appears in aggregator_list[0 .. nprocs_for_coll-1]
 * (i.e. this rank acts as a read aggregator), 0 otherwise.
 */
int isread_aggregator(int rank,
		      int nprocs_for_coll,
		      int *aggregator_list){

    int idx;

    for (idx = nprocs_for_coll - 1; idx >= 0; idx--) {
	if (rank == aggregator_list[idx]) {
	    return 1;
	}
    }
    return 0;
}
1180 #endif
1181