1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *   Copyright (C) 2008 University of Chicago.
4  *   See COPYRIGHT notice in top-level directory.
5  */
6 
7 #include "assert.h"
8 #include "adio.h"
9 #include "adio_extern.h"
10 #ifdef AGGREGATION_PROFILE
11 #include "mpe.h"
12 #endif
13 
14 /* #define ALLTOALL */
15 
16 /* #define DEBUG */
17 /* #define DEBUG2 */  /* print buffers */
18 
19 #define USE_PRE_REQ
20 
21 static void Exch_data_amounts (ADIO_File fd, int nprocs,
22 			ADIO_Offset *client_comm_sz_arr,
23 			ADIO_Offset *agg_comm_sz_arr,
24 			int *client_alltoallw_counts,
25 			int *agg_alltoallw_counts,
26 			int *aggregators_done);
27 static void post_aggregator_comm (MPI_Comm comm, int rw_type, int nproc,
28 			   void *cb_buf,
29 			   MPI_Datatype *client_comm_dtype_arr,
30 			   ADIO_Offset *client_comm_sz_arr,
31 			   MPI_Request **requests,
32 			   int *aggregators_client_count_p);
33 
34 static void post_client_comm (ADIO_File fd, int rw_type,
35 		       int agg_rank, void *buf,
36 		       MPI_Datatype agg_comm_dtype,
37 		       int agg_alltoallw_count,
38 		       MPI_Request *request);
39 
/* Avery Ching and Kenin Coloma's reworked two-phase algorithm.  Key features
 * - persistent file domains
 * - an option to use alltoall instead of point-to-point
 */
/*
 * ADIOI_IOStridedColl - collective strided read or write.
 *
 * When collective buffering is disabled (or set to "auto" and the
 * accesses of the processes are not interleaved) and persistent file
 * realms are not enabled, the request is handed to the independent
 * contiguous/strided routines.  Otherwise data is moved in rounds
 * between clients and aggregators, either with point-to-point messages
 * or with MPI_Alltoallw (hint cb_alltoall), and the aggregators perform
 * the file I/O through ADIOI_IOFiletype.
 *
 * Parameters:
 *   fd            - file handle; fd->comm, fd->hints and fd->is_agg
 *                   select the algorithm and this process's role
 *   buf           - user buffer
 *   count         - number of elements of `datatype` in buf
 *   rdwr          - ADIOI_READ or ADIOI_WRITE
 *   datatype      - memory datatype of buf
 *   file_ptr_type - ADIO_EXPLICIT_OFFSET, or ADIO_INDIVIDUAL
 *   offset        - offset in etypes, used with explicit offsets
 *   status        - passed to the underlying I/O calls; when
 *                   HAVE_STATUS_SET_BYTES is defined it is finally set
 *                   to the full request size (count * size of datatype)
 *   error_code    - result of the underlying I/O calls; MPI_SUCCESS if
 *                   this process performs no file I/O in the collective
 *                   path
 *
 * NOTE(review): the early returns taken when ADIOI_IOFiletype fails
 * skip all cleanup below (buffers, datatypes, outstanding requests).
 */
void ADIOI_IOStridedColl (ADIO_File fd, void *buf, int count, int rdwr,
                          MPI_Datatype datatype, int file_ptr_type,
                          ADIO_Offset offset, ADIO_Status *status,
                          int *error_code)
{
    ADIO_Offset min_st_offset=0, max_end_offset=0;
    ADIO_Offset st_end_offset[2];
    ADIO_Offset *all_st_end_offsets = NULL;
    int filetype_is_contig, buftype_is_contig, is_contig;
    ADIO_Offset off;
    int interleave_count = 0, i, nprocs, myrank, nprocs_for_coll;
    int cb_enable;
    ADIO_Offset bufsize;
    MPI_Aint extent, lb;
#ifdef DEBUG2
    MPI_Aint bufextent;
#endif
    MPI_Count size;
    int agg_rank;

    ADIO_Offset agg_disp; /* aggregated file offset */
    MPI_Datatype agg_dtype; /* aggregated file datatype */

    int aggregators_done = 0;
    ADIO_Offset buffered_io_size = 0;

    int *alltoallw_disps;

    int *alltoallw_counts;
    int *client_alltoallw_counts;
    int *agg_alltoallw_counts;

    char *cb_buf = NULL;

    MPI_Datatype *client_comm_dtype_arr; /* aggregator perspective */
    MPI_Datatype *agg_comm_dtype_arr;    /* client perspective */
    ADIO_Offset *client_comm_sz_arr;     /* aggregator perspective */
    ADIO_Offset *agg_comm_sz_arr;        /* client perspective */

    /* file views for each client and aggregator */
    view_state *client_file_view_state_arr = NULL;
    view_state *agg_file_view_state_arr    = NULL;
    /* mem views for local process */
    view_state *my_mem_view_state_arr      = NULL;

    MPI_Status *agg_comm_statuses     = NULL;
    MPI_Request *agg_comm_requests    = NULL;
    MPI_Status *client_comm_statuses  = NULL;
    MPI_Request *client_comm_requests = NULL;
    int aggs_client_count = 0;
    int clients_agg_count = 0;

    MPI_Comm_size (fd->comm, &nprocs);
    MPI_Comm_rank (fd->comm, &myrank);
#ifdef DEBUG
    fprintf (stderr, "p%d: entering ADIOI_IOStridedColl\n", myrank);
#endif
#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event (5010, 0, NULL);
    else
        MPE_Log_event (5012, 0, NULL);
#endif

    /* I need to check if there are any outstanding nonblocking writes
       to the file, which could potentially interfere with the writes
       taking place in this collective write call. Since this is not
       likely to be common, let me do the simplest thing possible here:
       Each process completes all pending nonblocking operations before
       completing. */

    nprocs_for_coll = fd->hints->cb_nodes;

    if (rdwr == ADIOI_READ)
        cb_enable = fd->hints->cb_read;
    else
        cb_enable = fd->hints->cb_write;

    /* only check for interleaving if collective buffering isn't disabled */
    if (cb_enable != ADIOI_HINT_DISABLE) {
        /* find the starting and ending byte of my I/O access */
        ADIOI_Calc_bounds (fd, count, datatype, file_ptr_type, offset,
                           &st_end_offset[0], &st_end_offset[1]);

        /* allocate an array of start/end pairs */
        all_st_end_offsets = (ADIO_Offset *)
            ADIOI_Malloc (2*nprocs*sizeof(ADIO_Offset));
        MPI_Allgather (st_end_offset, 2, ADIO_OFFSET, all_st_end_offsets, 2,
                       ADIO_OFFSET, fd->comm);

        min_st_offset = all_st_end_offsets[0];
        max_end_offset = all_st_end_offsets[1];

        for (i=1; i<nprocs; i++) {
            /* are the accesses of different processes interleaved?
             * (rank i starts before rank i-1 ends, and rank i's range
             * is not empty) */
            if ((all_st_end_offsets[i*2] < all_st_end_offsets[i*2-1]) &&
                (all_st_end_offsets[i*2] <= all_st_end_offsets[i*2+1]))
                interleave_count++;
            /* This is a rudimentary check for interleaving, but should
             * suffice for the moment. */

            min_st_offset = ADIOI_MIN(all_st_end_offsets[i*2],
                                      min_st_offset);
            max_end_offset = ADIOI_MAX(all_st_end_offsets[i*2+1],
                                       max_end_offset);
        }
    }

    ADIOI_Datatype_iscontig (datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);

    /* Independent-I/O path: collective buffering disabled, or "auto"
     * with no interleaving, and persistent file realms not enabled. */
    if ((cb_enable == ADIOI_HINT_DISABLE
         || (!interleave_count && (cb_enable == ADIOI_HINT_AUTO)))
        && (fd->hints->cb_pfr != ADIOI_HINT_ENABLE)){
        if (cb_enable != ADIOI_HINT_DISABLE) {
            ADIOI_Free (all_st_end_offsets);
        }

        if (buftype_is_contig && filetype_is_contig) {
            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                off = fd->disp + (fd->etype_size) * offset;
                if (rdwr == ADIOI_READ)
                    ADIO_ReadContig(fd, buf, count, datatype,
                                    ADIO_EXPLICIT_OFFSET, off, status,
                                    error_code);
                else
                    ADIO_WriteContig(fd, buf, count, datatype,
                                     ADIO_EXPLICIT_OFFSET, off, status,
                                     error_code);
            }
            else {
                if (rdwr == ADIOI_READ)
                    ADIO_ReadContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                    0, status, error_code);
                else
                    ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                     0, status, error_code);
            }
        }
        else {
            if (rdwr == ADIOI_READ)
                ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type,
                                 offset, status, error_code);
            else
                ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                                  offset, status, error_code);
        }
        return;
    }

    /* Only processes that call ADIOI_IOFiletype below get error_code
     * written for them, so give every process a defined value first. */
    *error_code = MPI_SUCCESS;

    MPI_Type_get_extent(datatype, &lb, &extent);
#ifdef DEBUG2
    bufextent = extent * count;
#endif
    MPI_Type_size_x(datatype, &size);
    bufsize = size * (MPI_Count)count;

    /* Calculate file realms */
    if ((fd->hints->cb_pfr != ADIOI_HINT_ENABLE) ||
        (fd->file_realm_types == NULL))
        ADIOI_Calc_file_realms (fd, min_st_offset, max_end_offset);

    my_mem_view_state_arr = (view_state *)
        ADIOI_Calloc (1, nprocs * sizeof(view_state));
    agg_file_view_state_arr = (view_state *)
        ADIOI_Calloc (1, nprocs * sizeof(view_state));
    client_comm_sz_arr = (ADIO_Offset *)
        ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));

    if (fd->is_agg) {
        client_file_view_state_arr = (view_state *)
            ADIOI_Calloc (1, nprocs * sizeof(view_state));
    }
    else {
        client_file_view_state_arr = NULL;
    }

    /* Alltoallw doesn't like a null array even if the counts are
     * zero.  If you do not include this code, it will fail. */
    client_comm_dtype_arr = (MPI_Datatype *)
        ADIOI_Calloc (1, nprocs * sizeof(MPI_Datatype));
    if (!fd->is_agg)
        for (i = 0; i < nprocs; i++)
            client_comm_dtype_arr[i] = MPI_BYTE;

    ADIOI_Exch_file_views (myrank, nprocs, file_ptr_type, fd, count,
                           datatype, offset, my_mem_view_state_arr,
                           agg_file_view_state_arr,
                           client_file_view_state_arr);

    agg_comm_sz_arr = (ADIO_Offset *)
        ADIOI_Calloc (1, nprocs * sizeof(ADIO_Offset));
    agg_comm_dtype_arr = (MPI_Datatype *)
        ADIOI_Malloc (nprocs * sizeof(MPI_Datatype));
    if (fd->is_agg) {
        ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
                              client_file_view_state_arr,
                              client_comm_dtype_arr,
                              client_comm_sz_arr,
                              &agg_disp,
                              &agg_dtype);
        /* total bytes this aggregator moves in the coming round */
        buffered_io_size = 0;
        for (i=0; i <nprocs; i++) {
            if (client_comm_sz_arr[i] > 0)
                buffered_io_size += client_comm_sz_arr[i];
        }
    }
#ifdef USE_PRE_REQ
    else
    {
        /* Example use of ADIOI_Build_client_pre_req. to an
         * appropriate section */

        for (i = 0; i < fd->hints->cb_nodes; i++)
        {
            agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
#ifdef AGGREGATION_PROFILE
            MPE_Log_event (5040, 0, NULL);
#endif
            ADIOI_Build_client_pre_req(
                fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
                &(my_mem_view_state_arr[agg_rank]),
                &(agg_file_view_state_arr[agg_rank]),
                2*1024*1024,
                64*1024);
#ifdef AGGREGATION_PROFILE
            MPE_Log_event (5041, 0, NULL);
#endif
        }
    }
#endif


    if (fd->is_agg)
        cb_buf = (char *) ADIOI_Malloc (fd->hints->cb_buffer_size);
    alltoallw_disps  = (int *) ADIOI_Calloc (nprocs, sizeof(int));
    /* one allocation, two halves: client counts first, then agg counts */
    alltoallw_counts = client_alltoallw_counts = (int *)
        ADIOI_Calloc (2*nprocs, sizeof(int));
    agg_alltoallw_counts = &alltoallw_counts[nprocs];

    if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
        /* aggregators pre-post all Irecv's for incoming data from clients */
        if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
            post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                 client_comm_dtype_arr,
                                 client_comm_sz_arr,
                                 &agg_comm_requests,
                                 &aggs_client_count);
    }
    /* Aggregators send amounts for data requested to clients */
    Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
                       client_alltoallw_counts, agg_alltoallw_counts,
                       &aggregators_done);

#ifdef DEBUG
    fprintf (stderr, "client_alltoallw_counts[ ");
    for (i=0; i<nprocs; i++) {
        fprintf (stderr, "%d ", client_alltoallw_counts[i]);
    }
    fprintf (stderr, "]\n");
    fprintf (stderr, "agg_alltoallw_counts[ ");
    for (i=0; i<nprocs; i++) {
        fprintf (stderr,"%d ", agg_alltoallw_counts[i]);
    }
    fprintf (stderr, "]\n");
#endif

    /* keep looping while aggregators still have I/O to do */
    while (aggregators_done != nprocs_for_coll) {
        if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
            /* clients should build datatypes for local memory locations
               for data communication with aggregators and post
               communication as the datatypes are built */

            client_comm_requests = (MPI_Request *)
                ADIOI_Calloc (fd->hints->cb_nodes, sizeof(MPI_Request));

            /* Reset once per round, not once per aggregator: each
             * posted request needs its own slot so that the Waitall
             * below completes all of them. */
            clients_agg_count = 0;
            for (i = 0; i < fd->hints->cb_nodes; i++)
            {
                agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
                if (agg_comm_sz_arr[agg_rank] > 0) {
                    ADIOI_Build_client_req(fd, agg_rank,
                                           (i+myrank)%fd->hints->cb_nodes,
                                           &(my_mem_view_state_arr[agg_rank]),
                                           &(agg_file_view_state_arr[agg_rank]),
                                           agg_comm_sz_arr[agg_rank],
                                           &(agg_comm_dtype_arr[agg_rank]));

#ifdef AGGREGATION_PROFILE
                    if (i == 0)
                        MPE_Log_event (5038, 0, NULL);
#endif
                    post_client_comm (fd, rdwr, agg_rank, buf,
                                      agg_comm_dtype_arr[agg_rank],
                                      agg_alltoallw_counts[agg_rank],
                                      &client_comm_requests[clients_agg_count]);
                    clients_agg_count++;
                }
            }
#ifdef AGGREGATION_PROFILE
            if (!clients_agg_count)
                MPE_Log_event(5039, 0, NULL);
#endif

            if (rdwr == ADIOI_READ) {
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_READ, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

#ifdef DEBUG
                fprintf (stderr, "expecting from [agg](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
                    fprintf (stderr, "[%d](%d,%lld,%d)", i, alltoallw_disps[i],
                             (long long) size, agg_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr, "]\n");
                if (fd->is_agg) {
                    fprintf (stderr, "sending to [client](disp,size,cnt)=");
                    for (i=0; i < nprocs; i++) {
                        if (fd->is_agg)
                            MPI_Type_size_x (client_comm_dtype_arr[i], &size);
                        else
                            size = -1;

                        fprintf (stderr, "[%d](%d,%lld,%d)", i,
                                 alltoallw_disps[i], (long long) size,
                                 client_alltoallw_counts[i]);
                        if (i != nprocs - 1)
                            fprintf(stderr, ",");
                    }
                    fprintf (stderr,"\n");
                }
                fflush (NULL);
#endif
                /* aggregators post all Isends for outgoing data to clients */
                if (fd->is_agg)
                    post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                         client_comm_dtype_arr,
                                         client_comm_sz_arr,
                                         &agg_comm_requests,
                                         &aggs_client_count);

                if (fd->is_agg && aggs_client_count) {
                    agg_comm_statuses = ADIOI_Malloc(aggs_client_count *
                                                     sizeof(MPI_Status));
                    MPI_Waitall(aggs_client_count, agg_comm_requests,
                                agg_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5033, 0, NULL);
#endif
                    ADIOI_Free (agg_comm_requests);
                    ADIOI_Free (agg_comm_statuses);
                }

                if (clients_agg_count) {
                    client_comm_statuses = ADIOI_Malloc(clients_agg_count *
                                                        sizeof(MPI_Status));
                    MPI_Waitall(clients_agg_count, client_comm_requests,
                                client_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5039, 0, NULL);
#endif
                    ADIOI_Free (client_comm_statuses);
                }
                /* allocated every round, so release it every round even
                 * when nothing was posted */
                ADIOI_Free (client_comm_requests);
                client_comm_requests = NULL;

#ifdef DEBUG2
                fprintf (stderr, "buffered_io_size = %lld\n",
                         (long long) buffered_io_size);
                if (fd->is_agg && buffered_io_size) {
                    fprintf (stderr, "buf = [");
                    for (i=0; i<bufextent; i++)
                        fprintf (stderr, "%c", ((char *) buf)[i]);
                    fprintf (stderr, "]\n");
                    fprintf (stderr, "cb_buf = [");
                    for (i=0; i<buffered_io_size; i++)
                        fprintf (stderr, "%c", cb_buf[i]);
                    fprintf (stderr, "]\n");
                    fflush (NULL);
                }
#endif
            }
            else { /* Write Case */
#ifdef DEBUG
                fprintf (stderr, "sending to [agg](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    MPI_Type_size_x (agg_comm_dtype_arr[i], &size);
                    fprintf (stderr, "[%d](%d,%lld,%d)", i, alltoallw_disps[i],
                             (long long) size, agg_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr, "]\n");
                fprintf (stderr, "expecting from [client](disp,size,cnt)=");
                for (i=0; i < nprocs; i++) {
                    if (fd->is_agg)
                        MPI_Type_size_x (client_comm_dtype_arr[i], &size);
                    else
                        size = -1;

                    fprintf (stderr, "[%d](%d,%lld,%d)", i, alltoallw_disps[i],
                             (long long) size, client_alltoallw_counts[i]);
                    if (i != nprocs - 1)
                        fprintf(stderr, ",");
                }
                fprintf (stderr,"\n");
                fflush (NULL);
#endif
#ifdef DEBUG
                fprintf (stderr, "buffered_io_size = %lld\n",
                         (long long) buffered_io_size);
#endif

                if (clients_agg_count) {
                    client_comm_statuses = ADIOI_Malloc(clients_agg_count *
                                                        sizeof(MPI_Status));
                    MPI_Waitall(clients_agg_count, client_comm_requests,
                                client_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5039, 0, NULL);
#endif
                    ADIOI_Free(client_comm_statuses);
                }
                /* allocated every round, so release it every round even
                 * when nothing was posted */
                ADIOI_Free(client_comm_requests);
                client_comm_requests = NULL;
#ifdef DEBUG2
                if (bufextent) {
                    fprintf (stderr, "buf = [");
                    for (i=0; i<bufextent; i++)
                        fprintf (stderr, "%c", ((char *) buf)[i]);
                    fprintf (stderr, "]\n");
                }
#endif

                if (fd->is_agg && buffered_io_size) {
                    assert (aggs_client_count != 0);
                    /* make sure we actually have the data to write out */
                    agg_comm_statuses = (MPI_Status *)
                        ADIOI_Malloc (aggs_client_count*sizeof(MPI_Status));

                    MPI_Waitall (aggs_client_count, agg_comm_requests,
                                 agg_comm_statuses);
#ifdef AGGREGATION_PROFILE
                    MPE_Log_event (5033, 0, NULL);
#endif
                    ADIOI_Free (agg_comm_requests);
                    ADIOI_Free (agg_comm_statuses);
#ifdef DEBUG2
                    fprintf (stderr, "cb_buf = [");
                    for (i=0; i<buffered_io_size; i++)
                        fprintf (stderr, "%c", cb_buf[i]);
                    fprintf (stderr, "]\n");
                    fflush (NULL);
#endif
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_WRITE, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

            }
        } else {
            /* Alltoallw version of everything */
            ADIOI_Build_client_reqs(fd, nprocs, my_mem_view_state_arr,
                                    agg_file_view_state_arr,
                                    agg_comm_sz_arr, agg_comm_dtype_arr);

            if (rdwr == ADIOI_READ) {
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_READ, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }

#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5032, 0, NULL);
#endif
                MPI_Alltoallw (cb_buf, client_alltoallw_counts, alltoallw_disps,
                               client_comm_dtype_arr,
                               buf, agg_alltoallw_counts , alltoallw_disps,
                               agg_comm_dtype_arr,
                               fd->comm);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5033, 0, NULL);
#endif
            }
            else { /* Write Case */
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5032, 0, NULL);
#endif
                MPI_Alltoallw (buf, agg_alltoallw_counts, alltoallw_disps,
                               agg_comm_dtype_arr,
                               cb_buf, client_alltoallw_counts, alltoallw_disps,
                               client_comm_dtype_arr,
                               fd->comm);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5033, 0, NULL);
#endif
                if (fd->is_agg && buffered_io_size) {
                    ADIOI_IOFiletype (fd, cb_buf, buffered_io_size, MPI_BYTE,
                                      ADIO_EXPLICIT_OFFSET, agg_disp, agg_dtype,
                                      ADIOI_WRITE, status, error_code);
                    if (*error_code != MPI_SUCCESS) return;
                    MPI_Type_free (&agg_dtype);
                }
            }
        }

        /* Free (uncommit) datatypes for reuse */
        if (fd->is_agg) {
            if (buffered_io_size > 0) {
                for (i=0; i<nprocs; i++) {
                    if (client_comm_sz_arr[i] > 0)
                        MPI_Type_free (&client_comm_dtype_arr[i]);
                }
            }
        }
        for (i=0; i<nprocs; i++) {
            if (agg_comm_sz_arr[i] > 0)
                MPI_Type_free (&agg_comm_dtype_arr[i]);
        }

        /* figure out next set up requests */
        if (fd->is_agg) {
            ADIOI_Build_agg_reqs (fd, rdwr, nprocs,
                                  client_file_view_state_arr,
                                  client_comm_dtype_arr,
                                  client_comm_sz_arr,
                                  &agg_disp,
                                  &agg_dtype);
            buffered_io_size = 0;
            for (i=0; i <nprocs; i++) {
                if (client_comm_sz_arr[i] > 0)
                    buffered_io_size += client_comm_sz_arr[i];
            }
        }
#ifdef USE_PRE_REQ
        else {
            /* Example use of ADIOI_Build_client_pre_req. to an
             * appropriate section */
            for (i = 0; i < fd->hints->cb_nodes; i++)
            {
                agg_rank = fd->hints->ranklist[(i+myrank)%fd->hints->cb_nodes];
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5040, 0, NULL);
#endif
                ADIOI_Build_client_pre_req(
                    fd, agg_rank, (i+myrank)%fd->hints->cb_nodes,
                    &(my_mem_view_state_arr[agg_rank]),
                    &(agg_file_view_state_arr[agg_rank]),
                    2*1024*1024,
                    64*1024);
#ifdef AGGREGATION_PROFILE
                MPE_Log_event (5041, 0, NULL);
#endif
            }
        }
#endif

        /* aggregators pre-post all Irecv's for incoming data from
         * clients.  if nothing is needed, agg_comm_requests is not
         * allocated */
        if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
            if ((fd->is_agg) && (rdwr == ADIOI_WRITE))
                post_aggregator_comm(fd->comm, rdwr, nprocs, cb_buf,
                                     client_comm_dtype_arr,
                                     client_comm_sz_arr,
                                     &agg_comm_requests,
                                     &aggs_client_count);
        }

        /* Aggregators send amounts for data requested to clients */
        Exch_data_amounts (fd, nprocs, client_comm_sz_arr, agg_comm_sz_arr,
                           client_alltoallw_counts, agg_alltoallw_counts,
                           &aggregators_done);

    }

    /* Clean up */

    if (fd->hints->cb_pfr != ADIOI_HINT_ENABLE) {
        /* AAR, FSIZE, and User provided uniform File realms */
        if (1) {
            ADIOI_Delete_flattened (fd->file_realm_types[0]);
            MPI_Type_free (&fd->file_realm_types[0]);
        }
        else {
            for (i=0; i<fd->hints->cb_nodes; i++) {
                ADIOI_Datatype_iscontig(fd->file_realm_types[i], &is_contig);
                if (!is_contig)
                    ADIOI_Delete_flattened(fd->file_realm_types[i]);
                MPI_Type_free (&fd->file_realm_types[i]);
            }
        }
        ADIOI_Free (fd->file_realm_types);
        ADIOI_Free (fd->file_realm_st_offs);
    }

    /* This memtype must be deleted from the ADIOI_Flatlist or else it
     * will match incorrectly with other datatypes which use this
     * pointer. */
    ADIOI_Delete_flattened(datatype);
    ADIOI_Delete_flattened(fd->filetype);

    if (fd->is_agg) {
        if (buffered_io_size > 0)
            MPI_Type_free (&agg_dtype);
        for (i=0; i<nprocs; i++) {
            MPI_Type_free (&client_comm_dtype_arr[i]);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p->indices);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p->blocklens);
            ADIOI_Free (client_file_view_state_arr[i].flat_type_p);
        }
        ADIOI_Free (client_file_view_state_arr);
        ADIOI_Free (cb_buf);
    }
    for (i = 0; i<nprocs; i++)
        if (agg_comm_sz_arr[i] > 0)
            MPI_Type_free (&agg_comm_dtype_arr[i]);

    ADIOI_Free (client_comm_sz_arr);
    ADIOI_Free (client_comm_dtype_arr);
    ADIOI_Free (my_mem_view_state_arr);
    ADIOI_Free (agg_file_view_state_arr);
    ADIOI_Free (agg_comm_sz_arr);
    ADIOI_Free (agg_comm_dtype_arr);
    ADIOI_Free (alltoallw_disps);
    ADIOI_Free (alltoallw_counts);
    /* only allocated when collective buffering is not disabled; the
     * independent-I/O path above guards its free the same way */
    if (all_st_end_offsets != NULL)
        ADIOI_Free (all_st_end_offsets);

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
    /* This is a temporary way of filling in status.  The right way is
     * to keep track of how much data was actually read and placed in
     * buf during collective I/O. */
#endif
    fd->fp_sys_posn = -1; /* set it to null. */
#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event (5011, 0, NULL);
    else
        MPE_Log_event (5013, 0, NULL);
#endif
}
695 
696 
697 /* Some of this code is from the old Calc_my_off_len() function.
698  * It calculates the 1st and last byte accessed */
/*
 * ADIOI_Calc_bounds - compute the first and last byte offsets in the
 * file that this process's request touches.
 *
 * Parameters:
 *   fd            - file handle; its filetype, etype, disp and fp_ind
 *                   are read
 *   count         - number of `buftype` elements in the request
 *   buftype       - memory datatype (only its size is used)
 *   file_ptr_type - ADIO_INDIVIDUAL to start at fd->fp_ind, otherwise
 *                   `offset` is used
 *   offset        - explicit offset in etypes
 *   st_offset     - out: offset of the first byte accessed
 *   end_offset    - out: offset of the last byte accessed
 *
 * For count == 0 an empty range is reported: *st_offset is a large
 * positive sentinel and *end_offset is -1.
 */
void ADIOI_Calc_bounds (ADIO_File fd, int count, MPI_Datatype buftype,
                        int file_ptr_type, ADIO_Offset offset,
                        ADIO_Offset *st_offset, ADIO_Offset *end_offset)
{
    MPI_Count filetype_size, buftype_size, etype_size;
    /* running total of block lengths; kept as wide as the offsets it is
     * compared with and added to, so large filetypes do not truncate */
    ADIO_Offset sum;
    MPI_Aint filetype_extent, lb;
    ADIO_Offset total_io;
    int filetype_is_contig;
    ADIO_Offset i, remainder;
    ADIOI_Flatlist_node *flat_file;

    ADIO_Offset st_byte_off, end_byte_off;

#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5000, 0, NULL);
#endif

    if (!count) {
        /* Intended as a "max signed positive value" for ADIO_Offset.
         * The memset sets every byte to 0x08 and the result is then
         * halved, so the value is large and positive but not the true
         * maximum.  is there a better way? */
        memset (st_offset, 8, sizeof(ADIO_Offset));
        *st_offset = *st_offset / 2;
        *end_offset = -1;
        return;
    }

    ADIOI_Datatype_iscontig (fd->filetype, &filetype_is_contig);

    MPI_Type_size_x (fd->filetype, &filetype_size);
    MPI_Type_get_extent (fd->filetype, &lb, &filetype_extent);
    MPI_Type_size_x (fd->etype, &etype_size);
    MPI_Type_size_x (buftype, &buftype_size);

    total_io = buftype_size * count;

    if (filetype_is_contig) {
        if (file_ptr_type == ADIO_INDIVIDUAL)
            st_byte_off = fd->fp_ind;
        else
            st_byte_off = fd->disp + etype_size * offset;

        end_byte_off = st_byte_off + total_io - 1;
    }
    else {
        /* look up the flattened representation of the filetype */
        flat_file = ADIOI_Flatlist;
        while (flat_file != NULL && flat_file->type != fd->filetype)
            flat_file = flat_file->next;
        assert (flat_file != NULL);

        /* we need to take care of some weirdness since fd->fp_ind
           points at an accessible byte in file.  the first accessible
           byte in the file is not necessarily the first byte, nor is
           it necessarily the first off/len pair in the filetype. */
        if (file_ptr_type == ADIO_INDIVIDUAL) {
            st_byte_off = fd->fp_ind;
            /* find end byte of I/O (may be in middle of an etype) */

            /* calculate byte starting point of first filetype */
            end_byte_off = (ADIO_Offset)
                ((fd->fp_ind - fd->disp - flat_file->indices[0]) /
                 filetype_extent) * filetype_extent + fd->disp +
                flat_file->indices[0];
            /* number of absolute bytes into first filetype */
            remainder = (fd->fp_ind - fd->disp - flat_file->indices[0]) %
                filetype_extent;
            if (remainder) {
                /* find how many file viewable bytes into first filetype */
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    sum += flat_file->blocklens[i];
                    if ((flat_file->indices[i] - flat_file->indices[0] +
                         flat_file->blocklens[i]) >= remainder) {
                        sum -= (flat_file->blocklens[i] - (sum - remainder));
                        break;
                    }
                }
                total_io += sum;
            }
            /* byte starting point of last filetype */
            end_byte_off += (total_io - 1) / filetype_size * filetype_extent;
            /* number of bytes into last filetype */
            remainder = total_io % filetype_size;
            if (!remainder) {
                /* ends exactly on a filetype boundary: use the last
                 * non-empty off/len pair */
                for (i=flat_file->count - 1; i>=0; i--) {
                    if (flat_file->blocklens[i]) break;
                }
                assert (i > -1);
                end_byte_off += flat_file->indices[i] +
                    flat_file->blocklens[i] - 1;
                end_byte_off -= flat_file->indices[0];
            }
            else {
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    sum += flat_file->blocklens[i];
                    if (sum >= remainder) {
                        end_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder - 1;
                        break;
                    }
                }
                end_byte_off -= flat_file->indices[0];
            }
        }
        else {
            /* find starting byte of I/O (must be aligned with an etype) */
            /* byte starting point of starting filetype */
            st_byte_off = fd->disp + ((offset * etype_size) / filetype_size) *
                filetype_extent;
            /* number of file viewable bytes into starting filetype */
            remainder = (etype_size * offset) % filetype_size;

            sum = 0;
            for (i=0; i<flat_file->count; i++) {
                sum += flat_file->blocklens[i];
                if (sum >= remainder) {
                    /* exactly at the end of block i: the access starts
                     * at the beginning of the next block */
                    if (sum == remainder)
                        st_byte_off += flat_file->indices[i+1];
                    else
                        st_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder;
                    break;
                }
            }

            /* find end byte of I/O (may be in middle of an etype) */
            /* byte starting point of last filetype */
            end_byte_off = fd->disp + (offset * etype_size + total_io) /
                filetype_size * filetype_extent;
            /* number of bytes into last filetype */
            remainder = (offset * etype_size + total_io) % filetype_size;

            if (!remainder) {
                /* the last non-zero off/len pair */
                for (i=flat_file->count-1; i>=0; i--) {
                    if (flat_file->blocklens[i]) break;
                }
                assert (i >= 0);
                /* back up a whole filetype, and put back up to the
                 * last byte of the last non-zero offlen pair, i.e.
                 * end_byte_off = (end_byte_off - filetype_extent) +
                 *     indices[i] + blocklens[i] - 1 */
                end_byte_off -= filetype_extent - flat_file->indices[i] -
                    flat_file->blocklens[i] + 1;
            }
            else {
                sum = 0;
                for (i=0; i<flat_file->count; i++) {
                    sum += flat_file->blocklens[i];
                    if (sum >= remainder) {
                        end_byte_off += flat_file->indices[i] +
                            flat_file->blocklens[i] - sum + remainder - 1;
                        break;
                    }
                }
            }
        }
    }

    *st_offset  = st_byte_off;
    *end_offset = end_byte_off;
#ifdef DEBUG
    printf ("st_offset = %lld\nend_offset = %lld\n",
            st_byte_off, end_byte_off);
#endif
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5001, 0, NULL);
#endif
}
869 
870 /* wrapper function for ADIO_WriteStrided and ADIO_ReadStrided.  Used
871  * by new 2 phase code to pass an arbitrary file type directly to
872  * WriteStrided call without affecting existing code.  For the new 2
873  * phase code, we really only need to set a custom_ftype, and we can
874  * assume that this uses MPI_BYTE for the etype, and disp is 0 */
/* Perform one read or write on fd through a temporary file view built
 * from custom_ftype, then restore the caller's view and hints.
 *
 *   fd            - file handle; its filetype, etype, disp, ds_read,
 *                   ds_write and ind_{rd,wr}_buffer_size hints are
 *                   replaced for the duration of the call and restored
 *                   before returning
 *   buf, count,
 *   datatype      - memory buffer description, handed on unchanged
 *   file_ptr_type - handed on unchanged to the ADIO read/write call
 *   offset        - used as the offset (with disp 0) when both the
 *                   filetype and the memory type are contiguous;
 *                   otherwise installed as fd->disp and the strided call
 *                   is given an offset of 0
 *   custom_ftype  - filetype to use for this one operation
 *   rdwr          - ADIOI_READ selects a read; any other value writes
 *   status,
 *   error_code    - passed through to the underlying ADIO call
 */
void ADIOI_IOFiletype(ADIO_File fd, void *buf, int count,
                      MPI_Datatype datatype, int file_ptr_type,
                      ADIO_Offset offset, MPI_Datatype custom_ftype,
                      int rdwr, ADIO_Status *status, int *error_code)
{
    MPI_Datatype user_filetype;
    MPI_Datatype user_etype;
    ADIO_Offset user_disp;
    int user_ind_wr_buffer_size;
    int user_ind_rd_buffer_size;
    int f_is_contig, m_is_contig;
    int user_ds_read, user_ds_write;
    MPI_Aint f_extent, lb;
    MPI_Count f_size;
    int f_ds_percent; /* size/extent */

#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event(5006, 0, NULL);
    else
        MPE_Log_event(5008, 0, NULL);
#endif
    MPI_Type_get_extent(custom_ftype, &lb, &f_extent);
    MPI_Type_size_x(custom_ftype, &f_size);
    /* density of the filetype in percent.  A zero extent would make the
     * division undefined, so treat such a type as 0% dense instead. */
    if (f_extent != 0)
        f_ds_percent = 100 * f_size / f_extent;
    else
        f_ds_percent = 0;

    /* temporarily store file view information */
    user_filetype           = fd->filetype;
    user_etype              = fd->etype;
    user_disp               = fd->disp;
    user_ds_read            = fd->hints->ds_read;
    user_ds_write           = fd->hints->ds_write;
    /* temporarily override the independent I/O datasieve buffer size */
    user_ind_wr_buffer_size = fd->hints->ind_wr_buffer_size;
    user_ind_rd_buffer_size = fd->hints->ind_rd_buffer_size;

    /* set new values for temporary file view */
    fd->filetype = custom_ftype;
    fd->etype    = MPI_BYTE;
    /* set new values for independent I/O datasieve buffer size */
    fd->hints->ind_wr_buffer_size = fd->hints->cb_buffer_size;
    fd->hints->ind_rd_buffer_size = fd->hints->cb_buffer_size;
    /* decide whether or not to do datasieving */
#ifdef DEBUG
    printf ("f_ds_percent = %d cb_ds_threshold = %d\n", f_ds_percent,
            fd->hints->cb_ds_threshold);
#endif
    /* sieve only when the filetype is at least cb_ds_threshold percent
     * dense */
    if (f_ds_percent >= fd->hints->cb_ds_threshold) {
        fd->hints->ds_read = ADIOI_HINT_ENABLE;
        fd->hints->ds_write = ADIOI_HINT_ENABLE;
    }
    else {
        fd->hints->ds_read = ADIOI_HINT_DISABLE;
        fd->hints->ds_write = ADIOI_HINT_DISABLE;
    }

    /* flatten the new filetype since the strided calls expect it to
     * have been flattened in set file view.  in the two phase code,
     * the datatype passed down should always be MPI_BYTE, and
     * therefore contiguous, but just for completeness sake, we'll
     * check the memory datatype anyway */
    ADIOI_Datatype_iscontig(custom_ftype, &f_is_contig);
    ADIOI_Datatype_iscontig(datatype, &m_is_contig);
    if (!f_is_contig)
        ADIOI_Flatten_datatype (custom_ftype);

    /* make appropriate Read/Write calls.  Let ROMIO figure out file
     * system specific stuff. */
    if (f_is_contig && m_is_contig) {
        /* fully contiguous: the offset is given directly, no displacement */
        fd->disp = 0;
        if (rdwr == ADIOI_READ)
            ADIO_ReadContig(fd, buf, count, datatype, file_ptr_type, offset,
                            status, error_code);
        else
            ADIO_WriteContig(fd, buf, count, datatype, file_ptr_type, offset,
                             status, error_code);
    }
    else {
        /* strided: the offset becomes the view displacement and the call
         * itself starts at offset 0 within that view */
        fd->disp = offset;
        if (rdwr == ADIOI_READ)
            ADIO_ReadStrided(fd, buf, count, datatype, file_ptr_type, 0,
                             status, error_code);
        else
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type, 0,
                              status, error_code);
    }

    /* Delete flattened temporary filetype */
    if (!f_is_contig)
        ADIOI_Delete_flattened (custom_ftype);

    /* restore the user specified file view to cover our tracks */
    fd->filetype                  = user_filetype;
    fd->etype                     = user_etype;
    fd->disp                      = user_disp;
    fd->hints->ds_read            = user_ds_read;
    fd->hints->ds_write           = user_ds_write;
    fd->hints->ind_wr_buffer_size = user_ind_wr_buffer_size;
    fd->hints->ind_rd_buffer_size = user_ind_rd_buffer_size;
#ifdef AGGREGATION_PROFILE
    if (rdwr == ADIOI_READ)
        MPE_Log_event (5007, 0, NULL);
    else
        MPE_Log_event (5009, 0, NULL);
#endif
}
981 
/* Exchange the per-peer transfer amounts between aggregators and clients
 * and turn them into 0/1 counts.
 *
 *   fd                      - file handle; supplies the communicator, the
 *                             aggregator list and the cb_alltoall hint
 *   nprocs                  - number of entries in the per-process arrays
 *   client_comm_sz_arr      - amounts this process (as aggregator) sends
 *                             out, one entry per process
 *   agg_comm_sz_arr         - receives the amounts announced to this
 *                             process by the aggregators
 *   client_alltoallw_counts - on aggregators, set to 1 where the matching
 *                             client amount is positive, else 0
 *   agg_alltoallw_counts    - set to 1 where the received amount is
 *                             positive, 0 where it is any other value
 *                             except -1; entries whose amount is -1 are
 *                             left untouched
 *   aggregators_done        - set to the number of received amounts equal
 *                             to -1
 *
 * With cb_alltoall disabled the amounts travel point-to-point (only
 * aggregators send, and receives are posted only from aggregators);
 * otherwise a single MPI_Alltoall moves them. */
static void Exch_data_amounts (ADIO_File fd, int nprocs,
                        ADIO_Offset *client_comm_sz_arr,
                        ADIO_Offset *agg_comm_sz_arr,
                        int *client_alltoallw_counts,
                        int *agg_alltoallw_counts,
                        int *aggregators_done)
{
    int p;

    if (fd->hints->cb_alltoall == ADIOI_HINT_DISABLE) {
        /* point-to-point variant: less traffic, and the sends can
         * overlap with processing of the receives */
        MPI_Request *incoming;
        MPI_Request *outgoing = NULL;
        MPI_Status *outgoing_statuses;
        MPI_Status finished_status;
        int finished_idx;
        int agg;

        incoming = ADIOI_Malloc (fd->hints->cb_nodes * sizeof(MPI_Request));

        /* one receive per aggregator, landing in that aggregator's slot */
        for (p = 0; p < fd->hints->cb_nodes; p++) {
            agg = fd->hints->ranklist[p];
            MPI_Irecv (&agg_comm_sz_arr[agg], sizeof(ADIO_Offset), MPI_BYTE,
                       agg, AMT_TAG, fd->comm, &incoming[p]);
        }

        /* no barrier is issued here; one would only be needed if
         * unexpected messages could be dropped */

        if (fd->is_agg) {
            /* aggregators announce an amount to every process */
            outgoing = ADIOI_Malloc (nprocs * sizeof(MPI_Request));

            for (p = 0; p < nprocs; p++) {
                MPI_Isend (&client_comm_sz_arr[p], sizeof(ADIO_Offset),
                           MPI_BYTE, p, AMT_TAG, fd->comm, &outgoing[p]);

                client_alltoallw_counts[p] =
                    (client_comm_sz_arr[p] > 0) ? 1 : 0;
            }
        }

        /* classify each amount as soon as its receive completes */
        *aggregators_done = 0;
        for (p = 0; p < fd->hints->cb_nodes; p++) {
            MPI_Waitany (fd->hints->cb_nodes, incoming, &finished_idx,
                         &finished_status);
            agg = fd->hints->ranklist[finished_idx];

            if (agg_comm_sz_arr[agg] > 0) {
                agg_alltoallw_counts[agg] = 1;
            } else if (agg_comm_sz_arr[agg] == -1) {
                *aggregators_done += 1;
            } else {
                agg_alltoallw_counts[agg] = 0;
            }
        }

        ADIOI_Free (incoming);

        if (fd->is_agg) {
            /* make sure every send has completed before releasing them */
            outgoing_statuses = ADIOI_Malloc (nprocs * sizeof (MPI_Status));
            MPI_Waitall (nprocs, outgoing, outgoing_statuses);
            ADIOI_Free (outgoing);
            ADIOI_Free (outgoing_statuses);
        }
    } else {
        /* collective variant: every process swaps one amount with every
         * other process */
        MPI_Alltoall (client_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
                      agg_comm_sz_arr, sizeof(ADIO_Offset), MPI_BYTE,
                      fd->comm);

        if (fd->is_agg) {
            for (p = 0; p < nprocs; p++) {
                client_alltoallw_counts[p] =
                    (client_comm_sz_arr[p] > 0) ? 1 : 0;
            }
        }

        *aggregators_done = 0;
        for (p = 0; p < nprocs; p++) {
            if (agg_comm_sz_arr[p] > 0) {
                agg_alltoallw_counts[p] = 1;
            } else if (agg_comm_sz_arr[p] == -1) {
                *aggregators_done += 1;
            } else {
                agg_alltoallw_counts[p] = 0;
            }
        }
    }
}
1069 
/* Post the aggregator side of the data exchange: one nonblocking
 * operation for every process whose entry in client_comm_sz_arr is
 * positive.
 *
 *   comm                  - communicator used for the operations
 *   rw_type               - ADIOI_WRITE posts receives into cb_buf; any
 *                           other value posts sends from cb_buf
 *   nproc                 - number of entries in the two arrays
 *   cb_buf                - buffer base passed to every operation
 *   client_comm_dtype_arr - datatype to use with each process
 *   client_comm_sz_arr    - amount for each process; only positive
 *                           entries get an operation
 *   requests_p            - receives a newly allocated array holding the
 *                           posted requests; not written when no entry is
 *                           positive
 *   aggs_client_count_p   - receives the number of positive entries
 */
static void post_aggregator_comm (MPI_Comm comm, int rw_type,
                           int nproc, void *cb_buf,
                           MPI_Datatype *client_comm_dtype_arr,
                           ADIO_Offset *client_comm_sz_arr,
                           MPI_Request **requests_p,
                           int *aggs_client_count_p)
{
    MPI_Request *posted;
    int n_active = 0;
    int slot = 0;
    int rank;

#ifdef DEBUG
    printf ("posting aggregator communication\n");
#endif

    /* first pass: how many peers actually have data to move? */
    for (rank = 0; rank < nproc; rank++) {
        if (client_comm_sz_arr[rank] > 0) {
            n_active++;
        }
    }
#ifdef DEBUG
    printf ("aggregator needs to talk to %d clients\n",
        n_active);
#endif
    *aggs_client_count_p = n_active;

    /* nothing to post: leave *requests_p as the caller set it */
    if (n_active == 0) {
        return;
    }

    posted = ADIOI_Malloc (n_active * sizeof(MPI_Request));
#ifdef AGGREGATION_PROFILE
    MPE_Log_event (5032, 0, NULL);
#endif

    /* second pass: post one operation per active peer, packing the
     * requests densely in rank order */
    for (rank = 0; rank < nproc; rank++) {
        if (client_comm_sz_arr[rank] <= 0) {
            continue;
        }

        if (rw_type == ADIOI_WRITE) {
            MPI_Irecv (cb_buf, 1, client_comm_dtype_arr[rank], rank,
                       DATA_TAG, comm, &posted[slot]);
        } else {
            MPI_Isend (cb_buf, 1, client_comm_dtype_arr[rank], rank,
                       DATA_TAG, comm, &posted[slot]);
        }
        slot++;
    }

    *requests_p = posted;
}
1117 
/* Post the client side of the data exchange with one aggregator.
 *
 *   fd                  - file handle; supplies the communicator
 *   rw_type             - ADIOI_READ posts a receive into buf; any other
 *                         value posts a send from buf
 *   agg_rank            - rank of the aggregator to talk to
 *   buf                 - buffer base passed to the operation
 *   agg_comm_dtype      - datatype describing the data for this aggregator
 *   agg_alltoallw_count - when zero, nothing is posted and *request is
 *                         left untouched
 *   request             - receives the request of the posted operation
 */
static void post_client_comm (ADIO_File fd, int rw_type,
                       int agg_rank, void *buf,
                       MPI_Datatype agg_comm_dtype,
                       int agg_alltoallw_count,
                       MPI_Request *request)
{
    /* no data to exchange with this aggregator */
    if (!agg_alltoallw_count) {
        return;
    }

    if (rw_type != ADIOI_READ) {
        MPI_Isend (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
                   request);
    } else {
        MPI_Irecv (buf, 1, agg_comm_dtype, agg_rank, DATA_TAG, fd->comm,
                   request);
    }
}
1133 
1134 
1135 
1136