1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3 * Copyright (C) 2008 University of Chicago.
4 * See COPYRIGHT notice in top-level directory.
5 */
6
7 #include "assert.h"
8 #include "adio.h"
9 #include "adio_extern.h"
10 #ifdef AGGREGATION_PROFILE
11 #include "mpe.h"
12 #endif
13
14 /* #define ALLTOALL */
15
16 /* #define DEBUG */
17 /* #define DEBUG2 */ /* print buffers */
18
19 #define USE_PRE_REQ
20
/* Exchange, for the next round, how many bytes each aggregator will move to
 * or from each client: a single MPI_Alltoall unless the cb_alltoall hint is
 * disabled, in which case point-to-point messages are used.  Also fills the
 * per-peer MPI_Alltoallw counts (1 when a peer has data, else 0) and, in the
 * MPI_Alltoall path, counts in *aggregators_done the aggregators that
 * reported a size of -1.  NOTE(review): the point-to-point path is assumed
 * to do the same -- confirm in the definition. */
static void Exch_data_amounts (ADIO_File fd, int nprocs,
                               ADIO_Offset *client_comm_sz_arr,
                               ADIO_Offset *agg_comm_sz_arr,
                               int *client_alltoallw_counts,
                               int *agg_alltoallw_counts,
                               int *aggregators_done);
/* Aggregator side: post the nonblocking transfers between cb_buf and the
 * clients for the current round (receives for writes, sends for reads, per
 * the call sites in ADIOI_IOStridedColl) and return the request array and
 * its length.  NOTE(review): per a caller comment, *requests is presumably
 * not allocated when nothing needs to be posted -- confirm. */
static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
                                  void *cb_buf,
                                  MPI_Datatype *client_comm_dtype_arr,
                                  ADIO_Offset *client_comm_sz_arr,
                                  MPI_Request **requests,
                                  int *aggregators_client_count_p);

/* Client side: post one nonblocking transfer between buf (described by
 * agg_comm_dtype, presumably used agg_alltoallw_count times) and aggregator
 * agg_rank for the current round; the request is returned in *request. */
static void post_client_comm (ADIO_File fd, int rw_type,
                              int agg_rank, void *buf,
                              MPI_Datatype agg_comm_dtype,
                              int agg_alltoallw_count,
                              MPI_Request *request);
39
/* Avery Ching and Kenin Coloma's reworked two-phase algorithm. Key features:
 * - persistent file domains (file realms)
 * - an option to use alltoall instead of point-to-point
 */
/* Collective strided read/write using the persistent-file-realm two-phase
 * scheme.
 *
 * fd                 - file handle; its hints (cb_read/cb_write, cb_nodes,
 *                      cb_pfr, cb_alltoall, ranklist, cb_buffer_size)
 *                      select the code path
 * buf/count/datatype - the user memory buffer
 * rdwr               - ADIOI_READ or ADIOI_WRITE
 * file_ptr_type      - ADIO_EXPLICIT_OFFSET or ADIO_INDIVIDUAL
 * offset             - explicit offset (in etypes), used with
 *                      ADIO_EXPLICIT_OFFSET
 * status             - when HAVE_STATUS_SET_BYTES, set to count*size(datatype)
 * error_code         - out: result of the underlying I/O calls
 *
 * Outline:
 *   1. gather every process's [start, end] byte range and check whether the
 *      accesses interleave; if collective buffering is not warranted (and
 *      persistent file realms are not requested) do independent I/O and
 *      return;
 *   2. compute file realms, exchange file views, build the first round of
 *      request datatypes;
 *   3. loop until all cb_nodes aggregators report completion, moving data
 *      between clients and aggregators (point-to-point or MPI_Alltoallw per
 *      the cb_alltoall hint) while aggregators do the file I/O;
 *   4. free datatypes, flattened types and buffers.
 *
 * If ADIOI_IOFiletype reports an error the function returns immediately
 * without the cleanup of step 4 (pre-existing behavior).
 */
void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
                          MPI_Datatype datatype, int file_ptr_type,
                          ADIO_Offset offset, ADIO_Status *status,
                          int *error_code)
{
    ADIO_Offset min_st_offset=0, max_end_offset=0;
    ADIO_Offset st_end_offset[2];
    ADIO_Offset *all_st_end_offsets = NULL;
    int filetype_is_contig, buftype_is_contig, is_contig;
    ADIO_Offset off;
    int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
    int cb_enable;
    ADIO_Offset bufsize;
    MPI_Aint extent, lb;
#ifdef DEBUG2
    MPI_Aint bufextent;
#endif
    MPI_Count size;
    int agg_rank;

    ADIO_Offset agg_disp; /* aggregated file offset */
    MPI_Datatype agg_dtype; /* aggregated file datatype */

    int aggregators_done = 0;
    ADIO_Offset buffered_io_size = 0;

    int *alltoallw_disps;

    int *alltoallw_counts;
    int *client_alltoallw_counts;
    int *agg_alltoallw_counts;

    char *cb_buf = NULL;

    MPI_Datatype *client_comm_dtype_arr; /* aggregator perspective */
    MPI_Datatype *agg_comm_dtype_arr;    /* client perspective */
    ADIO_Offset *client_comm_sz_arr;     /* aggregator perspective */
    ADIO_Offset *agg_comm_sz_arr;        /* client perspective */

    /* file views for each client and aggregator */
    view_state *client_file_view_state_arr = NULL;
    view_state *agg_file_view_state_arr = NULL;
    /* mem views for local process */
    view_state *my_mem_view_state_arr = NULL;

    MPI_Status *agg_comm_statuses = NULL;
    MPI_Request *agg_comm_requests = NULL;
    MPI_Status *client_comm_statuses = NULL;
    MPI_Request *client_comm_requests = NULL;
    int aggs_client_count = 0;
    int clients_agg_count = 0;

    MPI_Comm_size (fd->comm, &nprocs);
    MPI_Comm_rank (fd->comm, &myrank);
#ifdef DEBUG
    fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
#endif
#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event (5010, 0, NULL);
    else
        MPE_Log_event (5012, 0, NULL);
#endif

    /* NOTE(review): an earlier comment here said each process first
     * completes its pending nonblocking writes so they cannot interfere with
     * this collective call.  This function does not do that itself --
     * confirm whether the caller does. */

    nprocs_for_coll = fd->hints->cb_nodes;

    if (rdwr == ADIOI_READ)
        cb_enable = fd->hints->cb_read;
    else
        cb_enable = fd->hints->cb_write;

    /* only check for interleaving if collective buffering isn't disabled */
    if (cb_enable != ADIOI_HINT_DISABLE) {
        /* find the starting and ending byte of my I/O access */
        ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
                           &st_end_offset[0], &st_end_offset[1]);

        /* allocate an array of start/end pairs */
        all_st_end_offsets = (ADIO_Offset *)
            ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
        MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
                       ADIO_OFFSET, fd->comm);

        min_st_offset = all_st_end_offsets[0];
        max_end_offset = all_st_end_offsets[1];

        for (i=1; i<nprocs; i++) {
            /* are the accesses of different processes interleaved?
             * This is a rudimentary check for interleaving, but should
             * suffice for the moment. */
            if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
                (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
                interleave_count++;

            min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
                                      min_st_offset);
            max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
                                       max_end_offset);
        }
    }

    ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);

    /* Collective buffering not needed: do independent I/O and return. */
    if ((cb_enable == ADIOI_HINT_DISABLE
         || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
        && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
        if (cb_enable != ADIOI_HINT_DISABLE) {
            ADIOI_Free (all_st_end_offsets);
        }

        if (buftype_is_contig && filetype_is_contig) {
            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                off = fd->disp + (fd->etype_size) * offset;
                if (rdwr == ADIOI_READ)
                    ADIO_ReadContig(fd, buf, count, datatype,
                                    ADIO_EXPLICIT_OFFSET, off, status,
                                    error_code);
                else
                    ADIO_WriteContig(fd, buf, count, datatype,
                                     ADIO_EXPLICIT_OFFSET, off, status,
                                     error_code);
            }
            else {
                if (rdwr == ADIOI_READ)
                    ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                    0, status, error_code);
                else
                    ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                     0, status, error_code);
            }
        }
        else {
            if (rdwr == ADIOI_READ)
                ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
                                 offset, status, error_code);
            else
                ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                                  offset, status, error_code);
        }
        return;
    }

    MPI_Type_get_extent(datatype, &lb, &extent);
#ifdef DEBUG2
    bufextent = extent * count;
#endif
    MPI_Type_size_x(datatype, &size);
    bufsize = size * (MPI_Count)count;

    /* Calculate file realms */
    if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
        (fd->file_realm_types == NULL))
        ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);

    my_mem_view_state_arr = (view_state *)
        ADIOI_Calloc (1, nprocs * sizeof(view_state));
    agg_file_view_state_arr = (view_state *)
        ADIOI_Calloc (1, nprocs * sizeof(view_state));
    client_comm_sz_arr = (ADIO_Offset *)
        ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));

    if (fd->is_agg) {
        client_file_view_state_arr = (view_state *)
            ADIOI_Calloc (1, nprocs * sizeof(view_state));
    }
    else {
        client_file_view_state_arr = NULL;
    }

    /* Alltoallw doesn't like a null array even if the counts are
     * zero.  If you do not include this code, it will fail. */
    client_comm_dtype_arr = (MPI_Datatype *)
        ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
    if (!fd->is_agg)
        for (i = 0; i < nprocs; i++)
            client_comm_dtype_arr[i] = MPI_BYTE;

    ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
                           datatype, offset, my_mem_view_state_arr,
                           agg_file_view_state_arr,
                           client_file_view_state_arr);

    agg_comm_sz_arr = (ADIO_Offset *)
        ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
    agg_comm_dtype_arr = (MPI_Datatype *)
        ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
    if (fd->is_agg) {
        ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
                              client_file_view_state_arr,
                              client_comm_dtype_arr,
                              client_comm_sz_arr,
                              &agg_disp,
                              &agg_dtype);
        buffered_io_size = 0;
        for (i=0; i <nprocs; i++) {
            if (client_comm_sz_arr[i] > 0)
                buffered_io_size += client_comm_sz_arr[i];
        }
    }
#ifdef USE_PRE_REQ
    else
    {
        /* Example use of ADIOI_Build_client_pre_req. to an
         * appropriate section */

        for (i = 0; i < fd->hints->cb_nodes; i++)
        {
            agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
#ifdef AGGREGATION_PROFILE
            MPE_Log_event (5040, 0, NULL);
#endif
            ADIOI_Build_client_pre_req(
                fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
                &(my_mem_view_state_arr[agg_rank]),
                &(agg_file_view_state_arr[agg_rank]),
                2*1024*1024,
                64*1024);
#ifdef AGGREGATION_PROFILE
            MPE_Log_event (5041, 0, NULL);
#endif
        }
    }
#endif


    if (fd->is_agg)
        cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
    alltoallw_disps = (int *) ADIOI_Calloc (nprocs, sizeof(int));
    alltoallw_counts = client_alltoallw_counts = (int *)
        ADIOI_Calloc (2*nprocs, sizeof(int));
    agg_alltoallw_counts = &alltoallw_counts[nprocs];

    if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
        /* aggregators pre-post all Irecv's for incoming data from clients */
        if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
            post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                 client_comm_dtype_arr,
                                 client_comm_sz_arr,
                                 &agg_comm_requests,
                                 &aggs_client_count);
    }
    /* Aggregators send amounts for data requested to clients */
    Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
                       client_alltoallw_counts, agg_alltoallw_counts,
                       &aggregators_done);

#ifdef DEBUG
    fprintf (stderr, "client_alltoallw_counts[ ");
    for (i=0; i<nprocs; i++) {
        fprintf (stderr, "%d ", client_alltoallw_counts[i]);
    }
    fprintf (stderr, "]\n");
    fprintf (stderr, "agg_alltoallw_counts[ ");
    for (i=0; i<nprocs; i++) {
        fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
    }
    fprintf (stderr, "]\n");
#endif

    /* keep looping while aggregators still have I/O to do */
    while (aggregators_done != nprocs_for_coll) {
        if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
            /* clients should build datatypes for local memory locations
               for data communication with aggregators and post
               communication as the datatypes are built */

            client_comm_requests = (MPI_Request *)
                ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));

            /* Reset the request count once per round, not once per
             * aggregator: resetting it inside the loop put every request in
             * slot 0, overwriting earlier ones so they were never waited
             * on. */
            clients_agg_count = 0;
            for (i = 0; i < fd->hints->cb_nodes; i++)
            {
                agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
                if (agg_comm_sz_arr[agg_rank] > 0) {
                    ADIOI_Build_client_req(fd, agg_rank,
                                           (i+myrank)%fd->hints->cb_nodes,
                                           &(my_mem_view_state_arr[agg_rank]),
                                           &(agg_file_view_state_arr[agg_rank]),
                                           agg_comm_sz_arr[agg_rank],
                                           &(agg_comm_dtype_arr[agg_rank]));

#ifdef AGGREGATION_PROFILE
                    if (i == 0)
                        MPE_Log_event (5038, 0, NULL);
#endif
                    post_client_comm (fd, rdwr, agg_rank, buf,
                                      agg_comm_dtype_arr[agg_rank],
                                      agg_alltoallw_counts[agg_rank],
                                      &client_comm_requests[clients_agg_count]);
                    clients_agg_count++;
                }
            }
#ifdef AGGREGATION_PROFILE
            if (!clients_agg_count)
                MPE_Log_event(5039, 0, NULL);
#endif

            if (rdwr == ADIOI_READ) {
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_READ, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

#ifdef DEBUG
                fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
                    fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
                             size, agg_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr, "]\n");
                if (fd->is_agg) {
                    fprintf (stderr, "sending to [client](disp,size,cnt)=");
                    for (i=0; i < nprocs; i++) {
                        if (fd->is_agg)
                            MPI_Type_size_x (client_comm_dtype_arr[i], &size);
                        else
                            size = -1;

                        fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
                                 size, client_alltoallw_counts[i]);
                        if (i != nprocs - 1)
                            fprintf(stderr, ",");
                    }
                    fprintf (stderr,"\n");
                }
                fflush (NULL);
#endif
                /* aggregators post all Isends for outgoing data to clients */
                if (fd->is_agg)
                    post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                         client_comm_dtype_arr,
                                         client_comm_sz_arr,
                                         &agg_comm_requests,
                                         &aggs_client_count);

                if (fd->is_agg && aggs_client_count) {
                    agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
                                                     sizeof(MPI_Status));
                    MPI_Waitall(aggs_client_count, agg_comm_requests,
                                agg_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5033, 0, NULL);
#endif
                    ADIOI_Free (agg_comm_requests);
                    ADIOI_Free (agg_comm_statuses);
                }

                if (clients_agg_count) {
                    client_comm_statuses = ADIOI_Malloc(clients_agg_count *
                                                        sizeof(MPI_Status));
                    MPI_Waitall(clients_agg_count, client_comm_requests,
                                client_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5039, 0, NULL);
#endif
                    ADIOI_Free (client_comm_statuses);
                }
                /* The request array is allocated every round, so free it
                 * even when no request was posted. */
                ADIOI_Free (client_comm_requests);
                client_comm_requests = NULL;

#ifdef DEBUG2
                fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
                if (fd->is_agg && buffered_io_size) {
                    fprintf (stderr, "buf = [");
                    for (i=0; i<bufextent; i++)
                        fprintf (stderr, "%c", ((char *) buf)[i]);
                    fprintf (stderr, "]\n");
                    fprintf (stderr, "cb_buf = [");
                    for (i=0; i<buffered_io_size; i++)
                        fprintf (stderr, "%c", cb_buf[i]);
                    fprintf (stderr, "]\n");
                    fflush (NULL);
                }
#endif
            }
            else { /* Write Case */
#ifdef DEBUG
                fprintf (stderr, "sending to [agg](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
                    fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
                             size, agg_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr, "]\n");
                fprintf (stderr, "expecting from [client](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    if (fd->is_agg)
                        MPI_Type_size_x (client_comm_dtype_arr[i], &size);
                    else
                        size = -1;

                    fprintf (stderr, "[%d](%d,%d,%d)", i, alltoallw_disps[i],
                             size, client_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr,"\n");
                fflush (NULL);
#endif
#ifdef DEBUG
                fprintf (stderr, "buffered_io_size = %lld\n", buffered_io_size);
#endif

                if (clients_agg_count) {
                    client_comm_statuses = ADIOI_Malloc(clients_agg_count *
                                                        sizeof(MPI_Status));
                    MPI_Waitall(clients_agg_count, client_comm_requests,
                                client_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5039, 0, NULL);
#endif
                    ADIOI_Free(client_comm_statuses);
                }
                /* The request array is allocated every round, so free it
                 * even when no request was posted. */
                ADIOI_Free(client_comm_requests);
                client_comm_requests = NULL;
#ifdef DEBUG2
                if (bufextent) {
                    fprintf (stderr, "buf = [");
                    for (i=0; i<bufextent; i++)
                        fprintf (stderr, "%c", ((char *) buf)[i]);
                    fprintf (stderr, "]\n");
                }
#endif

                if (fd->is_agg && buffered_io_size) {
                    assert (aggs_client_count != 0);
                    /* make sure we actually have the data to write out */
                    agg_comm_statuses = (MPI_Status *)
                        ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));

                    MPI_Waitall (aggs_client_count, agg_comm_requests,
                                 agg_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5033, 0, NULL);
#endif
                    ADIOI_Free (agg_comm_requests);
                    ADIOI_Free (agg_comm_statuses);
#ifdef DEBUG2
                    fprintf (stderr, "cb_buf = [");
                    for (i=0; i<buffered_io_size; i++)
                        fprintf (stderr, "%c", cb_buf[i]);
                    fprintf (stderr, "]\n");
                    fflush (NULL);
#endif
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_WRITE, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

            }
        } else {
            /* Alltoallw version of everything */
            ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
                                    agg_file_view_state_arr,
                                    agg_comm_sz_arr, agg_comm_dtype_arr);

            if (rdwr == ADIOI_READ) {
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_READ, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5032, 0, NULL);
#endif
                MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
                               client_comm_dtype_arr,
                               buf, agg_alltoallw_counts , alltoallw_disps,
                               agg_comm_dtype_arr,
                               fd->comm);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5033, 0, NULL);
#endif
            }
            else { /* Write Case */
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5032, 0, NULL);
#endif
                MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
                               agg_comm_dtype_arr,
                               cb_buf, client_alltoallw_counts, alltoallw_disps,
                               client_comm_dtype_arr,
                               fd->comm);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5033, 0, NULL);
#endif
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_WRITE, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }
            }
        }

        /* Free (uncommit) datatypes for reuse */
        if (fd->is_agg) {
            if (buffered_io_size > 0) {
                for (i=0; i<nprocs; i++) {
                    if (client_comm_sz_arr[i] > 0)
                        MPI_Type_free (&client_comm_dtype_arr[i]);
                }
            }
        }
        for (i=0; i<nprocs; i++) {
            if (agg_comm_sz_arr[i] > 0)
                MPI_Type_free (&agg_comm_dtype_arr[i]);
        }

        /* figure out next set up requests */
        if (fd->is_agg) {
            ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
                                  client_file_view_state_arr,
                                  client_comm_dtype_arr,
                                  client_comm_sz_arr,
                                  &agg_disp,
                                  &agg_dtype);
            buffered_io_size = 0;
            for (i=0; i <nprocs; i++) {
                if (client_comm_sz_arr[i] > 0)
                    buffered_io_size += client_comm_sz_arr[i];
            }
        }
#ifdef USE_PRE_REQ
        else {
            /* Example use of ADIOI_Build_client_pre_req. to an
             * appropriate section */
            for (i = 0; i < fd->hints->cb_nodes; i++)
            {
                agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5040, 0, NULL);
#endif
                ADIOI_Build_client_pre_req(
                    fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
                    &(my_mem_view_state_arr[agg_rank]),
                    &(agg_file_view_state_arr[agg_rank]),
                    2*1024*1024,
                    64*1024);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5041, 0, NULL);
#endif
            }
        }
#endif

        /* aggregators pre-post all Irecv's for incoming data from
         * clients.  if nothing is needed, agg_comm_requests is not
         * allocated */
        if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
            if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
                post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                     client_comm_dtype_arr,
                                     client_comm_sz_arr,
                                     &agg_comm_requests,
                                     &aggs_client_count);
        }

        /* Aggregators send amounts for data requested to clients */
        Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
                           client_alltoallw_counts, agg_alltoallw_counts,
                           &aggregators_done);

    }

    /* Clean up */

    if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
        /* AAR, FSIZE, and User provided uniform File realms */
        if (1) {
            ADIOI_Delete_flattened (fd->file_realm_types[0]);
            MPI_Type_free (&fd->file_realm_types[0]);
        }
        else {
            for (i=0; i<fd->hints->cb_nodes; i++) {
                ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
                if (!is_contig)
                    ADIOI_Delete_flattened(fd->file_realm_types[i]);
                MPI_Type_free (&fd->file_realm_types[i]);
            }
        }
        ADIOI_Free (fd->file_realm_types);
        ADIOI_Free (fd->file_realm_st_offs);
    }

    /* This memtype must be deleted from the ADIOI_Flatlist or else it
     * will match incorrectly with other datatypes which use this
     * pointer. */
    ADIOI_Delete_flattened(datatype);
    ADIOI_Delete_flattened(fd->filetype);

    if (fd->is_agg) {
        if (buffered_io_size > 0)
            MPI_Type_free (&agg_dtype);
        for (i=0; i<nprocs; i++) {
            MPI_Type_free (&client_comm_dtype_arr[i]);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
        }
        ADIOI_Free (client_file_view_state_arr);
        ADIOI_Free (cb_buf);
    }
    for (i = 0; i<nprocs; i++)
        if (agg_comm_sz_arr[i] > 0)
            MPI_Type_free (&agg_comm_dtype_arr[i]);

    ADIOI_Free (client_comm_sz_arr);
    ADIOI_Free (client_comm_dtype_arr);
    ADIOI_Free (my_mem_view_state_arr);
    ADIOI_Free (agg_file_view_state_arr);
    ADIOI_Free (agg_comm_sz_arr);
    ADIOI_Free (agg_comm_dtype_arr);
    ADIOI_Free (alltoallw_disps);
    ADIOI_Free (alltoallw_counts);
    /* all_st_end_offsets is only allocated when collective buffering is
     * enabled; it is still NULL when cb_pfr forced us here with cb
     * disabled.  The early-return path above likewise avoids freeing it in
     * that case. */
    if (all_st_end_offsets != NULL)
        ADIOI_Free (all_st_end_offsets);

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
    /* This is a temporary way of filling in status.  The right way is
     * to keep track of how much data was actually read and placed in
     * buf during collective I/O. */
#endif
    fd->fp_sys_posn = -1; /* set it to null. */
#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event (5011, 0, NULL);
    else
        MPE_Log_event (5013, 0, NULL);
#endif
}
695
696
/* Some of this code comes from the old Calc_my_off_len() function.
 * It calculates the first and last byte offsets accessed. */
/* Compute the absolute byte offsets of the first and last bytes (inclusive)
 * of the file region touched by accessing COUNT instances of BUFTYPE through
 * the current file view of FD.
 *
 * fd            - open file; filetype, etype, disp and (for ADIO_INDIVIDUAL)
 *                 fp_ind are read, nothing is modified
 * count         - number of BUFTYPE elements accessed
 * buftype       - memory datatype; only its size is used
 * file_ptr_type - ADIO_INDIVIDUAL starts at fd->fp_ind; anything else starts
 *                 OFFSET etypes into the file view
 * offset        - starting offset in etypes (ignored for ADIO_INDIVIDUAL)
 * st_offset     - out: first byte accessed
 * end_offset    - out: last byte accessed
 *
 * When count is 0, *end_offset is -1 and *st_offset is a large positive
 * sentinel, so the process does not win a min(start)/max(end) reduction
 * such as the one in ADIOI_IOStridedColl.
 */
void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
                        int file_ptr_type, ADIO_Offset offset,
                        ADIO_Offset *st_offset, ADIO_Offset *end_offset)
{
    MPI_Count filetype_size, buftype_size, etype_size;
    /* running byte total over flattened blocks; ADIO_Offset (was int) so
     * filetypes whose size exceeds INT_MAX bytes do not overflow it */
    ADIO_Offset sum;
    MPI_Aint filetype_extent, lb;
    ADIO_Offset total_io;
    int filetype_is_contig;
    ADIO_Offset i, remainder;
    ADIOI_Flatlist_node *flat_file;

    ADIO_Offset st_byte_off, end_byte_off;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5000, 0, NULL);
#endif

    if (!count) {
        /* Empty access.  NOTE: despite the intent, this is not the maximum
         * ADIO_Offset: every byte is set to 0x08 and the value is halved
         * (0x0404040404040404 for an 8-byte ADIO_Offset).  It is kept as-is
         * because downstream code (e.g. file realm computation) may do
         * arithmetic on it and a true maximum could overflow there --
         * TODO confirm before changing. */
        memset (st_offset, 8, sizeof(ADIO_Offset));
        *st_offset = *st_offset / 2;
        *end_offset = -1;
        return;
    }

    ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);

    MPI_Type_size_x (fd->filetype, &filetype_size);
    MPI_Type_get_extent (fd->filetype, &lb, &filetype_extent);
    MPI_Type_size_x (fd->etype, &etype_size);
    MPI_Type_size_x (buftype, &buftype_size);

    total_io = buftype_size * count;

    if (filetype_is_contig) {
        if (file_ptr_type == ADIO_INDIVIDUAL)
            st_byte_off = fd->fp_ind;
        else
            st_byte_off = fd->disp + etype_size * offset;

        end_byte_off = st_byte_off + total_io - 1;
    }
    else {
        flat_file = ADIOI_Flatlist;
        while (flat_file->type != fd->filetype) flat_file = flat_file->next;

        /* we need to take care of some weirdness since fd->fp_ind
           points at an accessible byte in file.  the first accessible
           byte in the file is not necessarily the first byte, nor is
           it necessarily the first off/len pair in the filetype. */
        if (file_ptr_type == ADIO_INDIVIDUAL) {
            st_byte_off = fd->fp_ind;
            /* find end byte of I/O (may be in middle of an etype) */

            /* calculate byte starting point of first filetype */
            end_byte_off = (ADIO_Offset)
                ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
                 filetype_extent) * filetype_extent + fd->disp +
                flat_file->indices[0];
            /* number of absolute bytes into first filetype, measured from
             * indices[0] */
            remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
                filetype_extent;
            if (remainder) {
                /* Count the file-viewable bytes of the first filetype that
                 * precede fp_ind: all of every earlier block plus the part
                 * of the block containing fp_ind that lies before it.  They
                 * are added to total_io so the end calculation below can
                 * start from the beginning of the first filetype. */
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    ADIO_Offset blk_end, past;

                    /* end (exclusive) of block i, relative to indices[0] */
                    blk_end = flat_file->indices[i] - flat_file->indices[0] +
                        flat_file->blocklens[i];
                    sum += flat_file->blocklens[i];
                    if (blk_end >= remainder) {
                        /* drop the bytes of block i at or after fp_ind
                         * (never more than the whole block) */
                        past = blk_end - remainder;
                        if (past > flat_file->blocklens[i])
                            past = flat_file->blocklens[i];
                        sum -= past;
                        break;
                    }
                }
                total_io += sum;
            }
            /* byte starting point of last filetype */
            end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
            /* number of bytes into last filetype */
            remainder = total_io % filetype_size;
            if (!remainder) {
                for (i=flat_file->count - 1; i>=0; i--) {
                    if (flat_file->blocklens[i]) break;
                }
                assert (i > -1);
                end_byte_off += flat_file->indices[i] +
                    flat_file->blocklens[i] - 1;
                end_byte_off -= flat_file->indices[0];
            }
            else {
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    sum += flat_file->blocklens[i];
                    if (sum >= remainder) {
                        end_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder - 1;
                        break;
                    }
                }
                end_byte_off -= flat_file->indices[0];
            }
        }
        else {
            /* find starting byte of I/O (must be aligned with an etype) */
            /* byte starting point of starting filetype */
            st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
                filetype_extent;
            /* number of file viewable bytes into starting filetype */
            remainder = (etype_size * offset) % filetype_size;

            sum = 0;
            for (i=0; i<flat_file->count; i++) {
                sum += flat_file->blocklens[i];
                if (sum >= remainder) {
                    if (sum == remainder)
                        st_byte_off += flat_file->indices[i+1];
                    else
                        st_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder;
                    break;
                }
            }

            /* find end byte of I/O (may be in middle of an etype) */
            /* byte starting point of last filetype */
            end_byte_off = fd->disp + (offset * etype_size + total_io) /
                filetype_size * filetype_extent;
            /* number of bytes into last filetype */
            remainder = (offset * etype_size + total_io) % filetype_size;

            if (!remainder) {
                /* the last non-zero off/len pair */
                for (i=flat_file->count-1; i>=0; i--) {
                    if (flat_file->blocklens[i]) break;
                }
                assert (i >= 0);
                /* back up a whole filetype, and put back up to the last
                 * byte of the last non-zero off/len pair; equivalent to
                 * end_byte_off = (end_byte_off - filetype_extent) +
                 *     flat_file->indices[i] + flat_file->blocklens[i] - 1 */
                end_byte_off -= filetype_extent - flat_file->indices[i] -
                    flat_file->blocklens[i] + 1;
            }
            else {
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    sum += flat_file->blocklens[i];
                    if (sum >= remainder) {
                        end_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder - 1;
                        break;
                    }
                }
            }
        }
    }

    *st_offset = st_byte_off;
    *end_offset = end_byte_off;
#ifdef DEBUG
    printf ("st_offset = %lld\nend_offset = %lld\n",
            st_byte_off, end_byte_off);
#endif
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5001, 0, NULL);
#endif
}
869
/* Wrapper around ADIO_{Read,Write}Contig and ADIO_{Read,Write}Strided.  Used
 * by the new two-phase code to pass an arbitrary file type directly to the
 * I/O calls without affecting existing code.  For the new two-phase code we
 * only need to set a custom_ftype; the etype is assumed to be MPI_BYTE, and
 * the displacement is 0 in the fully contiguous case and the offset
 * argument otherwise. */
/* Perform one read or write through a temporary file view of FD whose
 * filetype is CUSTOM_FTYPE and whose etype is MPI_BYTE.
 *
 * While the call runs, the independent I/O buffer sizes (ind_wr/ind_rd) are
 * set to cb_buffer_size.  Data sieving (ds_read/ds_write) is enabled only
 * when 100 * size / extent of CUSTOM_FTYPE reaches cb_ds_threshold.  If both
 * CUSTOM_FTYPE and DATATYPE are contiguous, the Contig routine is used with
 * disp 0 and OFFSET; otherwise the Strided routine is used with disp OFFSET
 * and offset 0.  The caller's filetype, etype, disp and hints are restored
 * before returning; *error_code and *status come from the I/O routine.
 */
void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
                      MPI_Datatype datatype, int file_ptr_type,
                      ADIO_Offset offset, MPI_Datatype custom_ftype,
                      int rdwr, ADIO_Status *status, int *error_code)
{
    /* caller's view and hints, put back before returning */
    MPI_Datatype saved_ftype = fd->filetype;
    MPI_Datatype saved_etype = fd->etype;
    ADIO_Offset saved_disp = fd->disp;
    int saved_ds_rd = fd->hints->ds_read;
    int saved_ds_wr = fd->hints->ds_write;
    int saved_wr_bufsz = fd->hints->ind_wr_buffer_size;
    int saved_rd_bufsz = fd->hints->ind_rd_buffer_size;

    MPI_Aint ft_lb, ft_extent;
    MPI_Count ft_size;
    int ds_percent;            /* 100 * size / extent of custom_ftype */
    int ds_hint;
    int ft_contig, mem_contig, all_contig;
    ADIO_Offset io_offset;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event((rdwr == ADIOI_READ) ? 5006 : 5008, 0, NULL);
#endif
    MPI_Type_get_extent(custom_ftype, &ft_lb, &ft_extent);
    MPI_Type_size_x(custom_ftype, &ft_size);
    ds_percent = 100 * ft_size / ft_extent;

    /* install the temporary view and widen the independent I/O buffers */
    fd->filetype = custom_ftype;
    fd->etype = MPI_BYTE;
    fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
    fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;

#ifdef DEBUG
    printf ("f_ds_percent = %d cb_ds_threshold = %d\n", ds_percent,
            fd->hints->cb_ds_threshold);
#endif
    /* sieve only when the filetype is dense enough */
    ds_hint = (ds_percent >= fd->hints->cb_ds_threshold)
        ? ADIOI_HINT_ENABLE : ADIOI_HINT_DISABLE;
    fd->hints->ds_read = ds_hint;
    fd->hints->ds_write = ds_hint;

    /* The strided routines expect the filetype to be flattened already.
     * The memory type should be MPI_BYTE here, but check it anyway. */
    ADIOI_Datatype_iscontig(custom_ftype, &ft_contig);
    ADIOI_Datatype_iscontig(datatype, &mem_contig);
    if (!ft_contig)
        ADIOI_Flatten_datatype (custom_ftype);

    /* Contig path: offset passed explicitly, disp 0.
     * Strided path: offset folded into disp, offset 0. */
    all_contig = ft_contig && mem_contig;
    if (all_contig) {
        fd->disp = 0;
        io_offset = offset;
    } else {
        fd->disp = offset;
        io_offset = 0;
    }

    if (rdwr == ADIOI_READ) {
        if (all_contig)
            ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type,
                            io_offset, status, error_code);
        else
            ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
                             io_offset, status, error_code);
    } else {
        if (all_contig)
            ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type,
                             io_offset, status, error_code);
        else
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                              io_offset, status, error_code);
    }

    /* drop the temporary flattened representation */
    if (!ft_contig)
        ADIOI_Delete_flattened (custom_ftype);

    /* restore the caller's view and hints */
    fd->filetype = saved_ftype;
    fd->etype = saved_etype;
    fd->disp = saved_disp;
    fd->hints->ds_read = saved_ds_rd;
    fd->hints->ds_write = saved_ds_wr;
    fd->hints->ind_wr_buffer_size = saved_wr_bufsz;
    fd->hints->ind_rd_buffer_size = saved_rd_bufsz;
#ifdef AGGREGATION_PROFILE
    MPE_Log_event((rdwr == ADIOI_READ) ? 5007 : 5009, 0, NULL);
#endif
}
981
Exch_data_amounts(ADIO_File fd,int nprocs,ADIO_Offset * client_comm_sz_arr,ADIO_Offset * agg_comm_sz_arr,int * client_alltoallw_counts,int * agg_alltoallw_counts,int * aggregators_done)982 static void Exch_data_amounts (ADIO_File fd, int nprocs,
983 ADIO_Offset *client_comm_sz_arr,
984 ADIO_Offset *agg_comm_sz_arr,
985 int *client_alltoallw_counts,
986 int *agg_alltoallw_counts,
987 int *aggregators_done)
988 {
989 int i;
990 int recv_idx;
991 MPI_Request *recv_requests;
992 MPI_Request *send_requests;
993 MPI_Status status;
994 MPI_Status *send_statuses;
995 /* Aggregators send amounts for data requested to clients */
996 if (fd->hints->cb_alltoall != ADIOI_HINT_DISABLE) {
997 MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
998 agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
999 fd->comm);
1000
1001 if (fd->is_agg) {
1002 for (i=0; i<nprocs; i++)
1003 if (client_comm_sz_arr[i] > 0)
1004 client_alltoallw_counts[i] = 1;
1005 else
1006 client_alltoallw_counts[i] = 0;
1007 }
1008 *aggregators_done = 0;
1009 for (i=0; i<nprocs; i++) {
1010 if (agg_comm_sz_arr[i] == -1)
1011 *aggregators_done = *aggregators_done + 1;
1012 else if (agg_comm_sz_arr[i] > 0)
1013 agg_alltoallw_counts[i] = 1;
1014 else
1015 agg_alltoallw_counts[i] = 0;
1016 }
1017 } else {
1018 /* let's see if we can't reduce some communication as well as
1019 * overlap some communication and work */
1020
1021 recv_requests = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));
1022 /* post all receives - only receive from aggregators */
1023 for (i = 0; i < fd->hints->cb_nodes; i++)
1024 MPI_Irecv (&agg_comm_sz_arr[fd->hints->ranklist[i]],
1025 sizeof(ADIO_Offset), MPI_BYTE, fd->hints->ranklist[i],
1026 AMT_TAG, fd->comm, &recv_requests[i]);
1027
1028 /* Barrier is needed here if we're worried about unexpected
1029 * messages being dropped */
1030 /* MPI_Barrier (fd->comm); */
1031 send_requests = NULL;
1032 if (fd->is_agg) {
1033 /* only aggregators send data */
1034 send_requests = ADIOI_Malloc (nprocs * sizeof(MPI_Request));
1035
1036 /* post all sends */
1037 for (i = 0; i < nprocs; i++) {
1038 MPI_Isend (&client_comm_sz_arr[i], sizeof(ADIO_Offset),
1039 MPI_BYTE, i, AMT_TAG, fd->comm, &send_requests[i]);
1040
1041 if (client_comm_sz_arr[i] > 0)
1042 client_alltoallw_counts[i] = 1;
1043 else
1044 client_alltoallw_counts[i] = 0;
1045 }
1046 }
1047
1048 *aggregators_done = 0;
1049 for (i=0; i < fd->hints->cb_nodes; i++) {
1050 MPI_Waitany (fd->hints->cb_nodes, recv_requests, &recv_idx, &status);
1051 if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] == -1)
1052 *aggregators_done = *aggregators_done + 1;
1053 else if (agg_comm_sz_arr[fd->hints->ranklist[recv_idx]] > 0)
1054 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 1;
1055 else
1056 agg_alltoallw_counts[fd->hints->ranklist[recv_idx]] = 0;
1057 }
1058
1059 ADIOI_Free (recv_requests);
1060 if (fd->is_agg) {
1061 /* wait for all sends to complete */
1062 send_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
1063 MPI_Waitall (nprocs, send_requests, send_statuses);
1064 ADIOI_Free (send_requests);
1065 ADIOI_Free (send_statuses);
1066 }
1067 }
1068 }
1069
post_aggregator_comm(MPI_Comm comm,int rw_type,int nproc,void * cb_buf,MPI_Datatype * client_comm_dtype_arr,ADIO_Offset * client_comm_sz_arr,MPI_Request ** requests_p,int * aggs_client_count_p)1070 static void post_aggregator_comm (MPI_Comm comm, int rw_type,
1071 int nproc, void *cb_buf,
1072 MPI_Datatype *client_comm_dtype_arr,
1073 ADIO_Offset *client_comm_sz_arr,
1074 MPI_Request **requests_p,
1075 int *aggs_client_count_p)
1076 {
1077 int aggs_client_count = 0;
1078 MPI_Request *requests;
1079 int i;
1080
1081 #ifdef DEBUG
1082 printf ("posting aggregator communication\n");
1083 #endif
1084
1085 for (i=0; i < nproc; i++)
1086 if (client_comm_sz_arr[i] > 0)
1087 aggs_client_count++;
1088 #ifdef DEBUG
1089 printf ("aggregator needs to talk to %d clients\n",
1090 aggs_client_count);
1091 #endif
1092 *aggs_client_count_p = aggs_client_count;
1093 if (aggs_client_count) {
1094 requests = (MPI_Request *)
1095 ADIOI_Malloc (aggs_client_count * sizeof(MPI_Request));
1096 aggs_client_count = 0;
1097 #ifdef AGGREGATION_PROFILE
1098 MPE_Log_event (5032, 0, NULL);
1099 #endif
1100 for (i=0; i < nproc; i++) {
1101 if (client_comm_sz_arr[i] > 0) {
1102 if (rw_type == ADIOI_WRITE)
1103 MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[i], i,
1104 DATA_TAG, comm,
1105 &requests[aggs_client_count]);
1106 else
1107 MPI_Isend (cb_buf, 1, client_comm_dtype_arr[i], i,
1108 DATA_TAG, comm,
1109 &requests[aggs_client_count]);
1110
1111 aggs_client_count++;
1112 }
1113 }
1114 *requests_p = requests;
1115 }
1116 }
1117
post_client_comm(ADIO_File fd,int rw_type,int agg_rank,void * buf,MPI_Datatype agg_comm_dtype,int agg_alltoallw_count,MPI_Request * request)1118 static void post_client_comm (ADIO_File fd, int rw_type,
1119 int agg_rank, void *buf,
1120 MPI_Datatype agg_comm_dtype,
1121 int agg_alltoallw_count,
1122 MPI_Request *request)
1123 {
1124 if (agg_alltoallw_count) {
1125 if (rw_type == ADIOI_READ)
1126 MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1127 request);
1128 else
1129 MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
1130 request);
1131 }
1132 }
1133
1134
1135
1136