1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *   Copyright (C) 1997 University of Chicago.
4  *   See COPYRIGHT notice in top-level directory.
5  *
6  *   Copyright (C) 2007 Oak Ridge National Laboratory
7  *
8  *   Copyright (C) 2008 Sun Microsystems, Lustre group
9  */
10 
11 #include "ad_lustre.h"
12 #include "adio_extern.h"
13 
14 /* prototypes of functions used for collective writes only. */
15 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
16 					MPI_Datatype datatype, int nprocs,
17 					int myrank,
18 					ADIOI_Access *others_req,
19 					ADIOI_Access *my_req,
20 					ADIO_Offset *offset_list,
21 					ADIO_Offset *len_list,
22 					int contig_access_count,
23 					int *striping_info,
24                                         int **buf_idx, int *error_code);
25 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
26 					  ADIOI_Flatlist_node *flat_buf,
27 					  char **send_buf,
28 					  ADIO_Offset *offset_list,
29 					  ADIO_Offset *len_list, int *send_size,
30 					  MPI_Request *requests,
31 					  int *sent_to_proc, int nprocs,
32 					  int myrank, int contig_access_count,
33 					  int *striping_info,
34 					  int *send_buf_idx,
35                                           int *curr_to_proc,
36 					  int *done_to_proc, int iter,
37 					  MPI_Aint buftype_extent);
38 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
39 					 char *write_buf,
40 					 ADIOI_Flatlist_node *flat_buf,
41 					 ADIO_Offset *offset_list,
42 					 ADIO_Offset *len_list, int *send_size,
43 					 int *recv_size, ADIO_Offset off,
44 					 int size, int *count,
45 					 int *start_pos,
46 					 int *sent_to_proc, int nprocs,
47 					 int myrank, int buftype_is_contig,
48 					 int contig_access_count,
49 					 int *striping_info,
50 					 ADIOI_Access *others_req,
51 					 int *send_buf_idx,
52 					 int *curr_to_proc,
53 					 int *done_to_proc, int *hole,
54 					 int iter, MPI_Aint buftype_extent,
55 					 int *buf_idx,
56 					 ADIO_Offset **srt_off, int **srt_len, int *srt_num,
57 					 int *error_code);
58 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
59                       ADIO_Offset *srt_off, int *srt_len, int *start_pos,
60                       int nprocs, int nprocs_recv, int total_elements);
61 
void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, const void *buf, int count,
                                   MPI_Datatype datatype,
                                   int file_ptr_type, ADIO_Offset offset,
                                   ADIO_Status *status, int *error_code)
{
    /*
     * Collective strided write for Lustre.
     *
     * Uses a generalized version of the extended two-phase method described
     * in "An Extended Two-Phase Method for Accessing Sections of
     * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
     * Scientific Programming, (5)4:301--317, Winter 1996.
     * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
     *
     * Unless the cb_write hint is "disable", every rank builds its
     * (offset, length) list and shares its start/end offsets.  The data is
     * then written collectively when cb_write is not "automatic", or when
     * it is automatic and either the ranks' extents interleave or
     * ADIOI_LUSTRE_Docollect() requests it.  Otherwise each rank performs
     * an independent contiguous or strided write.
     *
     * On the collective path *error_code is agreed across fd->comm; a rank
     * whose own error was more specific than MPI_ERR_IO keeps that code.
     */

    /* my_req[r]: pieces of this rank's request that rank r will write */
    ADIOI_Access *my_req;
    /* others_req[r]: pieces of rank r's request that this rank will write */
    ADIOI_Access *others_req;

    int nprocs, myrank, r;
    int filetype_is_contig, buftype_is_contig;
    int do_collect = 0;
    int interleave_count = 0;
    int contig_access_count = 0;
    int *count_my_req_per_proc;
    int count_my_req_procs, count_others_req_procs;
    int saved_error, local_error;
    ADIO_Offset orig_fp, start_offset, end_offset;
    ADIO_Offset *offset_list = NULL, *len_list = NULL;
    ADIO_Offset *st_offsets = NULL, *end_offsets = NULL;
    int **buf_idx = NULL;
    int *striping_info = NULL;

    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    orig_fp = fd->fp_ind;

    /* I/O pattern identification (skipped when cb_write is disabled). */
    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
        /* This rank's offset/length list.  end_offset is inclusive:
         * start_offset=0 with 100 bytes to write gives end_offset=99. */
        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
                              &offset_list, &len_list, &start_offset,
                              &end_offset, &contig_access_count);

        /* Everyone's extent, stored in rank order. */
        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
        MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
                      ADIO_OFFSET, fd->comm);
        MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
                      ADIO_OFFSET, fd->comm);

        /* Rudimentary interleave test: a non-empty extent that begins
         * before the previous rank's extent ends. */
        for (r = 1; r < nprocs; r++) {
            int starts_early = (st_offsets[r] < end_offsets[r - 1]);
            int non_empty = (st_offsets[r] <= end_offsets[r]);
            if (starts_early && non_empty)
                interleave_count++;
        }

        /* Collective buffering helps when accesses interleave or when the
         * requests are small (judged by ADIOI_LUSTRE_Docollect). */
        do_collect = (interleave_count > 0)
            ? 1
            : ADIOI_LUSTRE_Docollect(fd, contig_access_count,
                                     len_list, nprocs);
    }
    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);

    if (fd->hints->cb_write == ADIOI_HINT_DISABLE ||
        (fd->hints->cb_write == ADIOI_HINT_AUTO && !do_collect)) {
        /* Independent path.  The pattern-detection buffers exist only if
         * cb_write was not disabled. */
        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
            ADIOI_Free(offset_list);
            ADIOI_Free(len_list);
            ADIOI_Free(st_offsets);
            ADIOI_Free(end_offsets);
        }

        fd->fp_ind = orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

        if (!buftype_is_contig || !filetype_is_contig) {
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                              offset, status, error_code);
        } else if (file_ptr_type != ADIO_EXPLICIT_OFFSET) {
            ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                             0, status, error_code);
        } else {
            ADIO_Offset abs_off = fd->disp +
                (ADIO_Offset) (fd->etype_size) * offset;
            ADIO_WriteContig(fd, buf, count, datatype, ADIO_EXPLICIT_OFFSET,
                             abs_off, status, error_code);
        }
        return;
    }

    /* Collective path. */
    ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);

    /* Split this rank's request according to which rank writes each piece. */
    ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
                             striping_info, nprocs, &count_my_req_procs,
                             &count_my_req_per_proc, &my_req, &buf_idx);

    /* From everyone's my_req, derive which pieces of which ranks' requests
     * (this rank included) this rank has to write. */
    ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
                          my_req, nprocs, myrank, &count_others_req_procs,
                          &others_req);
    ADIOI_Free(count_my_req_per_proc);

    /* Exchange the data and write it in pieces no larger than a stripe. */
    ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
                                others_req, my_req, offset_list, len_list,
                                contig_access_count, striping_info,
                                buf_idx, error_code);

    /*
     * Finish with a collective operation so that an independent write
     * issued right after this call cannot overtake a read-modify-write
     * still in progress on another rank.  Every rank reports MPI_ERR_IO if
     * any rank failed; a rank holding a more specific code restores it
     * afterwards.
     */
    saved_error = *error_code;
    if (*error_code != MPI_SUCCESS)
        *error_code = MPI_ERR_IO;

#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
#endif
    if (fd->hints->cb_nodes != 1) {
        local_error = *error_code;
        MPI_Allreduce(&local_error, error_code, 1, MPI_INT,
                      MPI_MAX, fd->comm);
    } else {
        /* A single I/O rank: a Bcast from it is cheaper. */
        MPI_Bcast(error_code, 1, MPI_INT,
                  fd->hints->ranklist[0], fd->comm);
    }
#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
#endif

    if (saved_error != MPI_SUCCESS && saved_error != MPI_ERR_IO)
        *error_code = saved_error;

    if (!buftype_is_contig)
        ADIOI_Delete_flattened(datatype);

    /* Release everything allocated for the collective write. */
    for (r = 0; r < nprocs; r++) {
        if (others_req[r].count) {
            ADIOI_Free(others_req[r].offsets);
            ADIOI_Free(others_req[r].lens);
            ADIOI_Free(others_req[r].mem_ptrs);
        }
        if (my_req[r].count) {
            ADIOI_Free(my_req[r].offsets);
            ADIOI_Free(my_req[r].lens);
        }
        ADIOI_Free(buf_idx[r]);
    }
    ADIOI_Free(others_req);
    ADIOI_Free(my_req);
    ADIOI_Free(buf_idx);
    ADIOI_Free(offset_list);
    ADIOI_Free(len_list);
    ADIOI_Free(st_offsets);
    ADIOI_Free(end_offsets);
    ADIOI_Free(striping_info);

#ifdef HAVE_STATUS_SET_BYTES
    /* Only fill in status when the caller supplied one.  This reports the
     * full requested size, not the number of bytes actually written. */
    if (status) {
        MPI_Count type_size, bufsize;
        MPI_Type_size_x(datatype, &type_size);
        bufsize = type_size * count;
        MPIR_Status_set_bytes(status, datatype, bufsize);
    }
#endif

    fd->fp_sys_posn = -1;   /* reset the cached system position ("null") */
}
282 
283 /* If successful, error_code is set to MPI_SUCCESS.  Otherwise an error
284  * code is created and returned in error_code.
285  */
static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
                                        MPI_Datatype datatype, int nprocs,
                                        int myrank, ADIOI_Access *others_req,
                                        ADIOI_Access *my_req,
                                        ADIO_Offset *offset_list,
                                        ADIO_Offset *len_list,
                                        int contig_access_count,
                                        int *striping_info, int **buf_idx,
                                        int *error_code)
{
    /*
     * Phase two of the Lustre collective write: send data to the ranks
     * that write it and write in pieces of at most one stripe
     * (striping_info[0] bytes).
     *
     * The point is to bound the extra memory needed.  Writing everything
     * at once would be simpler but would need temp space larger than the
     * user buffer; e.g. writing a distributed array with an 8 MB local part
     * should not require another 8 MB of scratch space.  Here each writing
     * rank only allocates one stripe (write_buf).
     *
     * Parameters:
     *   others_req[r]  pieces of rank r's request this rank writes
     *   my_req[r]      pieces of this rank's request that rank r writes
     *   offset_list, len_list, contig_access_count
     *                  this rank's flattened (offset, length) list
     *   striping_info  [0] = stripe size, [2] = number of I/O aggregators
     *   buf_idx[r][k]  index into buf of piece k of my_req[r] (used when the
     *                  memory datatype is contiguous)
     *   error_code     out: MPI_SUCCESS or the error of the failing step
     *
     * Because data is redistributed in a stripe-contiguous pattern, all
     * ranks run the same number (max_ntimes) of exchange rounds, derived
     * from globally reduced values.
     */

    int hole, i, j, m, flag, ntimes = 1, max_ntimes, buftype_is_contig;
    ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
    ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
    ADIO_Offset max_size, step_size = 0;
    int real_size, req_len, send_len;
    int *recv_curr_offlen_ptr, *recv_count, *recv_size;
    int *send_curr_offlen_ptr, *send_size;
    int *sent_to_proc, *recv_start_pos;
    int *send_buf_idx, *curr_to_proc, *done_to_proc;
    int *this_buf_idx;
    char *write_buf = NULL;
    MPI_Status status;
    ADIOI_Flatlist_node *flat_buf = NULL;
    MPI_Aint buftype_lb, buftype_extent;
    int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
    int data_sieving = 0;
    ADIO_Offset *srt_off = NULL;
    int *srt_len = NULL;
    int srt_num = 0;
    ADIO_Offset block_offset;
    int block_len;

    *error_code = MPI_SUCCESS;  /* changed below if error */
    /* only I/O errors are currently reported */

    /* [st_loc, end_loc]: file range this rank writes on behalf of others.
     * Both stay -1 when it writes nothing. */
    for (i = 0; i < nprocs; i++) {
        if (others_req[i].count) {
            st_loc = others_req[i].offsets[0];
            end_loc = others_req[i].offsets[0];
            break;
        }
    }
    for (i = 0; i < nprocs; i++) {
        for (j = 0; j < others_req[i].count; j++) {
            st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
            end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
                                          others_req[i].lens[j] - 1));
        }
    }
    /* this process does no writing. */
    if ((st_loc == -1) && (end_loc == -1))
        ntimes = 0;

    /* NOTE(review): these reductions assume ADIO_Offset is a long long
     * (MPI_LONG_LONG_INT) -- confirm for the target build. */
    MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
    /* keep ranks with no data from pulling min_st_loc down to -1 */
    if (st_loc == -1)
        st_loc = max_end_loc;
    MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
    /* align downward to a stripe boundary */
    min_st_loc -= min_st_loc % (ADIO_Offset) stripe_size;

    /* In each round at most avail_cb_nodes aggregators write one stripe
     * each, so a round covers step_size bytes; max_ntimes is the global
     * range divided by step_size, rounded up.  It is 0 when no rank has
     * anything to write. */
    step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
    max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
        + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
    if (ntimes)
        write_buf = (char *) ADIOI_Malloc(stripe_size);

    /* off_list[m]: lowest offset this rank writes in round m, or
     * max_end_loc if it writes nothing in that round.
     * +1 to avoid a 0-size malloc when max_ntimes is 0. */
    off_list = (ADIO_Offset *) ADIOI_Malloc((max_ntimes + 1) * sizeof(ADIO_Offset));
    for (m = 0; m < max_ntimes; m++)
        off_list[m] = max_end_loc;
    for (i = 0; i < nprocs; i++) {
        for (j = 0; j < others_req[i].count; j++) {
            req_off = others_req[i].offsets[j];
            m = (int) ((req_off - min_st_loc) / step_size);
            off_list[m] = ADIOI_MIN(off_list[m], req_off);
        }
    }

    /* Next unprocessed entry of others_req[i] / my_req[i];
     * calloc initializes them to 0. */
    recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
    send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));

    /* number of others_req entries per rank satisfied in this round */
    recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    /* bytes sent to each rank in this round (nprocs-sized so it can be
     * used in an all-to-all style exchange) */
    send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    /* bytes received from each rank in this round */
    recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    /* bytes sent to each rank so far; used when filling send buffers */
    sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));

    /* bookkeeping for ADIOI_LUSTRE_Fill_send_buffer */
    send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
    curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
    done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    /* per-round start index into buf for each destination rank */
    this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    /* value of recv_curr_offlen_ptr[i] at the start of this round */
    recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    if (!buftype_is_contig) {
        ADIOI_Flatten_datatype(datatype);
        flat_buf = ADIOI_Flatlist;
        while (flat_buf->type != datatype)
            flat_buf = flat_buf->next;
    }
    /* MPI_Type_get_extent replaces the MPI-1 MPI_Type_extent (deprecated,
     * removed in MPI-3); only the extent is needed. */
    MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);

    /* NOTE: outstanding nonblocking writes that might overlap this
     * collective write are not completed here (an ADIOI_Complete_async +
     * MPI_Barrier step is intentionally disabled). */

    iter_st_off = min_st_loc;

    /* Data is grouped by OST, but a gap between pieces within one round
     * still forces a read-modify-write.  E.g. blocksize=60, xfersize=30,
     * stripe_size=100: rank 0 collects [0, 30] and [60, 90], and the hole
     * [30, 60] causes a read-modify-write over [0, 90].  The ds_in_coll
     * hint can disable this data sieving and write the pieces separately. */
    data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;

    for (m = 0; m < max_ntimes; m++) {
        /*
         * MPI guarantees that filetype displacements are monotonically
         * nondecreasing and, for writes, non-overlapping, which keeps this
         * simpler than the read path.
         *
         * off         = first file offset this rank writes in this round
         * iter_st_off = start offset of this round
         * real_size   = bytes written starting at off
         * max_size    = largest possible span of this round
         * req_off     = offset of a contiguous piece received from another rank
         * send_off    = offset of a piece this rank sends to another rank
         * req_len     = length of the piece at req_off
         * send_len    = length of the piece at send_off
         */

        /* first calculate what should be communicated */
        for (i = 0; i < nprocs; i++)
            recv_count[i] = recv_size[i] = send_size[i] = 0;

        off = off_list[m];
        max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
        real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
                                    off,
                                    end_loc - off + 1);

        for (i = 0; i < nprocs; i++) {
            if (my_req[i].count) {
                /* Read buf_idx only for an entry that still exists.  When
                 * all of my_req[i] has been sent, send_size[i] stays 0 and
                 * this_buf_idx[i] is not used. */
                this_buf_idx[i] = (send_curr_offlen_ptr[i] < my_req[i].count)
                    ? buf_idx[i][send_curr_offlen_ptr[i]]
                    : -1;
                for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
                    send_off = my_req[i].offsets[j];
                    send_len = my_req[i].lens[j];
                    if (send_off < iter_st_off + max_size) {
                        send_size[i] += send_len;
                    } else {
                        break;
                    }
                }
                send_curr_offlen_ptr[i] = j;
            }
            if (others_req[i].count) {
                recv_start_pos[i] = recv_curr_offlen_ptr[i];
                for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
                    req_off = others_req[i].offsets[j];
                    req_len = others_req[i].lens[j];
                    if (req_off < iter_st_off + max_size) {
                        recv_count[i]++;
                        ADIOI_Assert((((ADIO_Offset) (MPIR_Upint) write_buf) + req_off - off) ==
                                     (ADIO_Offset) (MPIR_Upint) (write_buf + req_off - off));
                        /* absolute address of this piece inside write_buf */
                        MPI_Get_address(write_buf + req_off - off,
                                        &(others_req[i].mem_ptrs[j]));
                        recv_size[i] += req_len;
                    } else {
                        break;
                    }
                }
                recv_curr_offlen_ptr[i] = j;
            }
        }
        /* "hole" carries the data_sieving flag into W_Exchange_data and
         * comes back set to 1 if write_buf has gaps in this round. */
        hole = data_sieving;
        ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
                                     len_list, send_size, recv_size, off, real_size,
                                     recv_count, recv_start_pos,
                                     sent_to_proc, nprocs, myrank,
                                     buftype_is_contig, contig_access_count,
                                     striping_info, others_req, send_buf_idx,
                                     curr_to_proc, done_to_proc, &hole, m,
                                     buftype_extent, this_buf_idx,
                                     &srt_off, &srt_len, &srt_num, error_code);

        if (*error_code != MPI_SUCCESS)
            goto over;

        /* write only if something was received in this round */
        flag = 0;
        for (i = 0; i < nprocs; i++)
            if (recv_count[i]) {
                flag = 1;
                break;
            }
        if (flag) {
            if (data_sieving == ADIOI_HINT_ENABLE) {
                /* data sieving: one write covering the whole range */
                ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
                                 ADIO_EXPLICIT_OFFSET, off, &status,
                                 error_code);
            } else if (!hole) {
                /* no hole: write the data in one call */
                ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
                                 ADIO_EXPLICIT_OFFSET, off, &status,
                                 error_code);
            } else {
                /* holes: coalesce adjacent sorted pieces that fall in
                 * [off, off + real_size) and write each run separately */
                block_offset = -1;
                block_len = 0;
                for (i = 0; i < srt_num; ++i) {
                    if (srt_off[i] < off + real_size &&
                        srt_off[i] >= off) {
                        if (block_offset == -1) {
                            block_offset = srt_off[i];
                            block_len = srt_len[i];
                        } else if (srt_off[i] == block_offset + block_len) {
                            block_len += srt_len[i];
                        } else {
                            ADIO_WriteContig(fd,
                                             write_buf + block_offset - off,
                                             block_len,
                                             MPI_BYTE, ADIO_EXPLICIT_OFFSET,
                                             block_offset, &status,
                                             error_code);
                            if (*error_code != MPI_SUCCESS)
                                goto over;
                            block_offset = srt_off[i];
                            block_len = srt_len[i];
                        }
                    }
                }
                if (block_offset != -1) {
                    ADIO_WriteContig(fd,
                                     write_buf + block_offset - off,
                                     block_len,
                                     MPI_BYTE, ADIO_EXPLICIT_OFFSET,
                                     block_offset, &status,
                                     error_code);
                    if (*error_code != MPI_SUCCESS)
                        goto over;
                }
            }
            if (*error_code != MPI_SUCCESS)
                goto over;
        }
        iter_st_off += max_size;
    }
over:
    /* srt_off/srt_len are only allocated once W_Exchange_data has run, and
     * write_buf only when ntimes != 0; free them only in those cases. */
    if (srt_off)
        ADIOI_Free(srt_off);
    if (srt_len)
        ADIOI_Free(srt_len);
    if (ntimes)
        ADIOI_Free(write_buf);
    ADIOI_Free(recv_curr_offlen_ptr);
    ADIOI_Free(send_curr_offlen_ptr);
    ADIOI_Free(recv_count);
    ADIOI_Free(send_size);
    ADIOI_Free(recv_size);
    ADIOI_Free(sent_to_proc);
    ADIOI_Free(recv_start_pos);
    ADIOI_Free(send_buf_idx);
    ADIOI_Free(curr_to_proc);
    ADIOI_Free(done_to_proc);
    ADIOI_Free(this_buf_idx);
    ADIOI_Free(off_list);
}
612 
613 /* Sets error_code to MPI_SUCCESS if successful, or creates an error code
614  * in the case of error.
615  */
ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd,const void * buf,char * write_buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,ADIO_Offset off,int size,int * count,int * start_pos,int * sent_to_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,int * striping_info,ADIOI_Access * others_req,int * send_buf_idx,int * curr_to_proc,int * done_to_proc,int * hole,int iter,MPI_Aint buftype_extent,int * buf_idx,ADIO_Offset ** srt_off,int ** srt_len,int * srt_num,int * error_code)616 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
617 					 char *write_buf,
618 					 ADIOI_Flatlist_node *flat_buf,
619 					 ADIO_Offset *offset_list,
620 					 ADIO_Offset *len_list, int *send_size,
621 					 int *recv_size, ADIO_Offset off,
622 					 int size, int *count,
623 					 int *start_pos,
624 					 int *sent_to_proc, int nprocs,
625 					 int myrank, int buftype_is_contig,
626 					 int contig_access_count,
627 					 int *striping_info,
628 					 ADIOI_Access *others_req,
629 					 int *send_buf_idx,
630 					 int *curr_to_proc, int *done_to_proc,
631                                          int *hole, int iter,
632                                          MPI_Aint buftype_extent,
633 					 int *buf_idx,
634                           ADIO_Offset **srt_off, int **srt_len, int *srt_num,
635                           int *error_code)
636 {
637     int i, j, nprocs_recv, nprocs_send, err;
638     char **send_buf = NULL;
639     MPI_Request *requests, *send_req;
640     MPI_Datatype *recv_types;
641     MPI_Status *statuses, status;
642     int sum_recv;
643     int data_sieving = *hole;
644     static char myname[] = "ADIOI_W_EXCHANGE_DATA";
645 
646     /* create derived datatypes for recv */
647     nprocs_recv = 0;
648     for (i = 0; i < nprocs; i++)
649 	if (recv_size[i])
650 	    nprocs_recv++;
651 
652     recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
653 					       sizeof(MPI_Datatype));
654     /* +1 to avoid a 0-size malloc */
655 
656     j = 0;
657     for (i = 0; i < nprocs; i++) {
658 	if (recv_size[i]) {
659 	    ADIOI_Type_create_hindexed_x(count[i],
660 			      &(others_req[i].lens[start_pos[i]]),
661 			      &(others_req[i].mem_ptrs[start_pos[i]]),
662 			      MPI_BYTE, recv_types + j);
663 	    /* absolute displacements; use MPI_BOTTOM in recv */
664 	    MPI_Type_commit(recv_types + j);
665 	    j++;
666 	}
667     }
668 
669     /* To avoid a read-modify-write,
670      * check if there are holes in the data to be written.
671      * For this, merge the (sorted) offset lists others_req using a heap-merge.
672      */
673 
674     *srt_num = 0;
675     for (i = 0; i < nprocs; i++)
676         *srt_num += count[i];
677     if (*srt_off)
678         *srt_off = (ADIO_Offset *) ADIOI_Realloc(*srt_off, (*srt_num + 1) * sizeof(ADIO_Offset));
679     else
680         *srt_off = (ADIO_Offset *) ADIOI_Malloc((*srt_num + 1) * sizeof(ADIO_Offset));
681     if (*srt_len)
682         *srt_len = (int *) ADIOI_Realloc(*srt_len, (*srt_num + 1) * sizeof(int));
683     else
684         *srt_len = (int *) ADIOI_Malloc((*srt_num + 1) * sizeof(int));
685     /* +1 to avoid a 0-size malloc */
686 
687     ADIOI_Heap_merge(others_req, count, *srt_off, *srt_len, start_pos,
688 		     nprocs, nprocs_recv, *srt_num);
689 
690     /* check if there are any holes */
691     *hole = 0;
692     for (i = 0; i < *srt_num - 1; i++) {
693         if ((*srt_off)[i] + (*srt_len)[i] < (*srt_off)[i + 1]) {
694             *hole = 1;
695 	    break;
696 	}
697     }
698     /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
699      * between aggregation, nominally contiguous regions, and cb_buffer_size
700      * should be handled with a read-modify-write (otherwise we will write out
701      * more data than we receive from everyone else (inclusive), so override
702      * hole detection
703      */
704     if (*hole == 0) {
705         sum_recv = 0;
706         for (i = 0; i < nprocs; i++)
707             sum_recv += recv_size[i];
708 	if (size > sum_recv)
709 	    *hole = 1;
710     }
711     /* check the hint for data sieving */
712     if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
713         ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
714                         ADIO_EXPLICIT_OFFSET, off, &status, &err);
715         // --BEGIN ERROR HANDLING--
716         if (err != MPI_SUCCESS) {
717             *error_code = MPIO_Err_create_code(err,
718                                                MPIR_ERR_RECOVERABLE,
719                                                myname, __LINE__,
720                                                MPI_ERR_IO,
721                                                "**ioRMWrdwr", 0);
722             ADIOI_Free(recv_types);
723             return;
724         }
725         // --END ERROR HANDLING--
726     }
727 
728     nprocs_send = 0;
729     for (i = 0; i < nprocs; i++)
730 	if (send_size[i])
731 	    nprocs_send++;
732 
733     if (fd->atomicity) {
734 	/* bug fix from Wei-keng Liao and Kenin Coloma */
735 	requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
736                                                 sizeof(MPI_Request));
737 	send_req = requests;
738     } else {
739 	requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
740                                                 sizeof(MPI_Request));
741 	/* +1 to avoid a 0-size malloc */
742 
743 	/* post receives */
744 	j = 0;
745 	for (i = 0; i < nprocs; i++) {
746 	    if (recv_size[i]) {
747 		MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
748 			  myrank + i + 100 * iter, fd->comm, requests + j);
749 		j++;
750 	    }
751 	}
752 	send_req = requests + nprocs_recv;
753     }
754 
755     /* post sends.
756      * if buftype_is_contig, data can be directly sent from
757      * user buf at location given by buf_idx. else use send_buf.
758      */
759     if (buftype_is_contig) {
760 	j = 0;
761 	for (i = 0; i < nprocs; i++)
762 	    if (send_size[i]) {
763                 ADIOI_Assert(buf_idx[i] != -1);
764 		MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
765 			  MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
766 			  send_req + j);
767 		j++;
768 	    }
769     } else
770         if (nprocs_send) {
771 	/* buftype is not contig */
772 	send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
773 	for (i = 0; i < nprocs; i++)
774 	    if (send_size[i])
775 		send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
776 
777 	ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
778                                       len_list, send_size, send_req,
779                                       sent_to_proc, nprocs, myrank,
780                                       contig_access_count, striping_info,
781                                       send_buf_idx, curr_to_proc, done_to_proc,
782                                       iter, buftype_extent);
783 	/* the send is done in ADIOI_Fill_send_buffer */
784     }
785 
786 	/* bug fix from Wei-keng Liao and Kenin Coloma */
787     if (fd->atomicity) {
788 	j = 0;
789 	for (i = 0; i < nprocs; i++) {
790 	    MPI_Status wkl_status;
791 	    if (recv_size[i]) {
792 		MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
793 			 myrank + i + 100 * iter, fd->comm, &wkl_status);
794 		j++;
795 	    }
796 	}
797     }
798 
799     for (i = 0; i < nprocs_recv; i++)
800 	MPI_Type_free(recv_types + i);
801     ADIOI_Free(recv_types);
802 
803 	/* bug fix from Wei-keng Liao and Kenin Coloma */
804 	/* +1 to avoid a 0-size malloc */
805     if (fd->atomicity) {
806 	statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
807 					       sizeof(MPI_Status));
808     } else {
809 	statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
810 					       sizeof(MPI_Status));
811     }
812 
813 #ifdef NEEDS_MPI_TEST
814     i = 0;
815     if (fd->atomicity) {
816 	/* bug fix from Wei-keng Liao and Kenin Coloma */
817 	while (!i)
818 	    MPI_Testall(nprocs_send, send_req, &i, statuses);
819     } else {
820 	while (!i)
821 	    MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
822     }
823 #else
824 	/* bug fix from Wei-keng Liao and Kenin Coloma */
825     if (fd->atomicity)
826 	MPI_Waitall(nprocs_send, send_req, statuses);
827     else
828 	MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
829 #endif
830     ADIOI_Free(statuses);
831     ADIOI_Free(requests);
832     if (!buftype_is_contig && nprocs_send) {
833 	for (i = 0; i < nprocs; i++)
834 	    if (send_size[i])
835 		ADIOI_Free(send_buf[i]);
836 	ADIOI_Free(send_buf);
837     }
838 }
839 
/* ADIOI_BUF_INCR: advance the user-buffer cursor by buf_incr bytes without
 * copying anything.
 *
 * The cursor walks the flattened user buftype: user_buf_idx is the byte
 * offset into the user buffer, flat_buf_idx the current component of
 * flat_buf, and flat_buf_sz the number of bytes left in that component.
 * When a component is used up the cursor moves to the next one; after the
 * last component it wraps to component 0 and increments n_buftypes, so the
 * new offset is flat_buf->indices[flat_buf_idx] shifted by
 * n_buftypes * buftype_extent.  On exit buf_incr is 0.
 *
 * Expands to a brace-enclosed block and is invoked as a statement without a
 * trailing ';'.  It reads/writes the caller's locals by name: buf_incr,
 * size_in_buf, flat_buf_sz, user_buf_idx, flat_buf_idx, flat_buf,
 * n_buftypes and buftype_extent.
 */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;  \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}
859 
860 
/* ADIOI_BUF_COPY: copy `size` bytes from the user buffer into send_buf[p],
 * then skip whatever is left of buf_incr.
 *
 * Bytes are read from buf following the flattened user buftype, using the
 * cursor variables user_buf_idx, flat_buf_idx, flat_buf_sz and n_buftypes
 * (advanced the same way as in ADIOI_BUF_INCR, wrapping by buftype_extent
 * after the last flat_buf component).  They are appended at
 * send_buf[p] + send_buf_idx[p], and send_buf_idx[p] is advanced by the
 * amount copied.  Every copied byte is also subtracted from buf_incr; the
 * remaining buf_incr bytes are then skipped with ADIOI_BUF_INCR.  On exit
 * size and buf_incr are both 0.
 *
 * Expands to a brace-enclosed block and is invoked as a statement without a
 * trailing ';'.  It reads/writes the caller's locals by name: size, buf_incr,
 * size_in_buf, buf, send_buf, send_buf_idx, p, user_buf_idx, flat_buf_idx,
 * flat_buf_sz, flat_buf, n_buftypes and buftype_extent.
 */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
        ADIOI_Assert(size_in_buf == (size_t)size_in_buf);               \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent;    \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
887 
/*
 * ADIOI_LUSTRE_Fill_send_buffer - pack a non-contiguous user buffer into
 * per-process send buffers and post the sends.
 *
 * Only called when the user buftype is not contiguous.  Walks this
 * process's file accesses (offset_list/len_list, contig_access_count
 * entries).  Each piece is mapped to a target process p by
 * ADIOI_LUSTRE_Calc_aggregator(), which also clips len to the region p is
 * responsible for.  Bytes already sent to p in earlier iterations
 * (done_to_proc[p], initialized from sent_to_proc[p]) are skipped.  The next
 * send_size[p] bytes are copied from buf, following flat_buf (one instance
 * spans buftype_extent bytes), into send_buf[p].  As soon as send_buf[p]
 * holds send_size[p] bytes, an MPI_Isend to p is posted on fd->comm with
 * tag myrank + p + 100 * iter, into the next free slot of requests.
 *
 * fd             - file handle; fd->comm is used for the sends
 * buf, flat_buf  - user buffer and its flattened datatype
 * send_buf       - send_buf[p] must hold send_size[p] bytes when
 *                  send_size[p] != 0
 * offset_list, len_list, contig_access_count - this process's accesses
 * send_size      - bytes to send to each process in this iteration
 * requests       - receives one request per send posted, in posting order
 * sent_to_proc   - in: bytes already sent to each process;
 *                  out: updated for every p with send_size[p] != 0
 * nprocs, myrank, iter - loop bound and message-tag ingredients
 * striping_info  - passed through to ADIOI_LUSTRE_Calc_aggregator()
 * send_buf_idx, curr_to_proc, done_to_proc - nprocs-entry work arrays,
 *                  (re)initialized on entry
 * buftype_extent - extent of the user buftype
 *
 * The per-process counters are int.  Every value stored into one of them
 * (or into the int locals buf_incr / curr_to_proc[p]) is first checked with
 * ADIOI_Assert, computing the sum in ADIO_Offset so the check itself cannot
 * overflow, and comparing against an int conversion so values above INT_MAX
 * are caught.
 */
static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
                                          ADIOI_Flatlist_node *flat_buf,
                                          char **send_buf,
                                          ADIO_Offset *offset_list,
                                          ADIO_Offset *len_list, int *send_size,
                                          MPI_Request *requests,
                                          int *sent_to_proc, int nprocs,
                                          int myrank,
                                          int contig_access_count,
                                          int *striping_info,
                                          int *send_buf_idx,
                                          int *curr_to_proc,
                                          int *done_to_proc, int iter,
                                          MPI_Aint buftype_extent)
{
    /* NOTE: p, size, buf_incr, size_in_buf, flat_buf_idx, flat_buf_sz,
     * user_buf_idx and n_buftypes are used by name inside the
     * ADIOI_BUF_INCR / ADIOI_BUF_COPY macros; do not rename them. */
    int i, p, flat_buf_idx, size;
    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
    ADIO_Offset off, len, rem_len, user_buf_idx;

    /* curr_to_proc[p] = amount of data sent to proc. p that has already
     *                   been accounted for so far
     * done_to_proc[p] = amount of data already sent to proc. p in
     *                   previous iterations
     * user_buf_idx    = current location in user buffer
     * send_buf_idx[p] = current location in send_buf of proc. p
     */
    for (i = 0; i < nprocs; i++) {
        send_buf_idx[i] = curr_to_proc[i] = 0;
        done_to_proc[i] = sent_to_proc[i];
    }
    jj = 0;     /* number of sends posted = next free slot in requests */

    /* flat_buf_idx = current index into flattened buftype
     * flat_buf_sz  = bytes left in the current contiguous component
     */
    user_buf_idx = flat_buf->indices[0];
    flat_buf_idx = 0;
    n_buftypes = 0;
    flat_buf_sz = flat_buf->blocklens[0];

    for (i = 0; i < contig_access_count; i++) {
        off = offset_list[i];
        rem_len = (ADIO_Offset) len_list[i];

        /* this request may span more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: ADIOI_LUSTRE_Calc_aggregator() modifies len to be no
             * longer than the single region that process p is responsible
             * for. */
            p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);

            if (send_buf_idx[p] < send_size[p]) {
                if (curr_to_proc[p] + len > done_to_proc[p]) {
                    if (done_to_proc[p] > curr_to_proc[p]) {
                        /* the leading part of this piece was sent in an
                         * earlier iteration: skip it, copy the rest */
                        size = (int) ADIOI_MIN(curr_to_proc[p] + len -
                                               done_to_proc[p],
                                               send_size[p] -
                                               send_buf_idx[p]);
                        buf_incr = done_to_proc[p] - curr_to_proc[p];
                        ADIOI_BUF_INCR
                        ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) ==
                                     (int) (curr_to_proc[p] + len - done_to_proc[p]));
                        buf_incr = (int) (curr_to_proc[p] + len -
                                          done_to_proc[p]);
                        ADIOI_Assert(((ADIO_Offset) done_to_proc[p] + size) ==
                                     (int) ((ADIO_Offset) done_to_proc[p] + size));
                        curr_to_proc[p] = done_to_proc[p] + size;
                        ADIOI_BUF_COPY
                    } else {
                        /* nothing of this piece was sent before */
                        size = (int) ADIOI_MIN(len, send_size[p] -
                                               send_buf_idx[p]);
                        buf_incr = (int) len;
                        ADIOI_Assert(((ADIO_Offset) curr_to_proc[p] + size) ==
                                     (int) ((ADIO_Offset) curr_to_proc[p] + size));
                        curr_to_proc[p] += size;
                        ADIOI_BUF_COPY
                    }
                    /* send buffer for p is complete: post the send */
                    if (send_buf_idx[p] == send_size[p]) {
                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
                                  myrank + p + 100 * iter, fd->comm,
                                  requests + jj);
                        jj++;
                    }
                } else {
                    /* the whole piece was sent in an earlier iteration */
                    ADIOI_Assert(((ADIO_Offset) curr_to_proc[p] + len) ==
                                 (int) ((ADIO_Offset) curr_to_proc[p] + len));
                    curr_to_proc[p] += (int) len;
                    buf_incr = (int) len;
                    ADIOI_BUF_INCR
                }
            } else {
                /* nothing (more) to pack for p in this iteration */
                buf_incr = (int) len;
                ADIOI_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }

    for (i = 0; i < nprocs; i++)
        if (send_size[i])
            sent_to_proc[i] = curr_to_proc[i];
}
990