1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *   Copyright (C) 1997 University of Chicago.
4  *   See COPYRIGHT notice in top-level directory.
5  *
6  *   Copyright (C) 2007 Oak Ridge National Laboratory
7  *
8  *   Copyright (C) 2008 Sun Microsystems, Lustre group
9  */
10 
11 #include "ad_lustre.h"
12 #include "adio_extern.h"
13 
/*
 * ADIOI_BUFFERED_WRITE
 *
 * Data-sieving write of one contiguous file request [req_off, req_off+req_len)
 * through the staging buffer `writebuf`, one stripe-bounded window at a time.
 *
 * Expands to a brace-enclosed compound statement and relies on these names
 * being declared in the enclosing function:
 *   fd, buf, error_code, myname, status1, stripe_size, end_offset,
 *   writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off.
 *
 * Behaviour:
 *   - If req_off lies at or beyond the end of the current window, the window
 *     is written out (when non-empty) and a new window is started at req_off.
 *     Its length is capped both by end_offset and by the next multiple of
 *     stripe_size.  The new window is filled with ADIO_ReadContig so that
 *     bytes not covered by the request are preserved (read-modify-write).
 *   - The request is copied into the window; if it does not fit, the window
 *     is written, advanced by writebuf_len, re-read, and the copy continues.
 *   - When fd->atomicity is false each window is locked before the read and
 *     unlocked after it has been written.  When fd->atomicity is true no
 *     per-window lock is taken here.
 *
 * Side effects: req_len and userbuf_off are consumed/advanced; writebuf_off,
 * writebuf_len and write_sz are updated.  The last window is left unwritten
 * (and, in non-atomic mode, still locked) for the caller to flush.
 *
 * Errors: on a failed write or read, *error_code is wrapped with
 * MPIO_Err_create_code, the window lock is released (non-atomic mode),
 * writebuf is freed and the ENCLOSING FUNCTION RETURNS.
 */
#define ADIOI_BUFFERED_WRITE \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* request starts past the current window: flush what is staged */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, \
                             &status1, error_code); \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* stripe_size alignment: never let a window cross a stripe boundary */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* the window lock was just taken above; do not leave it held */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, \
                                               myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
    } \
    /* copy the part of the request that fits into the current window */ \
    write_sz = (unsigned) (ADIOI_MIN(req_len, \
                                     writebuf_off + writebuf_len - req_off)); \
    ADIOI_Assert((ADIO_Offset)write_sz == \
                 ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, (char *)buf + userbuf_off, write_sz); \
    /* request spills over the window: flush, slide the window, repeat */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (!(fd->atomicity)) \
            ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        if (!(fd->atomicity)) \
            ADIOI_WRITE_LOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
        ADIO_ReadContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                        ADIO_EXPLICIT_OFFSET, \
                        writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            /* the window lock was just taken above; do not leave it held */ \
            if (!(fd->atomicity)) \
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len); \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowsrc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
95 
96 
/*
 * ADIOI_BUFFERED_WRITE_WITHOUT_READ
 *
 * Stages one contiguous file request [req_off, req_off+req_len) through the
 * buffer `writebuf`, one stripe-bounded window at a time.  Unlike
 * ADIOI_BUFFERED_WRITE it does not do a read-modify-write and does not lock.
 * In this file it is used when the file region being written is contiguous:
 * for a noncontiguous buftype with a contiguous filetype, and for a
 * contiguous buffer that fits in a single contiguous block of the filetype.
 *
 * Expands to a brace-enclosed compound statement and relies on these names
 * being declared in the enclosing function:
 *   fd, buf, error_code, myname, status1, stripe_size, end_offset,
 *   writebuf, writebuf_off, writebuf_len, write_sz,
 *   req_off, req_len, userbuf_off.
 *
 * Side effects: req_len and userbuf_off are consumed/advanced; writebuf_off,
 * writebuf_len and write_sz are updated.  The last window is left unwritten
 * for the caller to flush.
 *
 * Errors: on a failed write, *error_code is wrapped with
 * MPIO_Err_create_code, writebuf is freed and the ENCLOSING FUNCTION RETURNS.
 */
#define ADIOI_BUFFERED_WRITE_WITHOUT_READ \
{ \
    if (req_off >= writebuf_off + writebuf_len) { \
        /* flush only if something is staged; on first use the window is */ \
        /* empty and a zero-length write at writebuf_off would be pointless */ \
        if (writebuf_len) { \
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, \
                             error_code); \
            if (*error_code != MPI_SUCCESS) { \
                *error_code = MPIO_Err_create_code(*error_code, \
                                                   MPIR_ERR_RECOVERABLE, \
                                                   myname, \
                                                   __LINE__, MPI_ERR_IO, \
                                                   "**iowswc", 0); \
                ADIOI_Free(writebuf); \
                return; \
            } \
        } \
        writebuf_off = req_off; \
        /* stripe_size alignment: never let a window cross a stripe boundary */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
    } \
    /* copy the part of the request that fits into the current window */ \
    write_sz = (unsigned) ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off); \
    ADIOI_Assert((ADIO_Offset)write_sz == ADIOI_MIN(req_len, writebuf_off + writebuf_len - req_off)); \
    memcpy(writebuf + req_off - writebuf_off, \
           (char *)buf + userbuf_off, write_sz); \
    /* request spills over the window: flush, slide the window, repeat */ \
    while (write_sz != req_len) { \
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE, \
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1, error_code); \
        if (*error_code != MPI_SUCCESS) { \
            *error_code = MPIO_Err_create_code(*error_code, \
                                               MPIR_ERR_RECOVERABLE, myname, \
                                               __LINE__, MPI_ERR_IO, \
                                               "**iowswc", 0); \
            ADIOI_Free(writebuf); \
            return; \
        } \
        req_len -= write_sz; \
        userbuf_off += write_sz; \
        writebuf_off += writebuf_len; \
        /* stripe_size alignment */ \
        writebuf_len = (unsigned) ADIOI_MIN(end_offset - writebuf_off + 1, \
                                            (writebuf_off / stripe_size + 1) * \
                                            stripe_size - writebuf_off); \
        write_sz = ADIOI_MIN(req_len, writebuf_len); \
        memcpy(writebuf, (char *)buf + userbuf_off, write_sz); \
    } \
}
146 
/*
 * ADIOI_LUSTRE_WriteStrided - write `count` instances of `datatype` from
 * `buf` to the file described by `fd`, where the memory layout and/or the
 * file view may be noncontiguous.  Data is staged through a buffer whose
 * windows never cross a multiple of fd->hints->striping_unit.
 *
 * Parameters:
 *   fd            - ADIO file handle; its filetype, etype_size, disp, fp_ind,
 *                   atomicity flag and hints are read, and fp_ind /
 *                   fp_sys_posn are updated.
 *   buf           - user buffer holding the data to write.
 *   count         - number of `datatype` instances in buf.
 *   datatype      - memory datatype describing the layout of buf.
 *   file_ptr_type - ADIO_INDIVIDUAL: start at fd->fp_ind and advance it;
 *                   otherwise `offset` gives the start position.
 *   offset        - start position in units of etype relative to the
 *                   filetype (ignored for ADIO_INDIVIDUAL).
 *   status        - receives the byte count (bufsize) when
 *                   HAVE_STATUS_SET_BYTES is defined.
 *   error_code    - out: MPI_SUCCESS or an MPI error code.
 *
 * Cases handled:
 *   - data sieving disabled via hint: delegates to
 *     ADIOI_GEN_WriteStrided_naive;
 *   - zero-size filetype: nothing to write;
 *   - noncontiguous memory / contiguous file: buffered writes, no
 *     read-modify-write;
 *   - otherwise (file view processed via its flattened representation):
 *     a fast path when a contiguous buffer fits in one contiguous block of
 *     the filetype, else read-modify-write data sieving through
 *     ADIOI_BUFFERED_WRITE.
 *
 * NOTE(review): the ADIOI_BUFFERED_WRITE* macros free `writebuf` and return
 * from this function directly on I/O errors; in atomic mode the whole-region
 * lock taken here is not released on those paths.
 */
void ADIOI_LUSTRE_WriteStrided(ADIO_File fd, const void *buf, int count,
                               MPI_Datatype datatype, int file_ptr_type,
                               ADIO_Offset offset, ADIO_Status * status,
                               int *error_code)
{
    /* offset is in units of etype relative to the filetype. */
    ADIOI_Flatlist_node *flat_buf, *flat_file;
    ADIO_Offset i_offset, sum, size_in_filetype;
    int i, j, k, st_index = 0;
    int n_etypes_in_filetype;
    ADIO_Offset num, size, n_filetypes, etype_in_filetype, st_n_filetypes;
    ADIO_Offset abs_off_in_filetype = 0;
    MPI_Count filetype_size, etype_size, buftype_size;
    MPI_Aint filetype_extent, buftype_extent;
    int buf_count, buftype_is_contig, filetype_is_contig;
    ADIO_Offset userbuf_off;
    ADIO_Offset off, req_off, disp, end_offset = 0, writebuf_off, start_off;
    char *writebuf;
    unsigned bufsize, writebuf_len, write_sz;
    ADIO_Status status1;
    ADIO_Offset new_bwr_size, new_fwr_size, st_fwr_size, fwr_size = 0, bwr_size, req_len;
    int stripe_size;
    static char myname[] = "ADIOI_LUSTRE_WriteStrided";

    if (fd->hints->ds_write == ADIOI_HINT_DISABLE) {
        /* if user has disabled data sieving on writes, use naive
         * approach instead.
         */
        ADIOI_GEN_WriteStrided_naive(fd,
                                     buf,
                                     count,
                                     datatype,
                                     file_ptr_type,
                                     offset, status, error_code);
        return;
    }

    *error_code = MPI_SUCCESS;  /* changed below if error */

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

    MPI_Type_size_x(fd->filetype, &filetype_size);
    if (!filetype_size) {
        /* empty file view: nothing can be written */
#ifdef HAVE_STATUS_SET_BYTES
        MPIR_Status_set_bytes(status, datatype, 0);
#endif
        *error_code = MPI_SUCCESS;
        return;
    }

    MPI_Type_extent(fd->filetype, &filetype_extent);
    MPI_Type_size_x(datatype, &buftype_size);
    MPI_Type_extent(datatype, &buftype_extent);
    etype_size = fd->etype_size;

    ADIOI_Assert((buftype_size * count) == ((ADIO_Offset)(unsigned)buftype_size * (ADIO_Offset)count));
    bufsize = buftype_size * count;

    /* get striping info */
    stripe_size = fd->hints->striping_unit;

    /* Different buftype to different filetype */
    if (!buftype_is_contig && filetype_is_contig) {
        /* noncontiguous in memory, contiguous in file. */
        ADIOI_Flatten_datatype(datatype);
        flat_buf = ADIOI_Flatlist;
        while (flat_buf->type != datatype)
            flat_buf = flat_buf->next;

        off = (file_ptr_type == ADIO_INDIVIDUAL) ? fd->fp_ind :
            fd->disp + (ADIO_Offset)etype_size * offset;

        start_off = off;
        end_offset = start_off + bufsize - 1;
        /* write stripe size buffer each time */
        writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
        writebuf_off = 0;
        writebuf_len = 0;

        /* if atomicity is true, lock the region to be accessed */
        if (fd->atomicity)
            ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, bufsize);

        /* walk every contiguous piece of every datatype instance; the file
         * side simply advances by the piece length */
        for (j = 0; j < count; j++) {
            for (i = 0; i < flat_buf->count; i++) {
                userbuf_off = (ADIO_Offset)j * (ADIO_Offset)buftype_extent +
                    flat_buf->indices[i];
                req_off = off;
                req_len = flat_buf->blocklens[i];
                ADIOI_BUFFERED_WRITE_WITHOUT_READ
                off += flat_buf->blocklens[i];
            }
        }

        /* write the buffer out finally */
        ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                         ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
                         error_code);

        if (fd->atomicity)
            ADIOI_UNLOCK(fd, start_off, SEEK_SET, bufsize);
        ADIOI_Free(writebuf);
        if (*error_code != MPI_SUCCESS)
            return;
        if (file_ptr_type == ADIO_INDIVIDUAL)
            fd->fp_ind = off;
    } else {
        /* noncontiguous in file */
        /* filetype already flattened in ADIO_Open */
        flat_file = ADIOI_Flatlist;
        while (flat_file->type != fd->filetype)
            flat_file = flat_file->next;
        disp = fd->disp;

        if (file_ptr_type == ADIO_INDIVIDUAL) {
            /* Wei-keng reworked type processing to be a bit more efficient */
            offset = fd->fp_ind - disp;
            n_filetypes = (offset - flat_file->indices[0]) / filetype_extent;
            offset -= (ADIO_Offset)n_filetypes * filetype_extent;
            /* now offset is local to this extent */

            /* find the block where offset is located, skip blocklens[i]==0 */
            for (i = 0; i < flat_file->count; i++) {
                ADIO_Offset dist;
                if (flat_file->blocklens[i] == 0)
                    continue;
                dist = flat_file->indices[i] + flat_file->blocklens[i] - offset;
                /* fwr_size is from offset to the end of block i */
                if (dist == 0) {
                    /* offset sits exactly at the end of block i: start at
                     * the beginning of the following block */
                    i++;
                    offset = flat_file->indices[i];
                    fwr_size = flat_file->blocklens[i];
                    break;
                }
                if (dist > 0) {
                    fwr_size = dist;
                    break;
                }
            }
            st_index = i;       /* starting index in flat_file->indices[] */
            offset += disp + (ADIO_Offset)n_filetypes * filetype_extent;
        } else {
            n_etypes_in_filetype = filetype_size / etype_size;
            n_filetypes = offset / n_etypes_in_filetype;
            etype_in_filetype = offset % n_etypes_in_filetype;
            size_in_filetype = etype_in_filetype * etype_size;

            /* locate the block of the filetype containing the start byte */
            sum = 0;
            for (i = 0; i < flat_file->count; i++) {
                sum += flat_file->blocklens[i];
                if (sum > size_in_filetype) {
                    st_index = i;
                    fwr_size = sum - size_in_filetype;
                    abs_off_in_filetype = flat_file->indices[i] +
                        size_in_filetype - (sum - flat_file->blocklens[i]);
                    break;
                }
            }

            /* abs. offset in bytes in the file */
            offset = disp + (ADIO_Offset) n_filetypes * filetype_extent +
                abs_off_in_filetype;
        }

        start_off = offset;

        /* Wei-keng Liao: write request is within single flat_file
         * contig block */
        /* this could happen, for example, with subarray types that are
         * actually fairly contiguous */
        if (buftype_is_contig && bufsize <= fwr_size) {
            req_off = start_off;
            req_len = bufsize;
            end_offset = start_off + bufsize - 1;
            writebuf = (char *) ADIOI_Malloc(ADIOI_MIN(bufsize, stripe_size));
            memset(writebuf, -1, ADIOI_MIN(bufsize, stripe_size));
            writebuf_off = 0;
            writebuf_len = 0;
            userbuf_off = 0;
            ADIOI_BUFFERED_WRITE_WITHOUT_READ
            /* write the buffer out finally */
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                             ADIO_EXPLICIT_OFFSET, writebuf_off, &status1,
                             error_code);

            if (file_ptr_type == ADIO_INDIVIDUAL) {
                /* update MPI-IO file pointer to point to the first byte
                 * that can be accessed in the fileview. */
                fd->fp_ind = offset + bufsize;
                if (bufsize == fwr_size) {
                    /* block fully consumed: skip ahead to the next
                     * non-empty block, wrapping into the next filetype */
                    do {
                        st_index++;
                        if (st_index == flat_file->count) {
                            st_index = 0;
                            n_filetypes++;
                        }
                    } while (flat_file->blocklens[st_index] == 0);
                    fd->fp_ind = disp + flat_file->indices[st_index]
                        + (ADIO_Offset)n_filetypes * filetype_extent;
                }
            }
            fd->fp_sys_posn = -1;       /* set it to null. */
#ifdef HAVE_STATUS_SET_BYTES
            MPIR_Status_set_bytes(status, datatype, bufsize);
#endif
            ADIOI_Free(writebuf);
            return;
        }

        /* Calculate end_offset, the last byte-offset that will be accessed.
         * e.g., if start_offset=0 and 100 bytes to be write, end_offset=99 */

        st_fwr_size = fwr_size;
        st_n_filetypes = n_filetypes;
        i_offset = 0;
        j = st_index;
        off = offset;
        fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
        while (i_offset < bufsize) {
            i_offset += fwr_size;
            end_offset = off + fwr_size - 1;

            /* advance to the next non-empty block of the file view */
            j = (j + 1) % flat_file->count;
            n_filetypes += (j == 0) ? 1 : 0;
            while (flat_file->blocklens[j] == 0) {
                j = (j + 1) % flat_file->count;
                n_filetypes += (j == 0) ? 1 : 0;
            }

            off = disp + flat_file->indices[j] +
                n_filetypes * (ADIO_Offset)filetype_extent;
            fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize - i_offset);
        }

        /* if atomicity is true, lock the region to be accessed */
        if (fd->atomicity)
            ADIOI_WRITE_LOCK(fd, start_off, SEEK_SET, end_offset - start_off + 1);

        writebuf_off = 0;
        writebuf_len = 0;
        writebuf = (char *) ADIOI_Malloc(stripe_size);
        memset(writebuf, -1, stripe_size);

        if (buftype_is_contig && !filetype_is_contig) {

            /* contiguous in memory, noncontiguous in file. should be the most
             * common case. */

            i_offset = 0;
            j = st_index;
            off = offset;
            n_filetypes = st_n_filetypes;
            fwr_size = ADIOI_MIN(st_fwr_size, bufsize);
            while (i_offset < bufsize) {
                if (fwr_size) {
                    /* TYPE_UB and TYPE_LB can result in
                     * fwr_size = 0. save system call in such cases */
                    /* lseek(fd->fd_sys, off, SEEK_SET);
                     * err = write(fd->fd_sys, ((char *) buf) + i_offset, fwr_size); */

                    req_off = off;
                    req_len = fwr_size;
                    userbuf_off = i_offset;
                    ADIOI_BUFFERED_WRITE
                }
                i_offset += fwr_size;

                if (off + fwr_size < disp + flat_file->indices[j] +
                    flat_file->blocklens[j] +
                    n_filetypes * (ADIO_Offset)filetype_extent) {
                    /* did not reach end of contiguous block in filetype.
                     * no more I/O needed. off is incremented by fwr_size. */
                    off += fwr_size;
                } else {
                    j = (j + 1) % flat_file->count;
                    n_filetypes += (j == 0) ? 1 : 0;
                    while (flat_file->blocklens[j] == 0) {
                        j = (j + 1) % flat_file->count;
                        n_filetypes += (j == 0) ? 1 : 0;
                    }
                    off = disp + flat_file->indices[j] +
                        n_filetypes * (ADIO_Offset)filetype_extent;
                    fwr_size = ADIOI_MIN(flat_file->blocklens[j],
                                         bufsize - i_offset);
                }
            }
        } else {
            /* noncontiguous in memory as well as in file */

            ADIOI_Flatten_datatype(datatype);
            flat_buf = ADIOI_Flatlist;
            while (flat_buf->type != datatype)
                flat_buf = flat_buf->next;

            k = num = buf_count = 0;
            i_offset = flat_buf->indices[0];
            j = st_index;
            off = offset;
            n_filetypes = st_n_filetypes;
            fwr_size = st_fwr_size;
            bwr_size = flat_buf->blocklens[0];

            /* each step moves the largest piece that is contiguous on both
             * the memory side (bwr_size) and the file side (fwr_size) */
            while (num < bufsize) {
                size = ADIOI_MIN(fwr_size, bwr_size);
                if (size) {
                    /* lseek(fd->fd_sys, off, SEEK_SET);
                     * err = write(fd->fd_sys, ((char *) buf) + i_offset, size); */

                    req_off = off;
                    req_len = size;
                    userbuf_off = i_offset;
                    ADIOI_BUFFERED_WRITE
                }

                new_fwr_size = fwr_size;
                new_bwr_size = bwr_size;

                if (size == fwr_size) {
                    /* reached end of contiguous block in file */
                    j = (j + 1) % flat_file->count;
                    n_filetypes += (j == 0) ? 1 : 0;
                    while (flat_file->blocklens[j] == 0) {
                        j = (j + 1) % flat_file->count;
                        n_filetypes += (j == 0) ? 1 : 0;
                    }

                    off = disp + flat_file->indices[j] +
                        n_filetypes * (ADIO_Offset)filetype_extent;

                    new_fwr_size = flat_file->blocklens[j];
                    if (size != bwr_size) {
                        i_offset += size;
                        new_bwr_size -= size;
                    }
                }

                if (size == bwr_size) {
                    /* reached end of contiguous block in memory */

                    k = (k + 1) % flat_buf->count;
                    buf_count++;
                    i_offset = (ADIO_Offset)buftype_extent *
                        (ADIO_Offset)(buf_count / flat_buf->count) +
                        flat_buf->indices[k];
                    new_bwr_size = flat_buf->blocklens[k];
                    if (size != fwr_size) {
                        off += size;
                        new_fwr_size -= size;
                    }
                }
                num += size;
                fwr_size = new_fwr_size;
                bwr_size = new_bwr_size;
            }
        }

        /* write the buffer out finally */
        if (writebuf_len) {
            ADIO_WriteContig(fd, writebuf, writebuf_len, MPI_BYTE,
                             ADIO_EXPLICIT_OFFSET,
                             writebuf_off, &status1, error_code);
            if (!(fd->atomicity))
                ADIOI_UNLOCK(fd, writebuf_off, SEEK_SET, writebuf_len);
            if (*error_code != MPI_SUCCESS) {
                /* release the region lock and the staging buffer before
                 * bailing out, so neither is leaked on the error path */
                if (fd->atomicity)
                    ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset - start_off + 1);
                ADIOI_Free(writebuf);
                return;
            }
        }
        if (fd->atomicity)
            ADIOI_UNLOCK(fd, start_off, SEEK_SET, end_offset - start_off + 1);

        ADIOI_Free(writebuf);

        if (file_ptr_type == ADIO_INDIVIDUAL)
            fd->fp_ind = off;
    }

    fd->fp_sys_posn = -1;       /* set it to null. */

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
/* This is a temporary way of filling in status. The right way is to
    keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
#endif

    if (!buftype_is_contig)
        ADIOI_Delete_flattened(datatype);
}
534