/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil -*- */
/*
 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                         University Research and Technology
 *                         Corporation.  All rights reserved.
 * Copyright (c) 2004-2017 The University of Tennessee and The University
 *                         of Tennessee Research Foundation.  All rights
 *                         reserved.
 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                         University of Stuttgart.  All rights reserved.
 * Copyright (c) 2004-2005 The Regents of the University of California.
 *                         All rights reserved.
 * Copyright (c) 2008-2014 University of Houston. All rights reserved.
 * Copyright (c) 2015      Cisco Systems, Inc.  All rights reserved.
 * Copyright (c) 2015      Los Alamos National Security, LLC. All rights
 *                         reserved.
 * Copyright (c) 2017      Research Organization for Information Science
 *                         and Technology (RIST). All rights reserved.
 * $COPYRIGHT$
 *
 * Additional copyrights may follow
 *
 * $HEADER$
 */

#include "ompi_config.h"
#include "fcoll_two_phase.h"
#include "mpi.h"
#include "ompi/constants.h"
#include "ompi/communicator/communicator.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/io/ompio/io_ompio.h"
#include "ompi/mca/io/io.h"
#include "opal/mca/base/base.h"
#include "math.h"
#include "ompi/mca/pml/pml.h"
#include <unistd.h>

#define DEBUG 0

/* Two-phase collective read implementation ported from ROMIO to the
 * OMPIO infrastructure. The algorithm closely follows ROMIO's two-phase
 * code base.
 */
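/* In outline: a subset of processes (the aggregators) split the aggregate
 * access range of all ranks into contiguous file domains, read each domain
 * in large contiguous chunks (phase one), and then redistribute the data to
 * the ranks that actually requested it (phase two). Reads are staged through
 * a cycle buffer of bounded size, so large domains take several iterations.
 */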


/* Data structure describing a flattened (offset/length) view of a datatype. */
typedef struct flat_list_node {
    MPI_Datatype type;
    int count;
    OMPI_MPI_OFFSET_TYPE *blocklens;
    OMPI_MPI_OFFSET_TYPE *indices;
    struct flat_list_node *next;
} Flatlist_node;
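
/* Example (illustrative): an MPI vector type of three blocks of 4 bytes with
 * a stride of 8 bytes flattens to count = 3, blocklens = {4, 4, 4} and
 * indices = {0, 8, 16}, the displacements relative to the buffer start. */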

/* local function declarations */
static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_io_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int num_io_procs, int *aggregator_list);

static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
                                   void *buf,
                                   struct iovec *offset_length,
                                   int *send_size, int *start_pos,
                                   int *recv_size,
                                   int *count,
                                   int *partial_send, int *recd_from_proc,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_io_ompio_access_array_t *others_req,
                                   int iter,
                                   size_t *buf_idx, MPI_Aint buftype_extent,
                                   int striping_unit, int num_io_procs,
                                   int *aggregator_list);


static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit,
                                       int num_io_procs, int *aggregator_list);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
double read_time = 0.0, start_read_time = 0.0, end_read_time = 0.0;
double rcomm_time = 0.0, start_rcomm_time = 0.0, end_rcomm_time = 0.0;
double read_exch = 0.0, start_rexch = 0.0, end_rexch = 0.0;
#endif


int
mca_fcoll_two_phase_file_read_all (mca_io_ompio_file_t *fh,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS, i = 0, j = 0, interleave_count = 0, striping_unit = 0;
    MPI_Aint recv_buf_addr = 0;
    uint32_t iov_count = 0, ti = 0;
    struct iovec *decoded_iov = NULL, *temp_iov = NULL, *iov = NULL;
    size_t max_data = 0;
    long long_max_data = 0, long_total_bytes = 0;
    int domain_size = 0, *count_my_req_per_proc = NULL, count_my_req_procs = 0;
    int count_other_req_procs;
    size_t *buf_indices = NULL;
    int *aggregator_list = NULL, local_count = 0, local_size = 0;
    int two_phase_num_io_procs = 1;
    OMPI_MPI_OFFSET_TYPE start_offset = 0, end_offset = 0, fd_size = 0;
    OMPI_MPI_OFFSET_TYPE *start_offsets = NULL, *end_offsets = NULL;
    OMPI_MPI_OFFSET_TYPE *fd_start = NULL, *fd_end = NULL, min_st_offset = 0;
    Flatlist_node *flat_buf = NULL;
    mca_io_ompio_access_array_t *my_req = NULL, *others_req = NULL;
#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    mca_common_ompio_print_entry nentry;
#endif

//    if (opal_datatype_is_predefined(&datatype->super)) {
//        fh->f_flags = fh->f_flags | OMPIO_CONTIGUOUS_MEMORY;
//    }

    if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
        ret = fh->f_decode_datatype ((struct mca_io_ompio_file_t *)fh,
                                     datatype,
                                     count,
                                     buf,
                                     &max_data,
                                     &temp_iov,
                                     &iov_count);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        recv_buf_addr = (size_t)(buf);
        decoded_iov = (struct iovec *) calloc
            (iov_count, sizeof(struct iovec));
        if (NULL == decoded_iov) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (ti = 0; ti < iov_count; ti++) {
            decoded_iov[ti].iov_base = (IOVBASE_TYPE *)
                ((ptrdiff_t)temp_iov[ti].iov_base - recv_buf_addr);
            decoded_iov[ti].iov_len = temp_iov[ti].iov_len;
#if DEBUG
            printf("d_offset[%d]: %ld, d_len[%d]: %ld\n",
                   ti, (ptrdiff_t)decoded_iov[ti].iov_base,
                   ti, decoded_iov[ti].iov_len);
#endif
        }
    }
    else {
        max_data = count * datatype->super.size;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = max_data;
    }

    fh->f_get_num_aggregators (&two_phase_num_io_procs);
    if (-1 == two_phase_num_io_procs) {
        ret = fh->f_set_aggregator_props ((struct mca_io_ompio_file_t *)fh,
                                          two_phase_num_io_procs,
                                          max_data);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        two_phase_num_io_procs = fh->f_final_num_aggrs;
    }

    if (two_phase_num_io_procs > fh->f_size) {
        two_phase_num_io_procs = fh->f_size;
    }

    aggregator_list = (int *) calloc (two_phase_num_io_procs, sizeof(int));
    if (NULL == aggregator_list) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    if ( OMPI_COMM_IS_MAPBY_NODE (&ompi_mpi_comm_world.comm) ) {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i;
        }
    }
    else {
        for (i = 0; i < two_phase_num_io_procs; i++) {
            aggregator_list[i] = i * fh->f_size / two_phase_num_io_procs;
        }
    }
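    /* Example: with f_size = 8 ranks and two_phase_num_io_procs = 2, the
     * default placement above picks aggregators {0, 4}, i.e. they are spread
     * evenly across the ranks; under a map-by-node layout consecutive ranks
     * live on distinct nodes, so the first two ranks already do. */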

    ret = fh->f_generate_current_file_view ((struct mca_io_ompio_file_t *)fh,
                                            max_data,
                                            &iov,
                                            &local_count);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

    long_max_data = (long) max_data;
    ret = fh->f_comm->c_coll->coll_allreduce (&long_max_data,
                                              &long_total_bytes,
                                              1,
                                              MPI_LONG,
                                              MPI_SUM,
                                              fh->f_comm,
                                              fh->f_comm->c_coll->coll_allreduce_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    if (!(fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {

        /* This data structure translates between OMPIO and ROMIO. It is a
         * little hacky, but it helps to reuse ROMIO's code for handling
         * non-contiguous file types: the flattened datatype that OMPIO
         * keeps in decoded_iov is translated into flat_buf. */

        flat_buf = (Flatlist_node *)calloc(1, sizeof(Flatlist_node));
        if ( NULL == flat_buf ) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        flat_buf->type = datatype;
        flat_buf->next = NULL;
        flat_buf->count = 0;
        flat_buf->indices = NULL;
        flat_buf->blocklens = NULL;

        if ( 0 < count ) {
            local_size = OMPIO_MAX(1, iov_count/count);
        }
        else {
            local_size = 0;
        }

        if ( 0 < local_size ) {
            flat_buf->indices =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if (NULL == flat_buf->indices) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }

            flat_buf->blocklens =
                (OMPI_MPI_OFFSET_TYPE *)calloc(local_size,
                                               sizeof(OMPI_MPI_OFFSET_TYPE));
            if ( NULL == flat_buf->blocklens ) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
        }
        flat_buf->count = local_size;
        for (j = 0 ; j < local_size ; ++j) {
            flat_buf->indices[j] = (OMPI_MPI_OFFSET_TYPE)(intptr_t)decoded_iov[j].iov_base;
            flat_buf->blocklens[j] = decoded_iov[j].iov_len;
        }

#if DEBUG
        printf("flat_buf count: %d\n",
               flat_buf->count);
        for (i = 0; i < flat_buf->count; i++) {
            printf("%d: blocklen[%d] : %lld, indices[%d]: %lld\n",
                   fh->f_rank, i, flat_buf->blocklens[i], i, flat_buf->indices[i]);
        }
#endif
    }

#if DEBUG
    printf("%d: total_bytes:%ld, local_count: %d\n",
           fh->f_rank, long_total_bytes, local_count);
    for (i = 0; i < local_count; i++) {
        printf("%d: fcoll:two_phase:read_all:OFFSET:%ld,LENGTH:%ld\n",
               fh->f_rank,
               (size_t)iov[i].iov_base,
               (size_t)iov[i].iov_len);
    }
#endif

    if ( 0 < local_count ) {
        start_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[0].iov_base;
        end_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_base +
            (OMPI_MPI_OFFSET_TYPE)(intptr_t)iov[local_count-1].iov_len - 1;
    }
    else {
        end_offset = 0;
    }
#if DEBUG
    printf("%d: START OFFSET:%ld, END OFFSET:%ld\n",
           fh->f_rank,
           (size_t)start_offset,
           (size_t)end_offset);
#endif

    start_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if ( NULL == start_offsets ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    end_offsets = (OMPI_MPI_OFFSET_TYPE *)calloc
        (fh->f_size, sizeof(OMPI_MPI_OFFSET_TYPE));
    if (NULL == end_offsets) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&start_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             start_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    ret = fh->f_comm->c_coll->coll_allgather(&end_offset,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             end_offsets,
                                             1,
                                             OMPI_OFFSET_DATATYPE,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_allgather_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: start[%d]:%ld,end[%d]:%ld\n",
               fh->f_rank, i,
               (size_t)start_offsets[i], i,
               (size_t)end_offsets[i]);
    }
#endif

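    /* Count how many ranks have a start offset lying before the previous
     * rank's end offset, i.e. how strongly the per-rank access ranges
     * interleave in the file. The count is only reported in debug builds
     * here; ROMIO uses the equivalent value to decide whether collective
     * two-phase I/O is worthwhile at all. */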
    for (i = 1; i < fh->f_size; i++) {
        if ((start_offsets[i] < end_offsets[i-1]) &&
            (start_offsets[i] <= end_offsets[i])) {
            interleave_count++;
        }
    }

#if DEBUG
    printf("%d: interleave_count:%d\n",
           fh->f_rank, interleave_count);
#endif

    ret = mca_fcoll_two_phase_domain_partition(fh,
                                               start_offsets,
                                               end_offsets,
                                               &min_st_offset,
                                               &fd_start,
                                               &fd_end,
                                               domain_size,
                                               &fd_size,
                                               striping_unit,
                                               two_phase_num_io_procs);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }
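    /* The partitioning splits [min_st_offset, max end offset] into
     * two_phase_num_io_procs contiguous file domains of roughly fd_size
     * bytes each; aggregator i covers [fd_start[i], fd_end[i]].
     * Example: offsets 0..1023 with two aggregators give
     * fd_start = {0, 512} and fd_end = {511, 1023}, assuming no
     * striping-unit alignment is applied. */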

#if DEBUG
    for (i = 0; i < two_phase_num_io_procs; i++) {
        printf("fd_start[%d] : %lld, fd_end[%d] : %lld, local_count: %d\n",
               i, fd_start[i], i, fd_end[i], local_count);
    }
#endif

    ret = mca_fcoll_two_phase_calc_my_requests (fh,
                                                iov,
                                                local_count,
                                                min_st_offset,
                                                fd_start,
                                                fd_end,
                                                fd_size,
                                                &count_my_req_procs,
                                                &count_my_req_per_proc,
                                                &my_req,
                                                &buf_indices,
                                                striping_unit,
                                                two_phase_num_io_procs,
                                                aggregator_list);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

    ret = mca_fcoll_two_phase_calc_others_requests(fh,
                                                   count_my_req_procs,
                                                   count_my_req_per_proc,
                                                   my_req,
                                                   &count_other_req_procs,
                                                   &others_req);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if DEBUG
    printf("%d count_other_req_procs : %d\n",
           fh->f_rank,
           count_other_req_procs);
#endif

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rexch = MPI_Wtime();
#endif

    ret = two_phase_read_and_exch(fh,
                                  buf,
                                  datatype,
                                  others_req,
                                  iov,
                                  local_count,
                                  min_st_offset,
                                  fd_size,
                                  fd_start,
                                  fd_end,
                                  flat_buf,
                                  buf_indices,
                                  striping_unit,
                                  two_phase_num_io_procs,
                                  aggregator_list);
    if (OMPI_SUCCESS != ret) {
        goto exit;
    }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rexch = MPI_Wtime();
    read_exch += (end_rexch - start_rexch);
    nentry.time[0] = read_time;
    nentry.time[1] = rcomm_time;
    nentry.time[2] = read_exch;
    if (isread_aggregator(fh->f_rank,
                          two_phase_num_io_procs,
                          aggregator_list)) {
        nentry.aggregator = 1;
    }
    else {
        nentry.aggregator = 0;
    }
    nentry.nprocs_for_coll = two_phase_num_io_procs;

    if (!mca_common_ompio_full_print_queue(fh->f_coll_read_time)) {
        mca_common_ompio_register_print_entry(fh->f_coll_read_time,
                                              nentry);
    }
#endif

exit:
    if (flat_buf != NULL) {
        if (flat_buf->blocklens != NULL) {
            free (flat_buf->blocklens);
        }
        if (flat_buf->indices != NULL) {
            free (flat_buf->indices);
        }
        free (flat_buf);
    }

    free (start_offsets);
    free (end_offsets);
    free (aggregator_list);
    free (fd_start);
    free (decoded_iov);
    free (buf_indices);
    free (count_my_req_per_proc);
    free (my_req);
    free (others_req);
    free (fd_end);

    return ret;
}


static int two_phase_read_and_exch(mca_io_ompio_file_t *fh,
                                   void *buf,
                                   MPI_Datatype datatype,
                                   mca_io_ompio_access_array_t *others_req,
                                   struct iovec *offset_len,
                                   int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   size_t *buf_idx, int striping_unit,
                                   int two_phase_num_io_procs,
                                   int *aggregator_list)
{
    int ret = OMPI_SUCCESS, i = 0, j = 0, ntimes = 0, max_ntimes = 0;
    int m = 0;
    int *curr_offlen_ptr = NULL, *count = NULL, *send_size = NULL, *recv_size = NULL;
    int *partial_send = NULL, *start_pos = NULL, req_len = 0, flag = 0;
    int *recd_from_proc = NULL;
    MPI_Aint buftype_extent = 0;
    size_t byte_size = 0;
    OMPI_MPI_OFFSET_TYPE st_loc = -1, end_loc = -1, off = 0, done = 0, for_next_iter = 0;
    OMPI_MPI_OFFSET_TYPE size = 0, req_off = 0, real_size = 0, real_off = 0, len = 0;
    OMPI_MPI_OFFSET_TYPE for_curr_iter = 0;
    char *read_buf = NULL, *tmp_buf = NULL;
    MPI_Datatype byte = MPI_BYTE;
    int two_phase_cycle_buffer_size = 0;

    opal_datatype_type_size(&byte->super,
                            &byte_size);

    for (i = 0; i < fh->f_size; i++) {
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }

    for (i = 0; i < fh->f_size; i++) {
        for (j = 0; j < others_req[i].count; j++) {
            st_loc =
                OMPIO_MIN(st_loc, others_req[i].offsets[j]);
            end_loc =
                OMPIO_MAX(end_loc, (others_req[i].offsets[j] +
                                    others_req[i].lens[j] - 1));
        }
    }

    fh->f_get_bytes_per_agg ( &two_phase_cycle_buffer_size);
    ntimes = (int)((end_loc - st_loc + two_phase_cycle_buffer_size)/
                   two_phase_cycle_buffer_size);

    if ((st_loc == -1) && (end_loc == -1)) {
        ntimes = 0;
    }
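    /* ntimes is the number of read/exchange cycles needed to cover this
     * aggregator's byte range with the bounded cycle buffer; the formula is
     * the integer-division form of ceil((end_loc - st_loc + 1) / buffer).
     * Example: st_loc = 0, end_loc = 10 MiB - 1 and a 4 MiB cycle buffer
     * give (10 MiB - 1 + 4 MiB) / 4 MiB = 3 iterations. */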

    fh->f_comm->c_coll->coll_allreduce (&ntimes,
                                        &max_ntimes,
                                        1,
                                        MPI_INT,
                                        MPI_MAX,
                                        fh->f_comm,
                                        fh->f_comm->c_coll->coll_allreduce_module);

    if (ntimes) {
        read_buf = (char *) calloc (two_phase_cycle_buffer_size, sizeof(char));
        if ( NULL == read_buf ) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }
    }

    curr_offlen_ptr = (int *)calloc (fh->f_size,
                                     sizeof(int));
    if (NULL == curr_offlen_ptr) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    count = (int *)calloc (fh->f_size,
                           sizeof(int));
    if (NULL == count) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    partial_send = (int *)calloc(fh->f_size, sizeof(int));
    if ( NULL == partial_send ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    send_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == send_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recv_size = (int *)malloc(fh->f_size * sizeof(int));
    if (NULL == recv_size) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    recd_from_proc = (int *)calloc(fh->f_size, sizeof(int));
    if (NULL == recd_from_proc) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    start_pos = (int *) calloc(fh->f_size, sizeof(int));
    if ( NULL == start_pos ) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

    done = 0;
    off = st_loc;
    for_curr_iter = for_next_iter = 0;
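    /* for_curr_iter / for_next_iter implement a carry-over: when a request
     * straddles the end of the current cycle buffer, the tail bytes that
     * were already read are copied to the front of the next iteration's
     * buffer, so the next file read can continue at off without
     * re-reading them. */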

    ompi_datatype_type_extent(datatype, &buftype_extent);

    for (m = 0; m < ntimes; m++) {

        size = OMPIO_MIN((unsigned)two_phase_cycle_buffer_size, end_loc-st_loc+1-done);
        real_off = off - for_curr_iter;
        real_size = size + for_curr_iter;

        for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
        for_next_iter = 0;

        for (i = 0; i < fh->f_size; i++) {
            if (others_req[i].count) {
                start_pos[i] = curr_offlen_ptr[i];
                for (j = curr_offlen_ptr[i]; j < others_req[i].count;
                     j++) {
                    if (partial_send[i]) {
                        /* this request may have been partially
                           satisfied in the previous iteration. */
                        req_off = others_req[i].offsets[j] +
                            partial_send[i];
                        req_len = others_req[i].lens[j] -
                            partial_send[i];
                        partial_send[i] = 0;
                        /* modify the off-len pair to reflect this change */
                        others_req[i].offsets[j] = req_off;
                        others_req[i].lens[j] = req_len;
                    }
                    else {
                        req_off = others_req[i].offsets[j];
                        req_len = others_req[i].lens[j];
                    }
                    if (req_off < real_off + real_size) {
                        count[i]++;
                        PMPI_Address(read_buf+req_off-real_off,
                                     &(others_req[i].mem_ptrs[j]));

                        send_size[i] += (int)(OMPIO_MIN(real_off + real_size - req_off,
                                                        (OMPI_MPI_OFFSET_TYPE)req_len));

                        if (real_off+real_size-req_off < (OMPI_MPI_OFFSET_TYPE)req_len) {
                            partial_send[i] = (int) (real_off + real_size - req_off);
                            if ((j+1 < others_req[i].count) &&
                                (others_req[i].offsets[j+1] <
                                 real_off+real_size)) {
                                /* the next off-len pair also starts inside
                                   the current window (the case illustrated
                                   in the figure in ROMIO's original code). */
                                for_next_iter = OMPIO_MAX(for_next_iter,
                                                          real_off + real_size - others_req[i].offsets[j+1]);
                                /* max because it must cover requests
                                   from different processes */
                            }
                            break;
                        }
                    }
                    else break;
                }
                curr_offlen_ptr[i] = j;
            }
        }
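        /* At this point, for every rank i: count[i] is the number of i's
         * off-len pairs that fall into the current window, start_pos[i] is
         * the first such pair, send_size[i] is the number of bytes of
         * read_buf destined for rank i, and partial_send[i] records a pair
         * only partially covered by this window. */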
        flag = 0;
        for (i = 0; i < fh->f_size; i++)
            if (count[i]) flag = 1;

        if (flag) {

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            start_read_time = MPI_Wtime();
#endif

            len = size * byte_size;
            fh->f_io_array = (mca_io_ompio_io_array_t *)calloc
                (1, sizeof(mca_io_ompio_io_array_t));
            if (NULL == fh->f_io_array) {
                opal_output(1, "OUT OF MEMORY\n");
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            fh->f_io_array[0].offset = (IOVBASE_TYPE *)(intptr_t)off;
            fh->f_io_array[0].length = len;
            fh->f_io_array[0].memory_address =
                read_buf+for_curr_iter;
            fh->f_num_of_io_entries = 1;

            if (fh->f_num_of_io_entries) {
                if ( 0 > fh->f_fbtl->fbtl_preadv (fh)) {
                    opal_output(1, "READ FAILED\n");
                    ret = OMPI_ERROR;
                    goto exit;
                }
            }

#if 0
            int ii;
            printf("%d: len/4 : %lld\n",
                   fh->f_rank,
                   len/4);
            for (ii = 0; ii < len/4; ii++) {
                printf("%d: read_buf[%d]: %d\n",
                       fh->f_rank,
                       ii,
                       ((int *)read_buf)[ii]);
            }
#endif
            fh->f_num_of_io_entries = 0;
            if (NULL != fh->f_io_array) {
                free (fh->f_io_array);
                fh->f_io_array = NULL;
            }

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
            end_read_time = MPI_Wtime();
            read_time += (end_read_time - start_read_time);
#endif
        }

        for_curr_iter = for_next_iter;

        for (i = 0; i < fh->f_size; i++) {
            recv_size[i] = 0;
        }
        ret = two_phase_exchange_data(fh, buf, offset_len,
                                      send_size, start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit, two_phase_num_io_procs,
                                      aggregator_list);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (for_next_iter) {
            tmp_buf = (char *) calloc (for_next_iter, sizeof(char));
            if (NULL == tmp_buf) {
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(tmp_buf,
                   read_buf+real_size-for_next_iter,
                   for_next_iter);
            free(read_buf);
            read_buf = (char *)malloc(for_next_iter+two_phase_cycle_buffer_size);
            if (NULL == read_buf) {
                free(tmp_buf);
                ret = OMPI_ERR_OUT_OF_RESOURCE;
                goto exit;
            }
            memcpy(read_buf, tmp_buf, for_next_iter);
            free(tmp_buf);
        }

        off += size;
        done += size;
    }

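    /* max_ntimes is the maximum iteration count over all ranks, and each
     * exchange starts with an alltoall, so ranks that finished their own
     * file domain early (or own no file domain at all) must keep
     * participating with zeroed send counts until every rank is done. */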
    for (i = 0; i < fh->f_size; i++) count[i] = send_size[i] = 0;
    for (m = ntimes; m < max_ntimes; m++) {
        ret = two_phase_exchange_data(fh, buf, offset_len, send_size,
                                      start_pos, recv_size, count,
                                      partial_send, recd_from_proc,
                                      contig_access_count,
                                      min_st_offset, fd_size, fd_start, fd_end,
                                      flat_buf, others_req, m, buf_idx,
                                      buftype_extent, striping_unit, two_phase_num_io_procs,
                                      aggregator_list);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }
    }

exit:
    free (read_buf);
    free (curr_offlen_ptr);
    free (count);
    free (partial_send);
    free (send_size);
    free (recv_size);
    free (recd_from_proc);
    free (start_pos);

    return ret;
}

static int two_phase_exchange_data(mca_io_ompio_file_t *fh,
                                   void *buf, struct iovec *offset_len,
                                   int *send_size, int *start_pos, int *recv_size,
                                   int *count, int *partial_send,
                                   int *recd_from_proc, int contig_access_count,
                                   OMPI_MPI_OFFSET_TYPE min_st_offset,
                                   OMPI_MPI_OFFSET_TYPE fd_size,
                                   OMPI_MPI_OFFSET_TYPE *fd_start,
                                   OMPI_MPI_OFFSET_TYPE *fd_end,
                                   Flatlist_node *flat_buf,
                                   mca_io_ompio_access_array_t *others_req,
                                   int iter, size_t *buf_idx,
                                   MPI_Aint buftype_extent, int striping_unit,
                                   int two_phase_num_io_procs, int *aggregator_list)
{
    int i = 0, j = 0, k = 0, tmp = 0, nprocs_recv = 0, nprocs_send = 0;
    int ret = OMPI_SUCCESS;
    char **recv_buf = NULL;
    MPI_Request *requests = NULL;
    MPI_Datatype send_type;

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    start_rcomm_time = MPI_Wtime();
#endif

    ret = fh->f_comm->c_coll->coll_alltoall (send_size,
                                             1,
                                             MPI_INT,
                                             recv_size,
                                             1,
                                             MPI_INT,
                                             fh->f_comm,
                                             fh->f_comm->c_coll->coll_alltoall_module);
    if ( OMPI_SUCCESS != ret ) {
        goto exit;
    }

#if DEBUG
    for (i = 0; i < fh->f_size; i++) {
        printf("%d: RS[%d]: %d\n", fh->f_rank,
               i,
               recv_size[i]);
    }
#endif

    nprocs_recv = 0;
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) nprocs_recv++;

    nprocs_send = 0;
    for (i = 0; i < fh->f_size; i++)
        if (send_size[i]) nprocs_send++;

    requests = (MPI_Request *)
        malloc((nprocs_send+nprocs_recv+1) * sizeof(MPI_Request));
    if (NULL == requests) {
        ret = OMPI_ERR_OUT_OF_RESOURCE;
        goto exit;
    }

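    /* The tag fh->f_rank + i + 100*iter is symmetric: for a message from
     * rank s to rank r in iteration m both sides compute s + r + 100*m, so
     * each irecv below matches the corresponding isend, and messages from
     * different exchange iterations carry different tags. */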
    if (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY) {
        j = 0;
        for (i = 0; i < fh->f_size; i++) {
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(((char *) buf)+ buf_idx[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if ( OMPI_SUCCESS != ret ) {
                    goto exit;
                }
                j++;
                buf_idx[i] += recv_size[i];
            }
        }
    }
    else {
        recv_buf = (char **) calloc (fh->f_size, sizeof(char *));
        if (NULL == recv_buf) {
            ret = OMPI_ERR_OUT_OF_RESOURCE;
            goto exit;
        }

        for (i = 0; i < fh->f_size; i++)
            if (recv_size[i]) recv_buf[i] =
                                  (char *) malloc (recv_size[i] * sizeof(char));
        j = 0;
        for (i = 0; i < fh->f_size; i++)
            if (recv_size[i]) {
                ret = MCA_PML_CALL(irecv(recv_buf[i],
                                         recv_size[i],
                                         MPI_BYTE,
                                         i,
                                         fh->f_rank+i+100*iter,
                                         fh->f_comm,
                                         requests+j));
                if ( OMPI_SUCCESS != ret ) {
                    goto exit;
                }
                j++;
            }
    }

    j = 0;
    for (i = 0; i < fh->f_size; i++) {
        if (send_size[i]) {
            if (partial_send[i]) {
                k = start_pos[i] + count[i] - 1;
                tmp = others_req[i].lens[k];
                others_req[i].lens[k] = partial_send[i];
            }

            ompi_datatype_create_hindexed(count[i],
                                          &(others_req[i].lens[start_pos[i]]),
                                          &(others_req[i].mem_ptrs[start_pos[i]]),
                                          MPI_BYTE,
                                          &send_type);
            ompi_datatype_commit(&send_type);

            ret = MCA_PML_CALL(isend(MPI_BOTTOM,
                                     1,
                                     send_type,
                                     i,
                                     fh->f_rank+i+100*iter,
                                     MCA_PML_BASE_SEND_STANDARD,
                                     fh->f_comm,
                                     requests+nprocs_recv+j));
            ompi_datatype_destroy(&send_type);
            if ( OMPI_SUCCESS != ret ) {
                goto exit;
            }

            if (partial_send[i]) others_req[i].lens[k] = tmp;
            j++;
        }
    }
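    /* Send side: others_req[i].mem_ptrs holds absolute addresses inside
     * read_buf (set via PMPI_Address in the read loop), so the hindexed
     * datatype sent from MPI_BOTTOM describes exactly the scattered regions
     * of the read buffer destined for rank i, without packing a separate
     * send buffer. */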

    if (nprocs_recv) {
        ret = ompi_request_wait_all(nprocs_recv,
                                    requests,
                                    MPI_STATUSES_IGNORE);
        if (OMPI_SUCCESS != ret) {
            goto exit;
        }

        if (! (fh->f_flags & OMPIO_CONTIGUOUS_MEMORY)) {
            two_phase_fill_user_buffer(fh, buf, flat_buf,
                                       recv_buf, offset_len,
                                       (unsigned *)recv_size, requests,
                                       recd_from_proc, contig_access_count,
                                       min_st_offset, fd_size, fd_start, fd_end,
                                       buftype_extent, striping_unit, two_phase_num_io_procs,
                                       aggregator_list);
        }
    }

    ret = ompi_request_wait_all(nprocs_send,
                                requests+nprocs_recv,
                                MPI_STATUSES_IGNORE);

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
    end_rcomm_time = MPI_Wtime();
    rcomm_time += (end_rcomm_time - start_rcomm_time);
#endif

exit:
    if (recv_buf) {
        for (i = 0; i < fh->f_size; i++) {
            free(recv_buf[i]);
        }
        free(recv_buf);
    }

    free(requests);

    return ret;
}


#define TWO_PHASE_BUF_INCR                                              \
{                                                                       \
    while (buf_incr) {                                                  \
        size_in_buf = OMPIO_MIN(buf_incr, flat_buf_sz);                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
}
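
/* TWO_PHASE_BUF_INCR advances the logical position in the user buffer by
 * buf_incr bytes without copying anything: it walks the flattened datatype
 * entry by entry, wrapping from the last entry back to the first and
 * counting completed datatype cycles in n_buftypes, so that user_buf_idx
 * always points at the next byte of the (possibly non-contiguous) buffer. */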


#define TWO_PHASE_BUF_COPY                                              \
{                                                                       \
    while (size) {                                                      \
        size_in_buf = OMPIO_MIN(size, flat_buf_sz);                     \
        memcpy(((char *) buf) + user_buf_idx,                           \
               &(recv_buf[p][recv_buf_idx[p]]), size_in_buf);           \
        recv_buf_idx[p] += size_in_buf;                                 \
        user_buf_idx += size_in_buf;                                    \
        flat_buf_sz -= size_in_buf;                                     \
        if (!flat_buf_sz) {                                             \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++;   \
            else {                                                      \
                flat_buf_idx = 0;                                       \
                n_buftypes++;                                           \
            }                                                           \
            user_buf_idx = flat_buf->indices[flat_buf_idx] +            \
                (OMPI_MPI_OFFSET_TYPE)n_buftypes*(OMPI_MPI_OFFSET_TYPE)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx];            \
        }                                                               \
        size -= size_in_buf;                                            \
        buf_incr -= size_in_buf;                                        \
    }                                                                   \
    TWO_PHASE_BUF_INCR                                                  \
}
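
/* TWO_PHASE_BUF_COPY performs the same walk as TWO_PHASE_BUF_INCR but copies
 * size bytes from recv_buf[p] into the user buffer along the way, one
 * contiguous flattened block at a time; the trailing TWO_PHASE_BUF_INCR
 * consumes whatever remains of buf_incr beyond the copied bytes. */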


static void two_phase_fill_user_buffer(mca_io_ompio_file_t *fh,
                                       void *buf,
                                       Flatlist_node *flat_buf,
                                       char **recv_buf,
                                       struct iovec *offset_length,
                                       unsigned *recv_size,
                                       MPI_Request *requests,
                                       int *recd_from_proc,
                                       int contig_access_count,
                                       OMPI_MPI_OFFSET_TYPE min_st_offset,
                                       OMPI_MPI_OFFSET_TYPE fd_size,
                                       OMPI_MPI_OFFSET_TYPE *fd_start,
                                       OMPI_MPI_OFFSET_TYPE *fd_end,
                                       MPI_Aint buftype_extent,
                                       int striping_unit, int two_phase_num_io_procs,
                                       int *aggregator_list)
{
    int i = 0, p = 0, flat_buf_idx = 0;
    OMPI_MPI_OFFSET_TYPE flat_buf_sz = 0, size_in_buf = 0, buf_incr = 0, size = 0;
    int n_buftypes = 0;
    OMPI_MPI_OFFSET_TYPE off = 0, len = 0, rem_len = 0, user_buf_idx = 0;
    unsigned *curr_from_proc = NULL, *done_from_proc = NULL, *recv_buf_idx = NULL;

    curr_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    done_from_proc = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    recv_buf_idx = (unsigned *) malloc (fh->f_size * sizeof(unsigned));
    if (NULL == curr_from_proc || NULL == done_from_proc ||
        NULL == recv_buf_idx) {
        free(curr_from_proc);
        free(done_from_proc);
        free(recv_buf_idx);
        return;
    }

    for (i = 0; i < fh->f_size; i++) {
        recv_buf_idx[i] = curr_from_proc[i] = 0;
        done_from_proc[i] = recd_from_proc[i];
    }
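    /* Bookkeeping (mirroring ROMIO):
     *   curr_from_proc[p] - bytes of p's data consumed in this iteration
     *   done_from_proc[p] - bytes already received from p in previous
     *                       iterations
     *   recv_buf_idx[p]   - current read position within recv_buf[p] */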

    flat_buf_idx = 0;
    n_buftypes = 0;

    if ( flat_buf->count > 0 ) {
        user_buf_idx = flat_buf->indices[0];
        flat_buf_sz = flat_buf->blocklens[0];
    }

    /* flat_buf_idx = current index into flattened buftype
       flat_buf_sz = size of current contiguous component in
       flattened buf */

    for (i = 0; i < contig_access_count; i++) {

        off = (OMPI_MPI_OFFSET_TYPE)(intptr_t)offset_length[i].iov_base;
        rem_len = (OMPI_MPI_OFFSET_TYPE)offset_length[i].iov_len;

        /* this request may span the file domains of more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: len is modified by mca_fcoll_two_phase_calc_aggregator()
             * to be no longer than the single file domain that process "p"
             * is responsible for.
             */
            p = mca_fcoll_two_phase_calc_aggregator(fh,
                                                    off,
                                                    min_st_offset,
                                                    &len,
                                                    fd_size,
                                                    fd_start,
                                                    fd_end,
                                                    striping_unit,
                                                    two_phase_num_io_procs,
                                                    aggregator_list);

            if (recv_buf_idx[p] < recv_size[p]) {
                if (curr_from_proc[p]+len > done_from_proc[p]) {
                    if (done_from_proc[p] > curr_from_proc[p]) {
                        size = OMPIO_MIN(curr_from_proc[p] + len -
                                         done_from_proc[p], recv_size[p]-recv_buf_idx[p]);
                        buf_incr = done_from_proc[p] - curr_from_proc[p];
                        TWO_PHASE_BUF_INCR
                        buf_incr = curr_from_proc[p]+len-done_from_proc[p];
                        curr_from_proc[p] = done_from_proc[p] + size;
                        TWO_PHASE_BUF_COPY
                    }
                    else {
                        size = OMPIO_MIN(len, recv_size[p]-recv_buf_idx[p]);
                        buf_incr = len;
                        curr_from_proc[p] += (unsigned) size;
                        TWO_PHASE_BUF_COPY
                    }
                }
                else {
                    curr_from_proc[p] += (unsigned) len;
                    buf_incr = len;
                    TWO_PHASE_BUF_INCR
                }
            }
            else {
                buf_incr = len;
                TWO_PHASE_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    for (i = 0; i < fh->f_size; i++)
        if (recv_size[i]) recd_from_proc[i] = curr_from_proc[i];

    free(curr_from_proc);
    free(done_from_proc);
    free(recv_buf_idx);
}

#if OMPIO_FCOLL_WANT_TIME_BREAKDOWN
static int isread_aggregator(int rank,
                             int nprocs_for_coll,
                             int *aggregator_list)
{
    int i = 0;
    for (i = 0; i < nprocs_for_coll; i++) {
        if (aggregator_list[i] == rank)
            return 1;
    }
    return 0;
}
#endif