/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2014 University of Houston. All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2017-2018 Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "fcoll_two_phase.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/common/ompio/common_ompio.h"
#include "ompi/mca/io/io.h"
#include "opal/mca/base/base.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG 0

/* Two-Phase collective read implementation ported from ROMIO to the
 * OMPIO infrastructure; the logic follows ROMIO's two_phase code base
 * closely.
 */
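
/* Overview: each collective read proceeds in two phases.  Phase one: the
 * file range touched by the communicator is split into "file domains",
 * one per aggregator, and each aggregator reads its domain from disk in
 * contiguous, cycle-buffer-sized chunks.  Phase two: the aggregators
 * redistribute the data to the processes that actually requested it.
 *
 * Illustrative example (values assumed for the sketch, not computed by
 * this code): with 4 ranks, 2 aggregators, and accesses covering bytes
 * 0..1023, an even partition would yield
 *     fd_start[0] = 0,    fd_end[0] = 511     (first aggregator)
 *     fd_start[1] = 512,  fd_end[1] = 1023    (second aggregator)
 * so a request for bytes 500..600 is served partly by each aggregator.
 */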


/* Data structure describing the flattened (offset/length) form of the
 * user buffer datatype. */
typedef struct flat_list_node {
    MPI_Datatype type;
    int count;
    OMPI_MPI_OFFSET_TYPE *blocklens;
    OMPI_MPI_OFFSET_TYPE *indices;
    struct flat_list_node *next;
} Flatlist_node;

/* local function declarations */
static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int num_io_procs, int *aggregator_list);

static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf,
                                   struct iovec *offset_length,
                                   int *send_size, int *start_pos,
                                   int *recv_size,
                                   int *count,
                                   int *partial_send, int *recd_from_proc,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter,
                                   size_t *buf_idx, MPI_Aint buftype_extent,
                                   int striping_unit, int num_io_procs,
                                   int *aggregator_list);


static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit,
                                       int num_io_procs, int *aggregator_list);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif

int
mca_fcoll_two_phase_file_read_all (ompio_file_t *fh,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS, i = 0, j = 0, interleave_count = 0, striping_unit = 0;
    MPI_Aint recv_buf_addr = 0;
    uint32_t iov_count = 0, ti = 0;
    struct iovec *decoded_iov = NULL, *temp_iov = NULL, *iov = NULL;
    size_t max_data = 0;
    long long_max_data = 0, long_total_bytes = 0;
    int domain_size = 0, *count_my_req_per_proc = NULL, count_my_req_procs = 0;
    int count_other_req_procs;
    size_t *buf_indices = NULL;
    int *aggregator_list = NULL, local_count = 0, local_size = 0;
    int two_phase_num_io_procs = 1;
    OMPI_MPI_OFFSET_TYPE start_offset = 0, end_offset = 0, fd_size = 0;
    OMPI_MPI_OFFSET_TYPE *start_offsets = NULL, *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE *fd_start = NULL, *fd_end = NULL, min_st_offset = 0;
    Flatlist_node *flat_buf = NULL;
    mca_common_ompio_access_array_t *my_req = NULL, *others_req = NULL;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    mca_common_ompio_print_entry nentry;
#endif

//  if (opal_datatype_is_predefined(&datatype->super)) {
//      fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY;
//  }

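    /* If the user buffer is non-contiguous, flatten the memory datatype
     * into an iovec of (offset, length) pairs; the offsets are rebased
     * relative to the start of the buffer so that they survive the
     * OMPIO -> ROMIO translation further below. */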
    if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
        ret = mca_common_ompio_decode_datatype ((struct ompio_file_t *)fh,
                                                datatype,
                                                count,
                                                buf,
                                                &max_data,
                                                fh->f_mem_convertor,
                                                &temp_iov,
                                                &iov_count);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        recv_buf_addr = (size_t)(buf);
        decoded_iov = (struct iovec *) calloc (iov_count, sizeof(struct iovec));
        if (NULL == decoded_iov) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (ti = 0; ti < iov_count; ti++) {
            decoded_iov[ti].iov_base = (IOVBASE_TYPE *)
                ((ptrdiff_t)temp_iov[ti].iov_base - recv_buf_addr);
            decoded_iov[ti].iov_len = temp_iov[ti].iov_len;
#if DEBUG
            printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
                   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
                   ti, decoded_iov[ti].iov_len);
#endif
        }
    }
    else {
        max_data = count * datatype->super.size;
    }

    if (MPI_STATUS_IGNORE != status) {
        status->_ucount = max_data;
    }

    two_phase_num_io_procs = fh->f_get_mca_parameter_value ("num_aggregators",
                                                            strlen ("num_aggregators"));
    if (OMPI_ERR_MAX == two_phase_num_io_procs) {
        ret = OMPI_ERROR;
        goto exit;
    }
    if (-1 == two_phase_num_io_procs) {
        ret = mca_common_ompio_set_aggregator_props ((struct ompio_file_t *)fh,
                                                     two_phase_num_io_procs,
                                                     max_data);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        two_phase_num_io_procs = fh->f_num_aggrs;
    }

    if (two_phase_num_io_procs > fh->f_size) {
        two_phase_num_io_procs = fh->f_size;
    }

    aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
    if (NULL == aggregator_list) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

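    /* Select the aggregator ranks: if MPI_COMM_WORLD was mapped by node,
     * the first ranks already land on distinct nodes, so use them directly;
     * otherwise stride the aggregators evenly across the communicator. */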
    if (OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm)) {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i;
        }
    }
    else {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
        }
    }

    ret = fh->f_generate_current_file_view ((struct ompio_file_t *)fh,
                                            max_data,
                                            &iov,
                                            &local_count);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    long_max_data = (long) max_data;
    ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
                                              &long_total_bytes,
                                              1,
                                              MPI_LONG,
                                              MPI_SUM,
                                              fh->f_comm,
                                              fh->f_comm->c_coll->coll_allreduce_module);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        /* This data structure translates between OMPIO and ROMIO. It is a
         * little hacky, but it allows us to reuse ROMIO's code for handling
         * non-contiguous file types: the flattened datatype that OMPIO keeps
         * in decoded_iov is translated into flat_buf. */

        flat_buf = (Flatlist_node *)calloc(1, sizeof(Flatlist_node));
        if (NULL == flat_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        flat_buf->type = datatype;
        flat_buf->next = NULL;
        flat_buf->count = 0;
        flat_buf->indices = NULL;
        flat_buf->blocklens = NULL;

        /* decoded_iov holds the entries for all 'count' datatype instances;
         * flat_buf only needs the entries of a single instance. */
        if (0 < count) {
            local_size = OMPIO_MAX(1, iov_count/count);
        }
        else {
            local_size = 0;
        }

        if (0 < local_size) {
            flat_buf->indices =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->indices) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            flat_buf->blocklens =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->blocklens) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        flat_buf->count = local_size;
        for (j = 0; j < local_size; ++j) {
            flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
            flat_buf->blocklens[j] = decoded_iov[j].iov_len;
        }

#if DEBUG
        printf("flat_buf count: %d\n",
               flat_buf->count);
        for (i = 0; i < flat_buf->count; i++) {
            printf("%d: blocklen[%d] : %lld, indices[%d]: %lld\n",
                   fh->f_rank, i, flat_buf->blocklens[i], i, flat_buf->indices[i]);
        }
#endif
    }

#if DEBUG
    printf("%d: total_bytes:%ld, local_count: %d\n",
           fh->f_rank, long_total_bytes, local_count);
    for (i = 0; i < local_count; i++) {
        printf("%d: fcoll:two_phase:read_all:OFFSET:%ld,LENGTH:%ld\n",
               fh->f_rank,
               (size_t)iov[i].iov_base,
               (size_t)iov[i].iov_len);
    }
#endif

    /* Determine the first and last file offset touched by this process. */
    if (0 < local_count) {
        start_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[0].iov_base;
        end_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_base +
            (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_len - 1;
    }
    else {
        start_offset = 0;
        end_offset = 0;
    }
#if DEBUG
    printf("%d: START OFFSET:%ld, END OFFSET:%ld\n",
           fh->f_rank,
           (size_t)start_offset,
           (size_t)end_offset);
#endif

    start_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == start_offsets) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    end_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    /* Gather everyone's first and last offset so each process can see the
       access pattern of the whole communicator. */
    ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             end_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: start[%d]:%ld,end[%d]:%ld\n",
               fh->f_rank, i,
               (size_t)start_offsets[i], i,
               (size_t)end_offsets[i]);
    }
#endif

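    /* Count the processes whose file range overlaps that of the previous
     * rank; any overlap means the accesses are interleaved across ranks,
     * which is the pattern two-phase aggregation is designed for. */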
    for (i = 1; i < fh->f_size; i++) {
        if ((start_offsets[i] < end_offsets[i-1]) &&
            (start_offsets[i] <= end_offsets[i])) {
            interleave_count++;
        }
    }

#if DEBUG
    printf("%d: interleave_count:%d\n",
           fh->f_rank, interleave_count);
#endif

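    /* Partition the aggregate file range into one contiguous file domain
     * per aggregator: fd_start[i]/fd_end[i] delimit the region that
     * aggregator i reads on behalf of everyone. */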
    ret = mca_fcoll_two_phase_domain_partition(fh,
                                               start_offsets,
                                               end_offsets,
                                               &min_st_offset,
                                               &fd_start,
                                               &fd_end,
                                               domain_size,
                                               &fd_size,
                                               striping_unit,
                                               two_phase_num_io_procs);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < two_phase_num_io_procs; i++) {
        printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
               i, fd_start[i], i, fd_end[i], local_count);
    }
#endif

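    /* Work out which parts of my request fall into which aggregator's file
     * domain (my_req), then let the aggregators learn the inverse mapping:
     * which processes want data from their domain (others_req). */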
    ret = mca_fcoll_two_phase_calc_my_requests (fh,
                                                iov,
                                                local_count,
                                                min_st_offset,
                                                fd_start,
                                                fd_end,
                                                fd_size,
                                                &count_my_req_procs,
                                                &count_my_req_per_proc,
                                                &my_req,
                                                &buf_indices,
                                                striping_unit,
                                                two_phase_num_io_procs,
                                                aggregator_list);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    ret = mca_fcoll_two_phase_calc_others_requests(fh,
                                                   count_my_req_procs,
                                                   count_my_req_per_proc,
                                                   my_req,
                                                   &count_other_req_procs,
                                                   &others_req);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    printf("%d count_other_req_procs : %d\n",
           fh->f_rank,
           count_other_req_procs);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rexch = MPI_Wtime();
#endif

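    /* Main engine: aggregators read their file domains in cycle-buffer-sized
     * chunks and redistribute the data to the processes that requested it. */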
    ret = two_phase_read_and_exch(fh,
                                  buf,
                                  datatype,
                                  others_req,
                                  iov,
                                  local_count,
                                  min_st_offset,
                                  fd_size,
                                  fd_start,
                                  fd_end,
                                  flat_buf,
                                  buf_indices,
                                  striping_unit,
                                  two_phase_num_io_procs,
                                  aggregator_list);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rexch = MPI_Wtime();
    read_exch += (end_rexch - start_rexch);
    nentry.time[0] = read_time;
    nentry.time[1] = rcomm_time;
    nentry.time[2] = read_exch;
    if (isread_aggregator(fh->f_rank,
                          two_phase_num_io_procs,
                          aggregator_list)) {
        nentry.aggregator = 1;
    }
    else {
        nentry.aggregator = 0;
    }
    nentry.nprocs_for_coll = two_phase_num_io_procs;

    if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)) {
        mca_common_ompio_register_print_entry(fh->f_coll_read_time,
                                              nentry);
    }
#endif

exit:
    if (flat_buf != NULL) {
        if (flat_buf->blocklens != NULL) {
            free (flat_buf->blocklens);
        }
        if (flat_buf->indices != NULL) {
            free (flat_buf->indices);
        }
        free (flat_buf);
    }

    free (start_offsets);
    free (end_offsets);
    free (aggregator_list);
    free (fd_start);
    free (decoded_iov);
    free (temp_iov);
    free (buf_indices);
    free (count_my_req_per_proc);
    free (my_req);
    free (others_req);
    free (fd_end);

    return ret;
}


static int two_phase_read_and_exch(ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_common_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int two_phase_num_io_procs,
                                   int *aggregator_list)
{
    int ret = OMPI_SUCCESS, i = 0, j = 0, ntimes = 0, max_ntimes = 0;
    int m = 0;
    int *curr_offlen_ptr = NULL, *count = NULL, *send_size = NULL, *recv_size = NULL;
    int *partial_send = NULL, *start_pos = NULL, req_len = 0, flag = 0;
    int *recd_from_proc = NULL;
    MPI_Aint buftype_extent = 0;
    size_t byte_size = 0;
    OMPI_MPI_OFFSET_TYPE st_loc = -1, end_loc = -1, off = 0, done = 0, for_next_iter = 0;
    OMPI_MPI_OFFSET_TYPE size = 0, req_off = 0, real_size = 0, real_off = 0, len = 0;
    OMPI_MPI_OFFSET_TYPE for_curr_iter = 0;
    char *read_buf = NULL, *tmp_buf = NULL;
    MPI_Datatype byte = MPI_BYTE;
    int two_phase_cycle_buffer_size = 0;

    opal_datatype_type_size(&byte->super,
                            &byte_size);

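    /* Determine the lowest and highest file offsets this aggregator has to
     * serve: initialize st_loc/end_loc from the first non-empty request,
     * then take the min/max over all requests from all processes. */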
    for (i = 0; i < fh->f_size; i++) {
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i = 0; i < fh->f_size; i++) {
        for (j = 0; j < others_req[i].count; j++) {
            st_loc = OMPIO_MIN(st_loc, others_req[i].offsets[j]);
            end_loc = OMPIO_MAX(end_loc, (others_req[i].offsets[j] +
                                          others_req[i].lens[j] - 1));
        }
    }

    /* ntimes = ceiling((end_loc - st_loc + 1) / cycle buffer size); every
     * process has to take part in max_ntimes exchange rounds, even if its
     * own aggregation work finishes earlier. */
    two_phase_cycle_buffer_size = fh->f_bytes_per_agg;
    ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
                   two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)) {
        ntimes = 0;
    }

    fh->f_comm->c_coll->coll_allreduce (&ntimes,
                                        &max_ntimes,
                                        1,
                                        MPI_INT,
                                        MPI_MAX,
                                        fh->f_comm,
                                        fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes) {
        read_buf = (char *) calloc (two_phase_cycle_buffer_size, sizeof(char));
        if (NULL == read_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

    curr_offlen_ptr = (int *)calloc (fh->f_size,
                                     sizeof(int));
    if (NULL == curr_offlen_ptr) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    count = (int *)calloc (fh->f_size,
                           sizeof(int));
    if (NULL == count) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    partial_send = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == partial_send) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == send_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recv_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == recv_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recd_from_proc = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == recd_from_proc) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    start_pos = (int *) calloc(fh->f_size, sizeof(int));
    if (NULL == start_pos) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    done = 0;
    off = st_loc;
    for_curr_iter = for_next_iter = 0;

    ompi_datatype_type_extent(datatype, &buftype_extent);

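    /* Each iteration handles one cycle-buffer-sized window of this
     * aggregator's file domain: figure out which pending requests fall into
     * the window, read it from disk, and exchange the data.  The first
     * for_curr_iter bytes of read_buf were carried over from the previous
     * window for requests that straddled the window boundary. */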
    for (m = 0; m < ntimes; m++) {

        size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size, end_loc-st_loc+1-done);
        real_off = off - for_curr_iter;
        real_size = size + for_curr_iter;

        for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
        for_next_iter = 0;

        for (i = 0; i < fh->f_size; i++) {
            if (others_req[i].count) {
                start_pos[i] = curr_offlen_ptr[i];
                for (j = curr_offlen_ptr[i]; j < others_req[i].count;
                     j++) {
                    if (partial_send[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] +
                            partial_send[i];
                        req_len = others_req[i].lens[j] -
                            partial_send[i];
                        partial_send[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < real_off + real_size) {
                        count[i]++;
                        PMPI_Get_address(read_buf+req_off-real_off,
                                         &(others_req[i].mem_ptrs[j]));

                        send_size[i] += (int)(OMPIO_MIN(real_off + real_size - req_off,
                                                        (OMPI_MPI_OFFSET_TYPE)req_len));

                        if (real_off+real_size-req_off < (OMPI_MPI_OFFSET_TYPE)req_len) {
                            partial_send[i] = (int) (real_off + real_size - req_off);
                            if ((j+1 < others_req[i].count) &&
                                (others_req[i].offsets[j+1] <
                                 real_off+real_size)) {
                                /* the next request also starts inside the
                                   current window, so its data must be
                                   carried over into the next iteration. */
                                for_next_iter = OMPIO_MAX(for_next_iter,
                                                          real_off + real_size - others_req[i].offsets[j+1]);
                                /* max because it must cover requests
                                   from different processes */
                            }
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }
        flag = 0;
        for (i = 0; i < fh->f_size; i++)
            if (count[i]) flag = 1;

        if (flag) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_read_time = MPI_Wtime();
#endif

            len = size * byte_size;
            fh->f_io_array = (mca_common_ompio_io_array_t *)calloc
                (1, sizeof(mca_common_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            fh->f_io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)off;
            fh->f_io_array[0].length = len;
            fh->f_io_array[0].memory_address =
                read_buf+for_curr_iter;
            fh->f_num_of_io_entries = 1;

            if (fh->f_num_of_io_entries) {
                if (0 > fh->f_fbtl->fbtl_preadv (fh)) {
                    opal_output(1, "READ FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }

#if 0
            int ii;
            printf("%d: len/4 : %lld\n",
                   fh->f_rank,
                   len/4);
            for (ii = 0; ii < len/4; ii++) {
                printf("%d: read_buf[%d]: %ld\n",
                       fh->f_rank,
                       ii,
                       (int *)read_buf[ii]);
            }
#endif
            fh->f_num_of_io_entries = 0;
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_read_time = MPI_Wtime();
            read_time += (end_read_time - start_read_time);
#endif
        }

        for_curr_iter = for_next_iter;

        for (i = 0; i < fh->f_size; i++) {
            recv_size[i] = 0;
        }
        two_phase_exchange_data(fh, buf, offset_len,
                                send_size, start_pos, recv_size, count,
                                partial_send, recd_from_proc,
                                contig_access_count,
                                min_st_offset, fd_size, fd_start, fd_end,
                                flat_buf, others_req, m, buf_idx,
                                buftype_extent, striping_unit, two_phase_num_io_procs,
                                aggregator_list);

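        /* Carry the tail of the current window into the next iteration:
         * save the straddling bytes, then reallocate read_buf large enough
         * to hold them in front of the next window's data. */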
        if (for_next_iter) {
            tmp_buf = (char *) calloc (for_next_iter, sizeof(char));
            if (NULL == tmp_buf) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(tmp_buf,
                   read_buf+real_size-for_next_iter,
                   for_next_iter);
            free(read_buf);
            read_buf = (char *)malloc(for_next_iter+two_phase_cycle_buffer_size);
            if (NULL == read_buf) {
                free(tmp_buf);
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(read_buf, tmp_buf, for_next_iter);
            free(tmp_buf);
        }

        off += size;
        done += size;
    }

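    /* Processes whose own aggregation work finished early must still take
     * part in the remaining collective exchange rounds, since every round
     * starts with an alltoall across the whole communicator. */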
    for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
    for (m = ntimes; m < max_ntimes; m++)
        two_phase_exchange_data(fh, buf, offset_len, send_size,
                                start_pos, recv_size, count,
                                partial_send, recd_from_proc,
                                contig_access_count,
                                min_st_offset, fd_size, fd_start, fd_end,
                                flat_buf, others_req, m, buf_idx,
                                buftype_extent, striping_unit, two_phase_num_io_procs,
                                aggregator_list);

exit:
    free (read_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_send);
    free (send_size);
    free (recv_size);
    free (recd_from_proc);
    free (start_pos);

    return ret;
}


static int two_phase_exchange_data(ompio_file_t *fh,
                                   void *buf, struct iovec *offset_len,
                                   int *send_size, int *start_pos, int *recv_size,
                                   int *count, int *partial_send,
                                   int *recd_from_proc, int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_common_ompio_access_array_t *others_req,
                                   int iter, size_t *buf_idx,
                                   MPI_Aint buftype_extent, int striping_unit,
                                   int two_phase_num_io_procs, int *aggregator_list)
{
    int i = 0, j = 0, k = 0, tmp = 0, nprocs_recv = 0, nprocs_send = 0;
    int ret = OMPI_SUCCESS;
    char **recv_buf = NULL;
    MPI_Request *requests = NULL;
    MPI_Datatype send_type;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif

    /* Tell every process how much data it will receive from each
       aggregator in this round. */
    ret = fh->f_comm->c_coll->coll_alltoall (send_size,
                                             1,
                                             MPI_INT,
                                             recv_size,
                                             1,
                                             MPI_INT,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_alltoall_module);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: RS[%d]: %d\n", fh->f_rank,
               i,
               recv_size[i]);
    }
#endif

    nprocs_recv = 0;
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i = 0; i < fh->f_size; i++)
        if (send_size[i]) nprocs_send++;

    requests = (MPI_Request *)
        malloc((nprocs_send+nprocs_recv+1) * sizeof(MPI_Request));
    if (NULL == requests) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

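    /* Post the receives first.  The tag sender_rank + receiver_rank +
       100*iter evaluates to the same value on both sides of each pairing
       and differs between rounds, so messages from different iterations
       cannot be matched by mistake. */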
    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
        /* contiguous user buffer: receive straight into it, at the offset
           recorded for this aggregator in buf_idx. */
        j = 0;
        for (i = 0; i < fh->f_size; i++) {
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(((char *) buf) + buf_idx[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if (OMPI_SUCCESS != ret) {
                    goto exit;
                }
                j++;
                buf_idx[i] += recv_size[i];
            }
        }
    }
    else {
        /* non-contiguous user buffer: receive into intermediate buffers and
           scatter into the user buffer afterwards. */
        recv_buf = (char **) calloc (fh->f_size, sizeof(char *));
        if (NULL == recv_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (i = 0; i < fh->f_size; i++)
            if (recv_size[i]) recv_buf[i] =
                                  (char *) malloc (recv_size[i] * sizeof(char));
        j = 0;
        for (i = 0; i < fh->f_size; i++)
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(recv_buf[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                j++;
            }
    }

    /* Aggregators send each destination its data with a hindexed datatype
       built over the (mem_ptrs, lens) pairs pointing into read_buf, so no
       intermediate pack buffer is needed. */
    j = 0;
    for (i = 0; i < fh->f_size; i++) {
        if (send_size[i]) {
            if (partial_send[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }

            ompi_datatype_create_hindexed(count[i],
                                          &(others_req[i].lens[start_pos[i]]),
                                          &(others_req[i].mem_ptrs[start_pos[i]]),
                                          MPI_BYTE,
                                          &send_type);
            ompi_datatype_commit(&send_type);

            ret = MCA_PML_CALL(isend(MPI_BOTTOM,
                                     1,
                                     send_type,
                                     i,
                                     fh->f_rank+i+100*iter,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     fh->f_comm,
                                     requests+nprocs_recv+j));
            ompi_datatype_destroy(&send_type);

            if (partial_send[i]) others_req[i].lens[k] = tmp;
            j++;
        }
    }

    if (nprocs_recv) {
        ret = ompi_request_wait_all(nprocs_recv,
                                    requests,
                                    MPI_STATUS_IGNORE);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
            two_phase_fill_user_buffer(fh, buf, flat_buf,
                                       recv_buf, offset_len,
                                       (unsigned *)recv_size, requests,
                                       recd_from_proc, contig_access_count,
                                       min_st_offset, fd_size, fd_start, fd_end,
                                       buftype_extent, striping_unit, two_phase_num_io_procs,
                                       aggregator_list);
        }
    }

    ret = ompi_request_wait_all(nprocs_send,
                                requests+nprocs_recv,
                                MPI_STATUS_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += (end_rcomm_time - start_rcomm_time);
#endif

exit:
    if (recv_buf) {
        for (i = 0; i < fh->f_size; i++) {
            free(recv_buf[i]);
        }
        free(recv_buf);
    }

    free(requests);

    return ret;
}


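/* Helper macros for walking the flattened user buffer.
 * TWO_PHASE_BUF_INCR advances the cursor (user_buf_idx, flat_buf_idx,
 * flat_buf_sz, n_buftypes) by buf_incr bytes without copying anything;
 * TWO_PHASE_BUF_COPY copies 'size' bytes from recv_buf[p] into the user
 * buffer while advancing the same cursor, then invokes TWO_PHASE_BUF_INCR
 * to skip whatever remains of buf_incr. */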
#define TWO_PHASE_BUF_INCR                                              \
{                                                                       \
    while (buf_incr) {                                                  \
        size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
}


#define TWO_PHASE_BUF_COPY                                              \
{                                                                       \
    while (size) {                                                      \
        size_in_buf = OMPIO_MIN(size, flat_buf_sz);                     \
        memcpy(((char *) buf) + user_buf_idx,                           \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf);           \
        recv_buf_idx[p] += size_in_buf;                                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        size -= size_in_buf;                                            \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
    TWO_PHASE_BUF_INCR                                                  \
}


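/* Scatter the data received from the aggregators into the (possibly
 * non-contiguous) user buffer.  For every contiguous file-access region we
 * recompute which aggregator served it; bytes already delivered in earlier
 * rounds (done_from_proc) are skipped, bytes received in this round are
 * copied, and anything not yet arrived is left for a later round. */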
static void two_phase_fill_user_buffer(ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit, int two_phase_num_io_procs,
                                       int *aggregator_list)
{
    int i = 0, p = 0, flat_buf_idx = 0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz = 0, size_in_buf = 0, buf_incr = 0, size = 0;
    int n_buftypes = 0;
    OMPI_MPI_OFFSET_TYPE off = 0, len = 0, rem_len = 0, user_buf_idx = 0;
    unsigned *curr_from_proc = NULL, *done_from_proc = NULL, *recv_buf_idx = NULL;

    curr_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    done_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    recv_buf_idx = (unsigned *) malloc (fh->f_size * sizeof(unsigned));

    for (i = 0; i < fh->f_size; i++) {
        recv_buf_idx[i] = curr_from_proc[i] = 0;
        done_from_proc[i] = recd_from_proc[i];
    }

    flat_buf_idx = 0;
    n_buftypes = 0;

    if (flat_buf->count > 0) {
        user_buf_idx = flat_buf->indices[0];
        flat_buf_sz = flat_buf->blocklens[0];
    }

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz = size of current contiguous component in
       flattened buf */

    for (i = 0; i < contig_access_count; i++) {

        off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_length[i].iov_base;
        rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

        /* this request may span the file domains of more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: len is reduced by the aggregator calculation to be no
             * longer than the single region that aggregator "p" is
             * responsible for.
             */
            p = mca_fcoll_two_phase_calc_aggregator(fh,
                                                    off,
                                                    min_st_offset,
                                                    &len,
                                                    fd_size,
                                                    fd_start,
                                                    fd_end,
                                                    striping_unit,
                                                    two_phase_num_io_procs,
                                                    aggregator_list);

            if (recv_buf_idx[p] < recv_size[p]) {
                if (curr_from_proc[p]+len > done_from_proc[p]) {
                    if (done_from_proc[p] > curr_from_proc[p]) {
                        size = OMPIO_MIN(curr_from_proc[p] + len -
                                         done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
                        buf_incr = done_from_proc[p] - curr_from_proc[p];
                        TWO_PHASE_BUF_INCR
                        buf_incr = curr_from_proc[p]+len-done_from_proc[p];
                        curr_from_proc[p] = done_from_proc[p] + size;
                        TWO_PHASE_BUF_COPY
                    }
                    else {
                        size = OMPIO_MIN(len, recv_size[p]-recv_buf_idx[p]);
                        buf_incr = len;
                        curr_from_proc[p] += (unsigned) size;
                        TWO_PHASE_BUF_COPY
                    }
                }
                else {
                    curr_from_proc[p] += (unsigned) len;
                    buf_incr = len;
                    TWO_PHASE_BUF_INCR
                }
            }
            else {
                buf_incr = len;
                TWO_PHASE_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];

    free(curr_from_proc);
    free(done_from_proc);
    free(recv_buf_idx);

}

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
int isread_aggregator(int rank,
                      int nprocs_for_coll,
                      int *aggregator_list)
{
    int i = 0;
    for (i = 0; i < nprocs_for_coll; i++) {
        if (aggregator_list[i] == rank)
            return 1;
    }
    return 0;
}
#endif
