/* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
/*
 * Copyright (C) 1997 University of Chicago.
 * See COPYRIGHT notice in top-level directory.
 *
 * Copyright (C) 2007 Oak Ridge National Laboratory
 *
 * Copyright (C) 2008 Sun Microsystems, Lustre group
 */

#include "ad_lustre.h"
#include "adio_extern.h"

14 /* prototypes of functions used for collective writes only. */
15 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
16 MPI_Datatype datatype, int nprocs,
17 int myrank,
18 ADIOI_Access *others_req,
19 ADIOI_Access *my_req,
20 ADIO_Offset *offset_list,
21 ADIO_Offset *len_list,
22 int contig_access_count,
23 int *striping_info,
24 int **buf_idx, int *error_code);
25 static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
26 ADIOI_Flatlist_node *flat_buf,
27 char **send_buf,
28 ADIO_Offset *offset_list,
29 ADIO_Offset *len_list, int *send_size,
30 MPI_Request *requests,
31 int *sent_to_proc, int nprocs,
32 int myrank, int contig_access_count,
33 int *striping_info,
34 int *send_buf_idx,
35 int *curr_to_proc,
36 int *done_to_proc, int iter,
37 MPI_Aint buftype_extent);
38 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
39 char *write_buf,
40 ADIOI_Flatlist_node *flat_buf,
41 ADIO_Offset *offset_list,
42 ADIO_Offset *len_list, int *send_size,
43 int *recv_size, ADIO_Offset off,
44 int size, int *count,
45 int *start_pos,
46 int *sent_to_proc, int nprocs,
47 int myrank, int buftype_is_contig,
48 int contig_access_count,
49 int *striping_info,
50 ADIOI_Access *others_req,
51 int *send_buf_idx,
52 int *curr_to_proc,
53 int *done_to_proc, int *hole,
54 int iter, MPI_Aint buftype_extent,
55 int *buf_idx,
56 ADIO_Offset **srt_off, int **srt_len, int *srt_num,
57 int *error_code);
58 void ADIOI_Heap_merge(ADIOI_Access *others_req, int *count,
59 ADIO_Offset *srt_off, int *srt_len, int *start_pos,
60 int nprocs, int nprocs_recv, int total_elements);
61
/* Top-level collective write entry point for Lustre.
 *
 * Decides between independent and collective (two-phase) I/O based on the
 * cb_write hint and the detected access pattern, then drives the exchange
 * and write phases.  On return, *error_code is MPI_SUCCESS or an MPI error
 * class; all ranks agree on success/failure via a collective reduction.
 */
void ADIOI_LUSTRE_WriteStridedColl(ADIO_File fd, const void *buf, int count,
                                   MPI_Datatype datatype,
                                   int file_ptr_type, ADIO_Offset offset,
                                   ADIO_Status *status, int *error_code)
{
    /* Uses a generalized version of the extended two-phase method described
     * in "An Extended Two-Phase Method for Accessing Sections of
     * Out-of-Core Arrays", Rajeev Thakur and Alok Choudhary,
     * Scientific Programming, (5)4:301--317, Winter 1996.
     * http://www.mcs.anl.gov/home/thakur/ext2ph.ps
     */

    ADIOI_Access *my_req;
    /* array of nprocs access structures, one for each other process has
       this process's request */

    ADIOI_Access *others_req;
    /* array of nprocs access structures, one for each other process
       whose request is written by this process. */

    int i, filetype_is_contig, nprocs, myrank, do_collect = 0;
    int contig_access_count = 0, buftype_is_contig, interleave_count = 0;
    int *count_my_req_per_proc, count_my_req_procs, count_others_req_procs;
    ADIO_Offset orig_fp, start_offset, end_offset, off;
    ADIO_Offset *offset_list = NULL, *st_offsets = NULL, *end_offsets = NULL;
    ADIO_Offset *len_list = NULL;
    int **buf_idx = NULL, *striping_info = NULL;
    int old_error, tmp_error;

    MPI_Comm_size(fd->comm, &nprocs);
    MPI_Comm_rank(fd->comm, &myrank);

    orig_fp = fd->fp_ind;

    /* IO pattern identification if cb_write isn't disabled */
    if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
        /* For this process's request, calculate the list of offsets and
           lengths in the file and determine the start and end offsets. */

        /* Note: end_offset points to the last byte-offset that will be accessed.
         * e.g., if start_offset=0 and 100 bytes to be read, end_offset=99
         */

        ADIOI_Calc_my_off_len(fd, count, datatype, file_ptr_type, offset,
                              &offset_list, &len_list, &start_offset,
                              &end_offset, &contig_access_count);

        /* each process communicates its start and end offsets to other
         * processes. The result is an array each of start and end offsets
         * stored in order of process rank.
         */
        st_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
        end_offsets = (ADIO_Offset *) ADIOI_Malloc(nprocs * sizeof(ADIO_Offset));
        MPI_Allgather(&start_offset, 1, ADIO_OFFSET, st_offsets, 1,
                      ADIO_OFFSET, fd->comm);
        MPI_Allgather(&end_offset, 1, ADIO_OFFSET, end_offsets, 1,
                      ADIO_OFFSET, fd->comm);
        /* are the accesses of different processes interleaved? */
        for (i = 1; i < nprocs; i++)
            if ((st_offsets[i] < end_offsets[i-1]) &&
                (st_offsets[i] <= end_offsets[i]))
                interleave_count++;
        /* This is a rudimentary check for interleaving, but should suffice
           for the moment. */

        /* Two typical access patterns can benefit from collective write.
         *   1) the processes are interleaved, and
         *   2) the req size is small.
         */
        if (interleave_count > 0) {
            do_collect = 1;
        } else {
            do_collect = ADIOI_LUSTRE_Docollect(fd, contig_access_count,
                                                len_list, nprocs);
        }
    }
    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);

    /* Decide if collective I/O should be done */
    if ((!do_collect && fd->hints->cb_write == ADIOI_HINT_AUTO) ||
        fd->hints->cb_write == ADIOI_HINT_DISABLE) {

        /* use independent accesses */
        if (fd->hints->cb_write != ADIOI_HINT_DISABLE) {
            /* these were only allocated in the pattern-identification phase */
            ADIOI_Free(offset_list);
            ADIOI_Free(len_list);
            ADIOI_Free(st_offsets);
            ADIOI_Free(end_offsets);
        }

        fd->fp_ind = orig_fp;
        ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);
        if (buftype_is_contig && filetype_is_contig) {
            if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
                off = fd->disp + (ADIO_Offset)(fd->etype_size) * offset;
                ADIO_WriteContig(fd, buf, count, datatype,
                                 ADIO_EXPLICIT_OFFSET,
                                 off, status, error_code);
            } else
                ADIO_WriteContig(fd, buf, count, datatype, ADIO_INDIVIDUAL,
                                 0, status, error_code);
        } else {
            ADIO_WriteStrided(fd, buf, count, datatype, file_ptr_type,
                              offset, status, error_code);
        }
        return;
    }

    /* Get Lustre hints information */
    ADIOI_LUSTRE_Get_striping_info(fd, &striping_info, 1);

    /* calculate what portions of the access requests of this process are
     * located in which process
     */
    ADIOI_LUSTRE_Calc_my_req(fd, offset_list, len_list, contig_access_count,
                             striping_info, nprocs, &count_my_req_procs,
                             &count_my_req_per_proc, &my_req,
                             &buf_idx);

    /* based on everyone's my_req, calculate what requests of other processes
     * will be accessed by this process.
     * count_others_req_procs = number of processes whose requests (including
     * this process itself) will be accessed by this process
     * count_others_req_per_proc[i] indicates how many separate contiguous
     * requests of proc. i will be accessed by this process.
     */

    ADIOI_Calc_others_req(fd, count_my_req_procs, count_my_req_per_proc,
                          my_req, nprocs, myrank, &count_others_req_procs,
                          &others_req);
    ADIOI_Free(count_my_req_per_proc);

    /* exchange data and write in sizes of no more than stripe_size. */
    ADIOI_LUSTRE_Exch_and_write(fd, buf, datatype, nprocs, myrank,
                                others_req, my_req, offset_list, len_list,
                                contig_access_count, striping_info,
                                buf_idx, error_code);

    /* If this collective write is followed by an independent write,
     * it's possible to have those subsequent writes on other processes
     * race ahead and sneak in before the read-modify-write completes.
     * We carry out a collective communication at the end here so no one
     * can start independent i/o before collective I/O completes.
     *
     * need to do some gymnastics with the error codes so that if something
     * went wrong, all processes report error, but if a process has a more
     * specific error code, we can still have that process report the
     * additional information */

    old_error = *error_code;
    if (*error_code != MPI_SUCCESS)
        *error_code = MPI_ERR_IO;

    /* optimization: if only one process performing i/o, we can perform
     * a less-expensive Bcast */
#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event(ADIOI_MPE_postwrite_a, 0, NULL);
#endif
    if (fd->hints->cb_nodes == 1)
        MPI_Bcast(error_code, 1, MPI_INT,
                  fd->hints->ranklist[0], fd->comm);
    else {
        tmp_error = *error_code;
        MPI_Allreduce(&tmp_error, error_code, 1, MPI_INT,
                      MPI_MAX, fd->comm);
    }
#ifdef ADIOI_MPE_LOGGING
    MPE_Log_event(ADIOI_MPE_postwrite_b, 0, NULL);
#endif

    if ((old_error != MPI_SUCCESS) && (old_error != MPI_ERR_IO))
        *error_code = old_error;


    if (!buftype_is_contig)
        ADIOI_Delete_flattened(datatype);

    /* free all memory allocated for collective I/O */
    /* free others_req */
    for (i = 0; i < nprocs; i++) {
        if (others_req[i].count) {
            ADIOI_Free(others_req[i].offsets);
            ADIOI_Free(others_req[i].lens);
            ADIOI_Free(others_req[i].mem_ptrs);
        }
    }
    ADIOI_Free(others_req);
    /* free my_req here */
    for (i = 0; i < nprocs; i++) {
        if (my_req[i].count) {
            ADIOI_Free(my_req[i].offsets);
            ADIOI_Free(my_req[i].lens);
        }
    }
    ADIOI_Free(my_req);
    for (i = 0; i < nprocs; i++) {
        ADIOI_Free(buf_idx[i]);
    }
    ADIOI_Free(buf_idx);
    ADIOI_Free(offset_list);
    ADIOI_Free(len_list);
    ADIOI_Free(st_offsets);
    ADIOI_Free(end_offsets);
    ADIOI_Free(striping_info);

#ifdef HAVE_STATUS_SET_BYTES
    if (status) {
        MPI_Count bufsize, size;
        /* Don't set status if it isn't needed */
        MPI_Type_size_x(datatype, &size);
        bufsize = size * count;
        MPIR_Status_set_bytes(status, datatype, bufsize);
    }
    /* This is a temporary way of filling in status. The right way is to
     * keep track of how much data was actually written during collective I/O.
     */
#endif

    fd->fp_sys_posn = -1;       /* set it to null. */
}
282
283 /* If successful, error_code is set to MPI_SUCCESS. Otherwise an error
284 * code is created and returned in error_code.
285 */
ADIOI_LUSTRE_Exch_and_write(ADIO_File fd,const void * buf,MPI_Datatype datatype,int nprocs,int myrank,ADIOI_Access * others_req,ADIOI_Access * my_req,ADIO_Offset * offset_list,ADIO_Offset * len_list,int contig_access_count,int * striping_info,int ** buf_idx,int * error_code)286 static void ADIOI_LUSTRE_Exch_and_write(ADIO_File fd, const void *buf,
287 MPI_Datatype datatype, int nprocs,
288 int myrank, ADIOI_Access *others_req,
289 ADIOI_Access *my_req,
290 ADIO_Offset *offset_list,
291 ADIO_Offset *len_list,
292 int contig_access_count,
293 int *striping_info, int **buf_idx,
294 int *error_code)
295 {
296 /* Send data to appropriate processes and write in sizes of no more
297 * than lustre stripe_size.
298 * The idea is to reduce the amount of extra memory required for
299 * collective I/O. If all data were written all at once, which is much
300 * easier, it would require temp space more than the size of user_buf,
301 * which is often unacceptable. For example, to write a distributed
302 * array to a file, where each local array is 8Mbytes, requiring
303 * at least another 8Mbytes of temp space is unacceptable.
304 */
305
306 int hole, i, j, m, flag, ntimes = 1 , max_ntimes, buftype_is_contig;
307 ADIO_Offset st_loc = -1, end_loc = -1, min_st_loc, max_end_loc;
308 ADIO_Offset off, req_off, send_off, iter_st_off, *off_list;
309 ADIO_Offset max_size, step_size = 0;
310 int real_size, req_len, send_len;
311 int *recv_curr_offlen_ptr, *recv_count, *recv_size;
312 int *send_curr_offlen_ptr, *send_size;
313 int *sent_to_proc, *recv_start_pos;
314 int *send_buf_idx, *curr_to_proc, *done_to_proc;
315 int *this_buf_idx;
316 char *write_buf = NULL;
317 MPI_Status status;
318 ADIOI_Flatlist_node *flat_buf = NULL;
319 MPI_Aint buftype_extent;
320 int stripe_size = striping_info[0], avail_cb_nodes = striping_info[2];
321 int data_sieving = 0;
322 ADIO_Offset *srt_off = NULL;
323 int *srt_len = NULL;
324 int srt_num = 0;
325 ADIO_Offset block_offset;
326 int block_len;
327
328 *error_code = MPI_SUCCESS; /* changed below if error */
329 /* only I/O errors are currently reported */
330
331 /* calculate the number of writes of stripe size to be done.
332 * That gives the no. of communication phases as well.
333 * Note:
334 * Because we redistribute data in stripe-contiguous pattern for Lustre,
335 * each process has the same no. of communication phases.
336 */
337
338 for (i = 0; i < nprocs; i++) {
339 if (others_req[i].count) {
340 st_loc = others_req[i].offsets[0];
341 end_loc = others_req[i].offsets[0];
342 break;
343 }
344 }
345 for (i = 0; i < nprocs; i++) {
346 for (j = 0; j < others_req[i].count; j++) {
347 st_loc = ADIOI_MIN(st_loc, others_req[i].offsets[j]);
348 end_loc = ADIOI_MAX(end_loc, (others_req[i].offsets[j] +
349 others_req[i].lens[j] - 1));
350 }
351 }
352 /* this process does no writing. */
353 if ((st_loc == -1) && (end_loc == -1))
354 ntimes = 0;
355 MPI_Allreduce(&end_loc, &max_end_loc, 1, MPI_LONG_LONG_INT, MPI_MAX, fd->comm);
356 /* avoid min_st_loc be -1 */
357 if (st_loc == -1)
358 st_loc = max_end_loc;
359 MPI_Allreduce(&st_loc, &min_st_loc, 1, MPI_LONG_LONG_INT, MPI_MIN, fd->comm);
360 /* align downward */
361 min_st_loc -= min_st_loc % (ADIO_Offset)stripe_size;
362
363 /* Each time, only avail_cb_nodes number of IO clients perform IO,
364 * so, step_size=avail_cb_nodes*stripe_size IO will be performed at most,
365 * and ntimes=whole_file_portion/step_size
366 */
367 step_size = (ADIO_Offset) avail_cb_nodes * stripe_size;
368 max_ntimes = (max_end_loc - min_st_loc + 1) / step_size
369 + (((max_end_loc - min_st_loc + 1) % step_size) ? 1 : 0);
370 /* max_ntimes = (int)((max_end_loc - min_st_loc) / step_size + 1); */
371 if (ntimes)
372 write_buf = (char *) ADIOI_Malloc(stripe_size);
373
374 /* calculate the start offset for each iteration */
375 off_list = (ADIO_Offset *) ADIOI_Malloc(max_ntimes * sizeof(ADIO_Offset));
376 for (m = 0; m < max_ntimes; m ++)
377 off_list[m] = max_end_loc;
378 for (i = 0; i < nprocs; i++) {
379 for (j = 0; j < others_req[i].count; j ++) {
380 req_off = others_req[i].offsets[j];
381 m = (int)((req_off - min_st_loc) / step_size);
382 off_list[m] = ADIOI_MIN(off_list[m], req_off);
383 }
384 }
385
386 recv_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
387 send_curr_offlen_ptr = (int *) ADIOI_Calloc(nprocs, sizeof(int));
388 /* their use is explained below. calloc initializes to 0. */
389
390 recv_count = (int *) ADIOI_Malloc(nprocs * sizeof(int));
391 /* to store count of how many off-len pairs per proc are satisfied
392 in an iteration. */
393
394 send_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
395 /* total size of data to be sent to each proc. in an iteration.
396 Of size nprocs so that I can use MPI_Alltoall later. */
397
398 recv_size = (int *) ADIOI_Malloc(nprocs * sizeof(int));
399 /* total size of data to be recd. from each proc. in an iteration. */
400
401 sent_to_proc = (int *) ADIOI_Calloc(nprocs, sizeof(int));
402 /* amount of data sent to each proc so far. Used in
403 ADIOI_Fill_send_buffer. initialized to 0 here. */
404
405 send_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
406 curr_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
407 done_to_proc = (int *) ADIOI_Malloc(nprocs * sizeof(int));
408 /* Above three are used in ADIOI_Fill_send_buffer */
409
410 this_buf_idx = (int *) ADIOI_Malloc(nprocs * sizeof(int));
411
412 recv_start_pos = (int *) ADIOI_Malloc(nprocs * sizeof(int));
413 /* used to store the starting value of recv_curr_offlen_ptr[i] in
414 this iteration */
415
416 ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
417 if (!buftype_is_contig) {
418 ADIOI_Flatten_datatype(datatype);
419 flat_buf = ADIOI_Flatlist;
420 while (flat_buf->type != datatype)
421 flat_buf = flat_buf->next;
422 }
423 MPI_Type_extent(datatype, &buftype_extent);
424 /* I need to check if there are any outstanding nonblocking writes to
425 * the file, which could potentially interfere with the writes taking
426 * place in this collective write call. Since this is not likely to be
427 * common, let me do the simplest thing possible here: Each process
428 * completes all pending nonblocking operations before completing.
429 */
430 /*ADIOI_Complete_async(error_code);
431 if (*error_code != MPI_SUCCESS) return;
432 MPI_Barrier(fd->comm);
433 */
434
435 iter_st_off = min_st_loc;
436
437 /* Although we have recognized the data according to OST index,
438 * a read-modify-write will be done if there is a hole between the data.
439 * For example: if blocksize=60, xfersize=30 and stripe_size=100,
440 * then rank0 will collect data [0, 30] and [60, 90] then write. There
441 * is a hole in [30, 60], which will cause a read-modify-write in [0, 90].
442 *
443 * To reduce its impact on the performance, we can disable data sieving
444 * by hint "ds_in_coll".
445 */
446 /* check the hint for data sieving */
447 data_sieving = fd->hints->fs_hints.lustre.ds_in_coll;
448
449 for (m = 0; m < max_ntimes; m++) {
450 /* go through all others_req and my_req to check which will be received
451 * and sent in this iteration.
452 */
453
454 /* Note that MPI guarantees that displacements in filetypes are in
455 monotonically nondecreasing order and that, for writes, the
456 filetypes cannot specify overlapping regions in the file. This
457 simplifies implementation a bit compared to reads. */
458
459 /*
460 off = start offset in the file for the data to be written in
461 this iteration
462 iter_st_off = start offset of this iteration
463 real_size = size of data written (bytes) corresponding to off
464 max_size = possible maximum size of data written in this iteration
465 req_off = offset in the file for a particular contiguous request minus
466 what was satisfied in previous iteration
467 send_off = offset the request needed by other processes in this iteration
468 req_len = size corresponding to req_off
469 send_len = size corresponding to send_off
470 */
471
472 /* first calculate what should be communicated */
473 for (i = 0; i < nprocs; i++)
474 recv_count[i] = recv_size[i] = send_size[i] = 0;
475
476 off = off_list[m];
477 max_size = ADIOI_MIN(step_size, max_end_loc - iter_st_off + 1);
478 real_size = (int) ADIOI_MIN((off / stripe_size + 1) * stripe_size -
479 off,
480 end_loc - off + 1);
481
482 for (i = 0; i < nprocs; i++) {
483 if (my_req[i].count) {
484 this_buf_idx[i] = buf_idx[i][send_curr_offlen_ptr[i]];
485 for (j = send_curr_offlen_ptr[i]; j < my_req[i].count; j++) {
486 send_off = my_req[i].offsets[j];
487 send_len = my_req[i].lens[j];
488 if (send_off < iter_st_off + max_size) {
489 send_size[i] += send_len;
490 } else {
491 break;
492 }
493 }
494 send_curr_offlen_ptr[i] = j;
495 }
496 if (others_req[i].count) {
497 recv_start_pos[i] = recv_curr_offlen_ptr[i];
498 for (j = recv_curr_offlen_ptr[i]; j < others_req[i].count; j++) {
499 req_off = others_req[i].offsets[j];
500 req_len = others_req[i].lens[j];
501 if (req_off < iter_st_off + max_size) {
502 recv_count[i]++;
503 ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)write_buf)+req_off-off) == (ADIO_Offset)(MPIR_Upint)(write_buf+req_off-off));
504 MPI_Address(write_buf + req_off - off,
505 &(others_req[i].mem_ptrs[j]));
506 recv_size[i] += req_len;
507 } else {
508 break;
509 }
510 }
511 recv_curr_offlen_ptr[i] = j;
512 }
513 }
514 /* use variable "hole" to pass data_sieving flag into W_Exchange_data */
515 hole = data_sieving;
516 ADIOI_LUSTRE_W_Exchange_data(fd, buf, write_buf, flat_buf, offset_list,
517 len_list, send_size, recv_size, off, real_size,
518 recv_count, recv_start_pos,
519 sent_to_proc, nprocs, myrank,
520 buftype_is_contig, contig_access_count,
521 striping_info, others_req, send_buf_idx,
522 curr_to_proc, done_to_proc, &hole, m,
523 buftype_extent, this_buf_idx,
524 &srt_off, &srt_len, &srt_num, error_code);
525
526 if (*error_code != MPI_SUCCESS)
527 goto over;
528
529 flag = 0;
530 for (i = 0; i < nprocs; i++)
531 if (recv_count[i]) {
532 flag = 1;
533 break;
534 }
535 if (flag) {
536 /* check whether to do data sieving */
537 if(data_sieving == ADIOI_HINT_ENABLE) {
538 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
539 ADIO_EXPLICIT_OFFSET, off, &status,
540 error_code);
541 } else {
542 /* if there is no hole, write data in one time;
543 * otherwise, write data in several times */
544 if (!hole) {
545 ADIO_WriteContig(fd, write_buf, real_size, MPI_BYTE,
546 ADIO_EXPLICIT_OFFSET, off, &status,
547 error_code);
548 } else {
549 block_offset = -1;
550 block_len = 0;
551 for (i = 0; i < srt_num; ++i) {
552 if (srt_off[i] < off + real_size &&
553 srt_off[i] >= off) {
554 if (block_offset == -1) {
555 block_offset = srt_off[i];
556 block_len = srt_len[i];
557 } else {
558 if (srt_off[i] == block_offset + block_len) {
559 block_len += srt_len[i];
560 } else {
561 ADIO_WriteContig(fd,
562 write_buf + block_offset - off,
563 block_len,
564 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
565 block_offset, &status,
566 error_code);
567 if (*error_code != MPI_SUCCESS)
568 goto over;
569 block_offset = srt_off[i];
570 block_len = srt_len[i];
571 }
572 }
573 }
574 }
575 if (block_offset != -1) {
576 ADIO_WriteContig(fd,
577 write_buf + block_offset - off,
578 block_len,
579 MPI_BYTE, ADIO_EXPLICIT_OFFSET,
580 block_offset, &status,
581 error_code);
582 if (*error_code != MPI_SUCCESS)
583 goto over;
584 }
585 }
586 }
587 if (*error_code != MPI_SUCCESS)
588 goto over;
589 }
590 iter_st_off += max_size;
591 }
592 over:
593 if (srt_off)
594 ADIOI_Free(srt_off);
595 if (srt_len)
596 ADIOI_Free(srt_len);
597 if (ntimes)
598 ADIOI_Free(write_buf);
599 ADIOI_Free(recv_curr_offlen_ptr);
600 ADIOI_Free(send_curr_offlen_ptr);
601 ADIOI_Free(recv_count);
602 ADIOI_Free(send_size);
603 ADIOI_Free(recv_size);
604 ADIOI_Free(sent_to_proc);
605 ADIOI_Free(recv_start_pos);
606 ADIOI_Free(send_buf_idx);
607 ADIOI_Free(curr_to_proc);
608 ADIOI_Free(done_to_proc);
609 ADIOI_Free(this_buf_idx);
610 ADIOI_Free(off_list);
611 }
612
613 /* Sets error_code to MPI_SUCCESS if successful, or creates an error code
614 * in the case of error.
615 */
ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd,const void * buf,char * write_buf,ADIOI_Flatlist_node * flat_buf,ADIO_Offset * offset_list,ADIO_Offset * len_list,int * send_size,int * recv_size,ADIO_Offset off,int size,int * count,int * start_pos,int * sent_to_proc,int nprocs,int myrank,int buftype_is_contig,int contig_access_count,int * striping_info,ADIOI_Access * others_req,int * send_buf_idx,int * curr_to_proc,int * done_to_proc,int * hole,int iter,MPI_Aint buftype_extent,int * buf_idx,ADIO_Offset ** srt_off,int ** srt_len,int * srt_num,int * error_code)616 static void ADIOI_LUSTRE_W_Exchange_data(ADIO_File fd, const void *buf,
617 char *write_buf,
618 ADIOI_Flatlist_node *flat_buf,
619 ADIO_Offset *offset_list,
620 ADIO_Offset *len_list, int *send_size,
621 int *recv_size, ADIO_Offset off,
622 int size, int *count,
623 int *start_pos,
624 int *sent_to_proc, int nprocs,
625 int myrank, int buftype_is_contig,
626 int contig_access_count,
627 int *striping_info,
628 ADIOI_Access *others_req,
629 int *send_buf_idx,
630 int *curr_to_proc, int *done_to_proc,
631 int *hole, int iter,
632 MPI_Aint buftype_extent,
633 int *buf_idx,
634 ADIO_Offset **srt_off, int **srt_len, int *srt_num,
635 int *error_code)
636 {
637 int i, j, nprocs_recv, nprocs_send, err;
638 char **send_buf = NULL;
639 MPI_Request *requests, *send_req;
640 MPI_Datatype *recv_types;
641 MPI_Status *statuses, status;
642 int sum_recv;
643 int data_sieving = *hole;
644 static char myname[] = "ADIOI_W_EXCHANGE_DATA";
645
646 /* create derived datatypes for recv */
647 nprocs_recv = 0;
648 for (i = 0; i < nprocs; i++)
649 if (recv_size[i])
650 nprocs_recv++;
651
652 recv_types = (MPI_Datatype *) ADIOI_Malloc((nprocs_recv + 1) *
653 sizeof(MPI_Datatype));
654 /* +1 to avoid a 0-size malloc */
655
656 j = 0;
657 for (i = 0; i < nprocs; i++) {
658 if (recv_size[i]) {
659 ADIOI_Type_create_hindexed_x(count[i],
660 &(others_req[i].lens[start_pos[i]]),
661 &(others_req[i].mem_ptrs[start_pos[i]]),
662 MPI_BYTE, recv_types + j);
663 /* absolute displacements; use MPI_BOTTOM in recv */
664 MPI_Type_commit(recv_types + j);
665 j++;
666 }
667 }
668
669 /* To avoid a read-modify-write,
670 * check if there are holes in the data to be written.
671 * For this, merge the (sorted) offset lists others_req using a heap-merge.
672 */
673
674 *srt_num = 0;
675 for (i = 0; i < nprocs; i++)
676 *srt_num += count[i];
677 if (*srt_off)
678 *srt_off = (ADIO_Offset *) ADIOI_Realloc(*srt_off, (*srt_num + 1) * sizeof(ADIO_Offset));
679 else
680 *srt_off = (ADIO_Offset *) ADIOI_Malloc((*srt_num + 1) * sizeof(ADIO_Offset));
681 if (*srt_len)
682 *srt_len = (int *) ADIOI_Realloc(*srt_len, (*srt_num + 1) * sizeof(int));
683 else
684 *srt_len = (int *) ADIOI_Malloc((*srt_num + 1) * sizeof(int));
685 /* +1 to avoid a 0-size malloc */
686
687 ADIOI_Heap_merge(others_req, count, *srt_off, *srt_len, start_pos,
688 nprocs, nprocs_recv, *srt_num);
689
690 /* check if there are any holes */
691 *hole = 0;
692 for (i = 0; i < *srt_num - 1; i++) {
693 if ((*srt_off)[i] + (*srt_len)[i] < (*srt_off)[i + 1]) {
694 *hole = 1;
695 break;
696 }
697 }
698 /* In some cases (see John Bent ROMIO REQ # 835), an odd interaction
699 * between aggregation, nominally contiguous regions, and cb_buffer_size
700 * should be handled with a read-modify-write (otherwise we will write out
701 * more data than we receive from everyone else (inclusive), so override
702 * hole detection
703 */
704 if (*hole == 0) {
705 sum_recv = 0;
706 for (i = 0; i < nprocs; i++)
707 sum_recv += recv_size[i];
708 if (size > sum_recv)
709 *hole = 1;
710 }
711 /* check the hint for data sieving */
712 if (data_sieving == ADIOI_HINT_ENABLE && nprocs_recv && *hole) {
713 ADIO_ReadContig(fd, write_buf, size, MPI_BYTE,
714 ADIO_EXPLICIT_OFFSET, off, &status, &err);
715 // --BEGIN ERROR HANDLING--
716 if (err != MPI_SUCCESS) {
717 *error_code = MPIO_Err_create_code(err,
718 MPIR_ERR_RECOVERABLE,
719 myname, __LINE__,
720 MPI_ERR_IO,
721 "**ioRMWrdwr", 0);
722 ADIOI_Free(recv_types);
723 return;
724 }
725 // --END ERROR HANDLING--
726 }
727
728 nprocs_send = 0;
729 for (i = 0; i < nprocs; i++)
730 if (send_size[i])
731 nprocs_send++;
732
733 if (fd->atomicity) {
734 /* bug fix from Wei-keng Liao and Kenin Coloma */
735 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + 1) *
736 sizeof(MPI_Request));
737 send_req = requests;
738 } else {
739 requests = (MPI_Request *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1)*
740 sizeof(MPI_Request));
741 /* +1 to avoid a 0-size malloc */
742
743 /* post receives */
744 j = 0;
745 for (i = 0; i < nprocs; i++) {
746 if (recv_size[i]) {
747 MPI_Irecv(MPI_BOTTOM, 1, recv_types[j], i,
748 myrank + i + 100 * iter, fd->comm, requests + j);
749 j++;
750 }
751 }
752 send_req = requests + nprocs_recv;
753 }
754
755 /* post sends.
756 * if buftype_is_contig, data can be directly sent from
757 * user buf at location given by buf_idx. else use send_buf.
758 */
759 if (buftype_is_contig) {
760 j = 0;
761 for (i = 0; i < nprocs; i++)
762 if (send_size[i]) {
763 ADIOI_Assert(buf_idx[i] != -1);
764 MPI_Isend(((char *) buf) + buf_idx[i], send_size[i],
765 MPI_BYTE, i, myrank + i + 100 * iter, fd->comm,
766 send_req + j);
767 j++;
768 }
769 } else
770 if (nprocs_send) {
771 /* buftype is not contig */
772 send_buf = (char **) ADIOI_Malloc(nprocs * sizeof(char *));
773 for (i = 0; i < nprocs; i++)
774 if (send_size[i])
775 send_buf[i] = (char *) ADIOI_Malloc(send_size[i]);
776
777 ADIOI_LUSTRE_Fill_send_buffer(fd, buf, flat_buf, send_buf, offset_list,
778 len_list, send_size, send_req,
779 sent_to_proc, nprocs, myrank,
780 contig_access_count, striping_info,
781 send_buf_idx, curr_to_proc, done_to_proc,
782 iter, buftype_extent);
783 /* the send is done in ADIOI_Fill_send_buffer */
784 }
785
786 /* bug fix from Wei-keng Liao and Kenin Coloma */
787 if (fd->atomicity) {
788 j = 0;
789 for (i = 0; i < nprocs; i++) {
790 MPI_Status wkl_status;
791 if (recv_size[i]) {
792 MPI_Recv(MPI_BOTTOM, 1, recv_types[j], i,
793 myrank + i + 100 * iter, fd->comm, &wkl_status);
794 j++;
795 }
796 }
797 }
798
799 for (i = 0; i < nprocs_recv; i++)
800 MPI_Type_free(recv_types + i);
801 ADIOI_Free(recv_types);
802
803 /* bug fix from Wei-keng Liao and Kenin Coloma */
804 /* +1 to avoid a 0-size malloc */
805 if (fd->atomicity) {
806 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + 1) *
807 sizeof(MPI_Status));
808 } else {
809 statuses = (MPI_Status *) ADIOI_Malloc((nprocs_send + nprocs_recv + 1) *
810 sizeof(MPI_Status));
811 }
812
813 #ifdef NEEDS_MPI_TEST
814 i = 0;
815 if (fd->atomicity) {
816 /* bug fix from Wei-keng Liao and Kenin Coloma */
817 while (!i)
818 MPI_Testall(nprocs_send, send_req, &i, statuses);
819 } else {
820 while (!i)
821 MPI_Testall(nprocs_send + nprocs_recv, requests, &i, statuses);
822 }
823 #else
824 /* bug fix from Wei-keng Liao and Kenin Coloma */
825 if (fd->atomicity)
826 MPI_Waitall(nprocs_send, send_req, statuses);
827 else
828 MPI_Waitall(nprocs_send + nprocs_recv, requests, statuses);
829 #endif
830 ADIOI_Free(statuses);
831 ADIOI_Free(requests);
832 if (!buftype_is_contig && nprocs_send) {
833 for (i = 0; i < nprocs; i++)
834 if (send_size[i])
835 ADIOI_Free(send_buf[i]);
836 ADIOI_Free(send_buf);
837 }
838 }
839
/* Advance the flattened-buffer cursor (user_buf_idx, flat_buf_idx,
 * flat_buf_sz, n_buftypes) by buf_incr bytes without copying any data.
 * Relies on locals of the enclosing function (see Fill_send_buffer). */
#define ADIOI_BUF_INCR \
{ \
    while (buf_incr) { \
        size_in_buf = ADIOI_MIN(buf_incr, flat_buf_sz); \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        buf_incr -= size_in_buf; \
    } \
}
859
860
/* Copy `size` bytes from the (non-contiguous) user buffer into
 * send_buf[p], advancing the same flattened-buffer cursor as
 * ADIOI_BUF_INCR, then consume any remaining buf_incr. */
#define ADIOI_BUF_COPY \
{ \
    while (size) { \
        size_in_buf = ADIOI_MIN(size, flat_buf_sz); \
        ADIOI_Assert((((ADIO_Offset)(MPIR_Upint)buf) + user_buf_idx) == (ADIO_Offset)(MPIR_Upint)((MPIR_Upint)buf + user_buf_idx)); \
        ADIOI_Assert(size_in_buf == (size_t)size_in_buf); \
        memcpy(&(send_buf[p][send_buf_idx[p]]), \
               ((char *) buf) + user_buf_idx, size_in_buf); \
        send_buf_idx[p] += size_in_buf; \
        user_buf_idx += size_in_buf; \
        flat_buf_sz -= size_in_buf; \
        if (!flat_buf_sz) { \
            if (flat_buf_idx < (flat_buf->count - 1)) flat_buf_idx++; \
            else { \
                flat_buf_idx = 0; \
                n_buftypes++; \
            } \
            user_buf_idx = flat_buf->indices[flat_buf_idx] + \
                (ADIO_Offset)n_buftypes*(ADIO_Offset)buftype_extent; \
            flat_buf_sz = flat_buf->blocklens[flat_buf_idx]; \
        } \
        size -= size_in_buf; \
        buf_incr -= size_in_buf; \
    } \
    ADIOI_BUF_INCR \
}
887
/* Pack data from a non-contiguous user buffer into per-aggregator send
 * buffers and issue MPI_Isend for each buffer as soon as it is full.
 * Only called when buftype is not contiguous; sends are posted here and
 * completed by the caller (W_Exchange_data) via the shared requests array. */
static void ADIOI_LUSTRE_Fill_send_buffer(ADIO_File fd, const void *buf,
                                          ADIOI_Flatlist_node *flat_buf,
                                          char **send_buf,
                                          ADIO_Offset *offset_list,
                                          ADIO_Offset *len_list, int *send_size,
                                          MPI_Request *requests,
                                          int *sent_to_proc, int nprocs,
                                          int myrank,
                                          int contig_access_count,
                                          int *striping_info,
                                          int *send_buf_idx,
                                          int *curr_to_proc,
                                          int *done_to_proc, int iter,
                                          MPI_Aint buftype_extent)
{
    /* this function is only called if buftype is not contig */
    int i, p, flat_buf_idx, size;
    int flat_buf_sz, buf_incr, size_in_buf, jj, n_buftypes;
    ADIO_Offset off, len, rem_len, user_buf_idx;

    /* curr_to_proc[p] = amount of data sent to proc. p that has already
     *                   been accounted for so far
     * done_to_proc[p] = amount of data already sent to proc. p in
     *                   previous iterations
     * user_buf_idx = current location in user buffer
     * send_buf_idx[p] = current location in send_buf of proc. p
     */

    for (i = 0; i < nprocs; i++) {
        send_buf_idx[i] = curr_to_proc[i] = 0;
        done_to_proc[i] = sent_to_proc[i];
    }
    jj = 0;

    user_buf_idx = flat_buf->indices[0];
    flat_buf_idx = 0;
    n_buftypes = 0;
    flat_buf_sz = flat_buf->blocklens[0];

    /* flat_buf_idx = current index into flattened buftype
     * flat_buf_sz = size of current contiguous component in flattened buf
     */
    for (i = 0; i < contig_access_count; i++) {
        off = offset_list[i];
        rem_len = (ADIO_Offset) len_list[i];

        /* this request may span to more than one process */
        while (rem_len != 0) {
            len = rem_len;
            /* NOTE: len value is modified by ADIOI_Calc_aggregator() to be no
             * longer than the single region that processor "p" is responsible
             * for.
             */
            p = ADIOI_LUSTRE_Calc_aggregator(fd, off, &len, striping_info);

            if (send_buf_idx[p] < send_size[p]) {
                if (curr_to_proc[p] + len > done_to_proc[p]) {
                    if (done_to_proc[p] > curr_to_proc[p]) {
                        /* part of this region was already sent in a previous
                         * iteration; skip the overlap, copy the remainder */
                        size = (int) ADIOI_MIN(curr_to_proc[p] + len -
                                               done_to_proc[p],
                                               send_size[p] -
                                               send_buf_idx[p]);
                        buf_incr = done_to_proc[p] - curr_to_proc[p];
                        ADIOI_BUF_INCR
                        ADIOI_Assert((curr_to_proc[p] + len - done_to_proc[p]) == (unsigned)(curr_to_proc[p] + len - done_to_proc[p]));
                        buf_incr = (int) (curr_to_proc[p] + len -
                                          done_to_proc[p]);
                        ADIOI_Assert((done_to_proc[p] + size) == (unsigned)(done_to_proc[p] + size));
                        curr_to_proc[p] = done_to_proc[p] + size;
                        ADIOI_BUF_COPY
                    } else {
                        size = (int) ADIOI_MIN(len, send_size[p] -
                                               send_buf_idx[p]);
                        buf_incr = (int) len;
                        ADIOI_Assert((curr_to_proc[p] + size) == (unsigned)((ADIO_Offset)curr_to_proc[p] + size));
                        curr_to_proc[p] += size;
                        ADIOI_BUF_COPY
                    }
                    /* send as soon as this aggregator's buffer is full */
                    if (send_buf_idx[p] == send_size[p]) {
                        MPI_Isend(send_buf[p], send_size[p], MPI_BYTE, p,
                                  myrank + p + 100 * iter, fd->comm,
                                  requests + jj);
                        jj++;
                    }
                } else {
                    ADIOI_Assert((curr_to_proc[p] + len) == (unsigned)((ADIO_Offset)curr_to_proc[p] + len));
                    curr_to_proc[p] += (int) len;
                    buf_incr = (int) len;
                    ADIOI_BUF_INCR
                }
            } else {
                buf_incr = (int) len;
                ADIOI_BUF_INCR
            }
            off += len;
            rem_len -= len;
        }
    }
    for (i = 0; i < nprocs; i++)
        if (send_size[i])
            sent_to_proc[i] = curr_to_proc[i];
}
990