1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *   Copyright (C) 1997 University of Chicago.
4  *   See COPYRIGHT notice in top-level directory.
5  *
6  *   Copyright (C) 2007 Oak Ridge National Laboratory
7  *
8  *   Copyright (C) 2008 Sun Microsystems, Lustre group
9  */
10 
11 #include "ad_lustre.h"
12 #include "adio_extern.h"
13 
/*
 * ADIOI_BUFFERED_WRITE
 *
 * Read-modify-write of one request [req_off, req_off + req_len) through the
 * staging buffer `writebuf`, one window at a time.  A window starts at
 * writebuf_off and extends to the next multiple of stripe_size or to
 * end_offset, whichever comes first.
 *
 * The macro expands to a compound statement (call sites use it WITHOUT a
 * trailing semicolon) and must be expanded inside a function returning void.
 * It reads and updates these names from the enclosing scope:
 *   fd, buf, writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off, end_offset, stripe_size,
 *   status1, error_code, myname.
 *
 * Locking: when fd->atomicity is not set, each window is write-locked before
 * it is read and unlocked after it is written back.  The lock is also
 * released when the read of a freshly locked window fails, so that an error
 * return never leaves a window locked.
 *
 * On any I/O error the macro wraps *error_code with MPIO_Err_create_code,
 * frees writebuf and returns from the enclosing function.
 * NOTE(review): a region lock taken by the caller in atomic mode is not
 * released on these error returns.
 *
 * The last window is left in writebuf (still locked in non-atomic mode);
 * the caller is responsible for the final flush and unlock.
 */
#define ADIOI_BUFFERED_WRITE \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* request starts beyond the current window: flush it first */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, \
                             &status1, error_code); \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        /* pre-read the window so bytes between requests are preserved */ \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* do not leave the window locked on the error path */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, \
                                               myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
    } \
    /* copy as much of the request as fits in the current window */ \
    write_sz = (unsigned) (ADIOI_MIN(req_len, \
                                     writebuf_off + writebuf_len - req_off)); \
    ADIOI_Assert((ADIO_Offset)write_sz == \
                 ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz); \
    /* the request spills over: flush and advance window by window */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (!(fd->atomicity)) \
            ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* do not leave the window locked on the error path */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
95 
96 
/*
 * ADIOI_BUFFERED_WRITE_WITHOUT_READ
 *
 * Stages one request [req_off, req_off + req_len) into `writebuf` and writes
 * full windows out as they fill.  Unlike ADIOI_BUFFERED_WRITE it performs no
 * read-modify-write and takes no locks, so it is only suitable when the bytes
 * being written are contiguous in the file: the "noncontiguous in memory,
 * contiguous in file" case, and the case where a contiguous buffer fits in a
 * single contiguous block of the filetype.
 *
 * A window starts at writebuf_off and extends to the next multiple of
 * stripe_size or to end_offset, whichever comes first.
 *
 * The macro expands to a compound statement (call sites use it WITHOUT a
 * trailing semicolon) and must be expanded inside a function returning void.
 * It reads and updates these names from the enclosing scope:
 *   fd, buf, writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off, end_offset, stripe_size,
 *   status1, error_code, myname.
 *
 * On a write error the macro wraps *error_code with MPIO_Err_create_code,
 * frees writebuf and returns from the enclosing function.  The last window is
 * left in writebuf; the caller performs the final flush.
 */
#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* callers start with an empty window (writebuf_len == 0); */ \
        /* skip the pointless zero-length write in that case */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, \
                             error_code); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
    } \
    /* copy as much of the request as fits in the current window */ \
    write_sz = (unsigned) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
    ADIOI_Assert((ADIO_Offset)write_sz == ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, \
           (char *)buf + userbuf_off, write_sz); \
    /* the request spills over: flush and advance window by window */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
146 
/*
 * ADIOI_LUSTRE_WriteStrided - strided (noncontiguous) write that stages data
 * in a buffer of at most stripe_size bytes (fd->hints->striping_unit).
 *
 * Parameters:
 *   fd            - open ADIO file; its filetype, etype_size, disp, fp_ind,
 *                   atomicity flag and hints are consulted.
 *   buf           - user buffer holding `count` items of `datatype`.
 *   count         - number of `datatype` items to write.
 *   datatype      - memory datatype (may be noncontiguous).
 *   file_ptr_type - ADIO_INDIVIDUAL to start at fd->fp_ind; otherwise
 *                   `offset` is used as an explicit offset.
 *   offset        - explicit offset in units of etype relative to the
 *                   filetype (ignored for ADIO_INDIVIDUAL).
 *   status        - filled with the byte count via MPIR_Status_set_bytes when
 *                   HAVE_STATUS_SET_BYTES is defined.
 *   error_code    - out: MPI_SUCCESS or an MPI error code.
 *
 * Behaviour:
 *   - If data sieving for writes is disabled by hint, the work is delegated
 *     to ADIOI_GEN_WriteStrided_naive.
 *   - A zero-size filetype is a successful no-op.
 *   - Noncontiguous memory / contiguous file: blocks are packed into the
 *     staging buffer and written without read-modify-write.
 *   - Otherwise the filetype's flattened representation is walked; a
 *     contiguous buffer that fits into a single filetype block is written
 *     directly, everything else goes through ADIOI_BUFFERED_WRITE
 *     (read-modify-write per window).
 *   - With fd->atomicity set, the whole accessed region is write-locked for
 *     the duration of the operation.
 *
 * The ADIOI_BUFFERED_WRITE* macros expand to code that frees writebuf and
 * returns from this function on I/O errors.
 * NOTE(review): those macro error returns do not release the atomic-mode
 * region lock and do not reach ADIOI_Delete_flattened at the end.
 */
void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, const void *buf, int count,
                               MPI_Datatype datatype, int file_ptr_type,
                               ADIO_Offset offset, ADIO_Status * status,
                               int *error_code)
{
    /* offset is in units of etype relative to the filetype. */
    ADIOI_Flatlist_node *flat_buf, *flat_file;
    ADIO_Offset i_offset, sum, size_in_filetype;
    int i, j, k, st_index = 0;
    int n_etypes_in_filetype;
    ADIO_Offset num, size, n_filetypes, etype_in_filetype, st_n_filetypes;
    ADIO_Offset abs_off_in_filetype = 0;
    MPI_Count filetype_size, etype_size, buftype_size;
    MPI_Aint filetype_extent, buftype_extent, filetype_lb, buftype_lb;
    int buf_count, buftype_is_contig, filetype_is_contig;
    ADIO_Offset userbuf_off;
    ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off;
    char *writebuf;
    unsigned bufsize, writebuf_len, write_sz;
    ADIO_Status status1;
    ADIO_Offset new_bwr_size, new_fwr_size, st_fwr_size, fwr_size = 0, bwr_size, req_len;
    int stripe_size;
    static char myname[] = "ADIOI_LUSTRE_WriteStrided";

    if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
        /* if user has disabled data sieving on writes, use naive
         * approach instead.
         */
        ADIOI_GEN_WriteStrided_naive(fd,
                                     buf,
                                     count,
                                     datatype,
                                     file_ptr_type,
                                     offset, status, error_code);
        return;
    }

    *error_code = MPI_SUCCESS;  /* changed below if error */

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

    MPI_Type_size_x(fd->filetype, &filetype_size);
    if (!filetype_size) {
        /* nothing can be written through an empty filetype */
#ifdef HAVE_STATUS_SET_BYTES
        MPIR_Status_set_bytes(status, datatype, 0);
#endif
        *error_code = MPI_SUCCESS;
        return;
    }

    MPI_Type_get_extent(fd->filetype, &filetype_lb, &filetype_extent);
    MPI_Type_size_x(datatype, &buftype_size);
    MPI_Type_get_extent(datatype, &buftype_lb, &buftype_extent);
    etype_size = fd->etype_size;

    /* the total byte count must fit in the unsigned bufsize */
    ADIOI_Assert((buftype_size * count) == ((ADIO_Offset)(unsigned)buftype_size * (ADIO_Offset)count));
    bufsize = buftype_size * count;

    /* get striping info */
    stripe_size = fd->hints->striping_unit;

    /* Different buftype to different filetype */
    if (!buftype_is_contig && filetype_is_contig) {
        /* noncontiguous in memory, contiguous in file. */
        flat_buf = ADIOI_Flatten_and_find(datatype);

        off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
            fd->disp + (ADIO_Offset)etype_size * offset;

        start_off = off;
        end_offset = start_off + bufsize - 1;
        /* write stripe size buffer each time */
        writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
        writebuf_off = 0;
        writebuf_len = 0;

        /* if atomicity is true, lock the region to be accessed */
        if (fd->atomicity)
            ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);

        for (j = 0; j < count; j++) {
            for (i = 0; i < flat_buf->count; i++) {
                userbuf_off = (ADIO_Offset)j * (ADIO_Offset)buftype_extent +
                    flat_buf->indices[i];
                req_off = off;
                req_len = flat_buf->blocklens[i];
                ADIOI_BUFFERED_WRITE_WITHOUT_READ
                off += flat_buf->blocklens[i];
            }
        }

        /* write the buffer out finally */
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
                         error_code);

        if (fd->atomicity)
            ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
        if (*error_code != MPI_SUCCESS) {
            ADIOI_Free(writebuf);
            return;
        }
        ADIOI_Free(writebuf);
        if (file_ptr_type == ADIO_INDIVIDUAL)
            fd->fp_ind = off;
    } else {
        /* noncontiguous in file */
        /* filetype already flattened in ADIO_Open */
        flat_file = ADIOI_Flatlist;
        while (flat_file->type != fd->filetype)
            flat_file = flat_file->next;
        disp = fd->disp;

        if (file_ptr_type == ADIO_INDIVIDUAL) {
            /* Wei-keng reworked type processing to be a bit more efficient */
            offset = fd->fp_ind - disp;
            n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
            offset -= (ADIO_Offset)n_filetypes * filetype_extent;
            /* now offset is local to this extent */

            /* find the block where offset is located, skip blocklens[i]==0 */
            for (i = 0; i < flat_file->count; i++) {
                ADIO_Offset dist;
                if (flat_file->blocklens[i] == 0)
                    continue;
                dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
                /* fwr_size is from offset to the end of block i */
                if (dist == 0) {
                    i++;
                    offset = flat_file->indices[i];
                    fwr_size = flat_file->blocklens[i];
                    break;
                }
                if (dist > 0) {
                    fwr_size = dist;
                    break;
                }
            }
            st_index = i;       /* starting index in flat_file->indices[] */
            offset += disp + (ADIO_Offset)n_filetypes * filetype_extent;
        } else {
            n_etypes_in_filetype = filetype_size / etype_size;
            n_filetypes = offset / n_etypes_in_filetype;
            etype_in_filetype = offset % n_etypes_in_filetype;
            size_in_filetype = etype_in_filetype * etype_size;

            sum = 0;
            for (i = 0; i < flat_file->count; i++) {
                sum += flat_file->blocklens[i];
                if (sum > size_in_filetype) {
                    st_index = i;
                    fwr_size = sum - size_in_filetype;
                    abs_off_in_filetype = flat_file->indices[i] +
                        size_in_filetype - (sum - flat_file->blocklens[i]);
                    break;
                }
            }

            /* abs. offset in bytes in the file */
            offset = disp + (ADIO_Offset) n_filetypes * filetype_extent +
                abs_off_in_filetype;
        }

        start_off = offset;

        /* Wei-keng Liao:write request is within single flat_file
         * contig block*/
        /* this could happen, for example, with subarray types that are
         * actually fairly contiguous */
        if (buftype_is_contig && bufsize <= fwr_size) {
            req_off = start_off;
            req_len = bufsize;
            end_offset = start_off + bufsize - 1;
            writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
            memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size));
            writebuf_off = 0;
            writebuf_len = 0;
            userbuf_off = 0;
            ADIOI_BUFFERED_WRITE_WITHOUT_READ
            /* write the buffer out finally */
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
                             error_code);

            /* do not advance the file pointer or report bytes written when
             * the final flush failed */
            if (*error_code != MPI_SUCCESS) {
                ADIOI_Free(writebuf);
                return;
            }

            if (file_ptr_type == ADIO_INDIVIDUAL) {
                /* update MPI-IO file pointer to point to the first byte
                 * that can be accessed in the fileview. */
                fd->fp_ind = offset + bufsize;
                if (bufsize == fwr_size) {
                    /* the block was consumed exactly: move to the start of
                     * the next non-empty block of the filetype */
                    do {
                        st_index++;
                        if (st_index == flat_file->count) {
                            st_index = 0;
                            n_filetypes++;
                        }
                    } while (flat_file->blocklens[st_index] == 0);
                    fd->fp_ind = disp + flat_file->indices[st_index]
                        + (ADIO_Offset)n_filetypes * filetype_extent;
                }
            }
            fd->fp_sys_posn = -1;   /* set it to null. */
#ifdef HAVE_STATUS_SET_BYTES
            MPIR_Status_set_bytes(status, datatype, bufsize);
#endif
            ADIOI_Free(writebuf);
            return;
        }

        /* Calculate end_offset, the last byte-offset that will be accessed.
         * e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */

        st_fwr_size = fwr_size;
        st_n_filetypes = n_filetypes;
        i_offset = 0;
        j = st_index;
        off = offset;
        fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
        while (i_offset < bufsize) {
            i_offset += fwr_size;
            end_offset = off + fwr_size - 1;

            j = (j + 1) % flat_file->count;
            n_filetypes += (j == 0) ? 1 : 0;
            while (flat_file->blocklens[j] == 0) {
                j = (j + 1) % flat_file->count;
                n_filetypes += (j == 0) ? 1 : 0;
            }

            off = disp + flat_file->indices[j] +
                n_filetypes * (ADIO_Offset)filetype_extent;
            fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i_offset);
        }

        /* if atomicity is true, lock the region to be accessed */
        if (fd->atomicity)
            ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset - start_off + 1);

        writebuf_off = 0;
        writebuf_len = 0;
        writebuf = (char *) ADIOI_Malloc(stripe_size);
        memset(writebuf, -1, stripe_size);

        if (buftype_is_contig && !filetype_is_contig) {

            /* contiguous in memory, noncontiguous in file. should be the most
             * common case. */

            i_offset = 0;
            j = st_index;
            off = offset;
            n_filetypes = st_n_filetypes;
            fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
            while (i_offset < bufsize) {
                if (fwr_size) {
                    /* TYPE_UB and TYPE_LB can result in
                     * fwr_size = 0. save system call in such cases */
                    /* lseek(fd->fd_sys, off, SEEK_SET);
                     * err = write(fd->fd_sys, ((char *) buf) + i_offset, fwr_size); */

                    req_off = off;
                    req_len = fwr_size;
                    userbuf_off = i_offset;
                    ADIOI_BUFFERED_WRITE
                }
                i_offset += fwr_size;

                if (off + fwr_size < disp + flat_file->indices[j] +
                    flat_file->blocklens[j] +
                    n_filetypes * (ADIO_Offset)filetype_extent)
                    off += fwr_size;
                /* did not reach end of contiguous block in filetype.
                 * no more I/O needed. off is incremented by fwr_size. */
                else {
                    j = (j + 1) % flat_file->count;
                    n_filetypes += (j == 0) ? 1 : 0;
                    while (flat_file->blocklens[j] == 0) {
                        j = (j + 1) % flat_file->count;
                        n_filetypes += (j == 0) ? 1 : 0;
                    }
                    off = disp + flat_file->indices[j] +
                        n_filetypes * (ADIO_Offset)filetype_extent;
                    fwr_size = ADIOI_MIN(flat_file->blocklens[j],
                                         bufsize - i_offset);
                }
            }
        } else {
            /* noncontiguous in memory as well as in file */
            flat_buf = ADIOI_Flatten_and_find(datatype);

            k = num = buf_count = 0;
            i_offset = flat_buf->indices[0];
            j = st_index;
            off = offset;
            n_filetypes = st_n_filetypes;
            fwr_size = st_fwr_size;
            bwr_size = flat_buf->blocklens[0];

            while (num < bufsize) {
                /* each step moves the smaller of what is left in the current
                 * file block and what is left in the current memory block */
                size = ADIOI_MIN(fwr_size, bwr_size);
                if (size) {
                    /* lseek(fd->fd_sys, off, SEEK_SET);
                     * err = write(fd->fd_sys, ((char *) buf) + i_offset, size); */

                    req_off = off;
                    req_len = size;
                    userbuf_off = i_offset;
                    ADIOI_BUFFERED_WRITE
                }

                new_fwr_size = fwr_size;
                new_bwr_size = bwr_size;

                if (size == fwr_size) {
                    /* reached end of contiguous block in file */
                    j = (j + 1) % flat_file->count;
                    n_filetypes += (j == 0) ? 1 : 0;
                    while (flat_file->blocklens[j] == 0) {
                        j = (j + 1) % flat_file->count;
                        n_filetypes += (j == 0) ? 1 : 0;
                    }

                    off = disp + flat_file->indices[j] +
                        n_filetypes * (ADIO_Offset)filetype_extent;

                    new_fwr_size = flat_file->blocklens[j];
                    if (size != bwr_size) {
                        i_offset += size;
                        new_bwr_size -= size;
                    }
                }

                if (size == bwr_size) {
                    /* reached end of contiguous block in memory */

                    k = (k + 1) % flat_buf->count;
                    buf_count++;
                    i_offset = (ADIO_Offset)buftype_extent *
                        (ADIO_Offset)(buf_count / flat_buf->count) +
                        flat_buf->indices[k];
                    new_bwr_size = flat_buf->blocklens[k];
                    if (size != fwr_size) {
                        off += size;
                        new_fwr_size -= size;
                    }
                }
                num += size;
                fwr_size = new_fwr_size;
                bwr_size = new_bwr_size;
            }
        }

        /* write the buffer out finally */
        if (writebuf_len) {
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                             ADIO_EXPLICIT_OFFSET,
                             writebuf_off, &status1, error_code);
            if (!(fd->atomicity))
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
        }
        if (fd->atomicity)
            ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset - start_off + 1);

        ADIOI_Free(writebuf);

        /* locks are released and the staging buffer is freed before bailing
         * out, so a failed final flush leaks neither */
        if (*error_code != MPI_SUCCESS)
            return;

        if (file_ptr_type == ADIO_INDIVIDUAL)
            fd->fp_ind = off;
    }

    fd->fp_sys_posn = -1;       /* set it to null. */

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
/* This is a temporary way of filling in status. The right way is to
    keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
#endif

    if (!buftype_is_contig)
        ADIOI_Delete_flattened(datatype);
}
528