1 /* -*- Mode: C; c-basic-offset:4 ; indent-tabs-mode:nil ; -*- */
2 /*
3  *
4  *   Copyright (C) 1997 University of Chicago.
5  *   See COPYRIGHT notice in top-level directory.
6  */
7 
8 
9 #include "adio.h"
10 #include "adio_extern.h"
11 #include <unistd.h>
12 
13 
14 /* #define IO_DEBUG 1 */
/*
 * ADIOI_NOLOCK_WriteStrided - strided write built only on lseek()/write().
 *
 * Borrowed from old-school PVFS (v1) code. A driver for file systems that
 * cannot or do not support client-side buffering.
 *   - Does not do the data sieving optimization.
 *   - Does contain a write-combining optimization for the case
 *     "noncontiguous in memory, contiguous in file".
 *
 * Parameters:
 *   fd            - open file; fd->fd_sys is the descriptor that is written,
 *                   fd->filetype / fd->disp / fd->etype_size describe the view
 *   buf           - user buffer holding `count` instances of `datatype`
 *   count         - number of `datatype` instances to write
 *   datatype      - memory datatype
 *   file_ptr_type - ADIO_EXPLICIT_OFFSET or ADIO_INDIVIDUAL
 *   offset        - with ADIO_EXPLICIT_OFFSET: offset in units of etype
 *                   relative to the filetype; otherwise fd->fp_ind is used
 *   status        - filled with `bufsize` bytes when HAVE_STATUS_SET_BYTES
 *                   is defined
 *   error_code    - MPI_SUCCESS, or an MPI_ERR_IO code if any lseek()/write()
 *                   returned -1 (MPI_ERR_INTERN if atomic mode is set)
 *
 * Side effects: for ADIO_INDIVIDUAL, fd->fp_ind is advanced to the final
 * file offset; fd->fp_sys_posn is reset to -1.
 *
 * NOTE(review): a write() that returns fewer bytes than requested is not
 * retried; only a return value of -1 is treated as an error.
 */
void ADIOI_NOLOCK_WriteStrided(ADIO_File fd, const void *buf, int count,
                               MPI_Datatype datatype, int file_ptr_type,
                               ADIO_Offset offset, ADIO_Status *status,
                               int *error_code)
{
    ADIOI_Flatlist_node *flat_buf, *flat_file;
    int j, k, st_index = 0;
    off_t err_lseek = -1;
    ssize_t err = -1;
    ADIO_Offset fwr_size = 0, bwr_size, new_bwr_size, new_fwr_size, i_offset, num;
    ADIO_Offset bufsize, n_etypes_in_filetype;
    ADIO_Offset n_filetypes, etype_in_filetype, size, sum;
    ADIO_Offset abs_off_in_filetype = 0, size_in_filetype;
    MPI_Count filetype_size, etype_size, buftype_size;
    MPI_Aint filetype_extent, buftype_extent, indx, lb;
    int buf_count, buftype_is_contig, filetype_is_contig;
    ADIO_Offset off, disp;
    int flag, err_flag = 0;
    static char myname[] = "ADIOI_NOLOCK_WRITESTRIDED";
#ifdef IO_DEBUG
    int rank, nprocs;
#endif

    /* --BEGIN ERROR HANDLING-- */
    if (fd->atomicity) {
        *error_code = MPIO_Err_create_code(MPI_SUCCESS, MPIR_ERR_RECOVERABLE,
                                           myname, __LINE__,
                                           MPI_ERR_INTERN,
                                           "Atomic mode set in I/O function", 0);
        return;
    }
    /* --END ERROR HANDLING-- */

    ADIOI_Datatype_iscontig(datatype, &buftype_is_contig);
    ADIOI_Datatype_iscontig(fd->filetype, &filetype_is_contig);

    /* an empty filetype means there is nothing to write */
    MPI_Type_size_x(fd->filetype, &filetype_size);
    if (!filetype_size) {
#ifdef HAVE_STATUS_SET_BYTES
        MPIR_Status_set_bytes(status, datatype, 0);
#endif
        *error_code = MPI_SUCCESS;
        return;
    }

#ifdef IO_DEBUG
    MPI_Comm_rank(fd->comm, &rank);
    MPI_Comm_size(fd->comm, &nprocs);
#endif

    MPI_Type_get_extent(fd->filetype, &lb, &filetype_extent);
    MPI_Type_size_x(datatype, &buftype_size);
    MPI_Type_get_extent(datatype, &lb, &buftype_extent);
    etype_size = fd->etype_size;

    ADIOI_Assert((buftype_size * count) == ((ADIO_Offset)(unsigned)buftype_size * (ADIO_Offset)count));
    bufsize = buftype_size * count;

    if (!buftype_is_contig && filetype_is_contig) {
        /* noncontiguous in memory, contiguous in file: pack pieces into a
         * "combine buffer" and issue as few write() calls as possible */
        char *combine_buf, *combine_buf_ptr;
        ADIO_Offset combine_buf_remain;

        flat_buf = ADIOI_Flatten_and_find(datatype);

        /* allocate our "combine buffer" to pack data into before writing */
        combine_buf = (char *) ADIOI_Malloc(fd->hints->ind_wr_buffer_size);
        combine_buf_ptr = combine_buf;
        combine_buf_remain = fd->hints->ind_wr_buffer_size;

        /* seek to the right spot in the file.  `off` is computed directly
         * rather than taken from lseek()'s return value so that a failed
         * seek cannot poison the file-pointer bookkeeping with -1. */
        if (file_ptr_type == ADIO_EXPLICIT_OFFSET) {
            off = fd->disp + etype_size * offset;
        }
        else {
            off = fd->fp_ind;
        }
        err_lseek = lseek(fd->fd_sys, off, SEEK_SET);
        if (err_lseek == -1) err_flag = 1;

        /* loop through all the flattened pieces.  combine into buffer until
         * no more will fit, then write.
         *
         * special case of a given piece being bigger than the combine buffer
         * is also handled.
         */
        for (j = 0; j < count; j++) {
            int i;
            for (i = 0; i < flat_buf->count; i++) {
                /* Flush whenever the next piece would take the direct-write
                 * path below (blocklen >= remaining space) while data is
                 * still buffered.  The comparison must be ">=" to match the
                 * direct-write test; with ">" a piece exactly as large as
                 * the remaining space would be written to the file ahead of
                 * the older, still-buffered bytes. */
                if (flat_buf->blocklens[i] >= combine_buf_remain && combine_buf != combine_buf_ptr) {
                    /* there is data in the buffer; write out the buffer so far */
#ifdef IO_DEBUG
                    printf("[%d/%d] nc mem c file (0) writing loc = %lld sz = %lld\n",
                           rank, nprocs, (long long) off,
                           (long long) (fd->hints->ind_wr_buffer_size - combine_buf_remain));
#endif
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
#endif
                    err = write(fd->fd_sys,
                                combine_buf,
                                fd->hints->ind_wr_buffer_size - combine_buf_remain);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
#endif
                    if (err == -1) err_flag = 1;

                    /* reset our buffer info */
                    combine_buf_ptr = combine_buf;
                    combine_buf_remain = fd->hints->ind_wr_buffer_size;
                }

                /* TODO: heuristic for when to not bother to use combine buffer? */
                if (flat_buf->blocklens[i] >= combine_buf_remain) {
                    /* special case: blocklen is as big as or bigger than the
                     * (now empty) combine buf; write directly from user memory
                     */
#ifdef IO_DEBUG
                    printf("[%d/%d] nc mem c file (1) writing loc = %lld sz = %lld\n",
                           rank, nprocs, (long long) off,
                           (long long) flat_buf->blocklens[i]);
#endif
                    ADIOI_Assert(flat_buf->blocklens[i] == (unsigned)flat_buf->blocklens[i]);
                    ADIOI_Assert((((ADIO_Offset)(MPIU_Upint)buf) + (ADIO_Offset)j*(ADIO_Offset)buftype_extent + flat_buf->indices[i]) == (ADIO_Offset)((MPIU_Upint)buf + (ADIO_Offset)j*(ADIO_Offset)buftype_extent + flat_buf->indices[i]));
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
#endif
                    err = write(fd->fd_sys,
                                ((char *) buf) + (ADIO_Offset)j*(ADIO_Offset)buftype_extent + flat_buf->indices[i],
                                (unsigned)flat_buf->blocklens[i]);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
#endif
                    if (err == -1) err_flag = 1;
                    off += flat_buf->blocklens[i]; /* keep up with the final file offset too */
                }
                else {
                    /* copy more data into combine buffer */
                    memcpy(combine_buf_ptr,
                           ((char *) buf) + j*buftype_extent + flat_buf->indices[i],
                           flat_buf->blocklens[i]);
                    combine_buf_ptr += flat_buf->blocklens[i];
                    combine_buf_remain -= flat_buf->blocklens[i];
                    off += flat_buf->blocklens[i]; /* keep up with the final file offset too */
                }
            }
        }

        if (combine_buf_ptr != combine_buf) {
            /* data left in buffer to write */
#ifdef IO_DEBUG
            printf("[%d/%d] nc mem c file (2) writing loc = %lld sz = %lld\n",
                   rank, nprocs, (long long) off,
                   (long long) (fd->hints->ind_wr_buffer_size - combine_buf_remain));
#endif
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
#endif
            err = write(fd->fd_sys,
                        combine_buf,
                        fd->hints->ind_wr_buffer_size - combine_buf_remain);
#ifdef ADIOI_MPE_LOGGING
            MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
#endif
            if (err == -1) err_flag = 1;
        }

        if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;

        ADIOI_Free(combine_buf);

        if (err_flag) {
            *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                               MPIR_ERR_RECOVERABLE, myname,
                                               __LINE__, MPI_ERR_IO, "**io",
                                               "**io %s", strerror(errno));
        }
        else *error_code = MPI_SUCCESS;
    } /* if (!buftype_is_contig && filetype_is_contig)  ... */

    else {  /* noncontiguous in file */

        /* split up into several contiguous writes */

        /* find starting location in the file */

        /* filetype already flattened in ADIO_Open.
         * NOTE(review): the walk below assumes the filetype is present in
         * ADIOI_Flatlist; there is no guard against reaching the list end. */
        flat_file = ADIOI_Flatlist;
        while (flat_file->type != fd->filetype) flat_file = flat_file->next;
        disp = fd->disp;

        if (file_ptr_type == ADIO_INDIVIDUAL) {
            /* locate the filetype instance and block that end at or after
             * the individual file pointer */
            offset = fd->fp_ind; /* in bytes */
            n_filetypes = -1;
            flag = 0;
            while (!flag) {
                int i;
                n_filetypes++;
                for (i = 0; i < flat_file->count; i++) {
                    if (disp + flat_file->indices[i] +
                        n_filetypes*(ADIO_Offset)filetype_extent + flat_file->blocklens[i]
                            >= offset) {
                        st_index = i;
                        /* bytes left in this file block starting at offset */
                        fwr_size = disp + flat_file->indices[i] +
                                n_filetypes*(ADIO_Offset)filetype_extent
                                 + flat_file->blocklens[i] - offset;
                        flag = 1;
                        break;
                    }
                }
            }
        }
        else {
            int i;
            /* translate the etype offset into (filetype instance, byte
             * position inside that instance) */
            n_etypes_in_filetype = filetype_size/etype_size;
            n_filetypes = offset / n_etypes_in_filetype;
            etype_in_filetype = offset % n_etypes_in_filetype;
            size_in_filetype = etype_in_filetype * etype_size;

            sum = 0;
            for (i = 0; i < flat_file->count; i++) {
                sum += flat_file->blocklens[i];
                if (sum > size_in_filetype) {
                    st_index = i;
                    fwr_size = sum - size_in_filetype;
                    abs_off_in_filetype = flat_file->indices[i] +
                        size_in_filetype - (sum - flat_file->blocklens[i]);
                    break;
                }
            }

            /* abs. offset in bytes in the file */
            offset = disp + n_filetypes*(ADIO_Offset)filetype_extent + abs_off_in_filetype;
        }

        if (buftype_is_contig && !filetype_is_contig) {

            /* contiguous in memory, noncontiguous in file. should be the most
               common case. */

            i_offset = 0;
            j = st_index;
            off = offset;
            fwr_size = ADIOI_MIN(fwr_size, bufsize);
            while (i_offset < bufsize) {
                if (fwr_size) {
                    /* TYPE_UB and TYPE_LB can result in
                       fwr_size = 0. save system call in such cases */
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_lseek_a, 0, NULL);
#endif
#ifdef IO_DEBUG
                    printf("[%d/%d] c mem nc file writing loc = %lld sz = %lld\n",
                           rank, nprocs, (long long) off, (long long) fwr_size);
#endif
                    err_lseek = lseek(fd->fd_sys, off, SEEK_SET);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_lseek_b, 0, NULL);
#endif
                    if (err_lseek == -1) err_flag = 1;
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
#endif
                    err = write(fd->fd_sys, ((char *) buf) + i_offset, fwr_size);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
#endif
                    if (err == -1) err_flag = 1;
                }
                i_offset += fwr_size;

                if (off + fwr_size < disp + flat_file->indices[j] +
                   flat_file->blocklens[j] + n_filetypes*(ADIO_Offset)filetype_extent)
                       off += fwr_size;
                /* did not reach end of contiguous block in filetype.
                   no more I/O needed. off is incremented by fwr_size. */
                else {
                    /* advance to the next file block, wrapping into the next
                     * filetype instance after the last one */
                    if (j < (flat_file->count - 1)) j++;
                    else {
                        j = 0;
                        n_filetypes++;
                    }
                    off = disp + flat_file->indices[j] +
                                        n_filetypes*(ADIO_Offset)filetype_extent;
                    fwr_size = ADIOI_MIN(flat_file->blocklens[j], bufsize-i_offset);
                }
            }
        }
        else {
            /* noncontiguous in memory as well as in file */

            flat_buf = ADIOI_Flatten_and_find(datatype);

            k = 0;
            num = 0;
            buf_count = 0;
            indx = flat_buf->indices[0];
            j = st_index;
            off = offset;
            bwr_size = flat_buf->blocklens[0];

            while (num < bufsize) {
                /* write as much as both the current file block and the
                 * current memory block allow */
                size = ADIOI_MIN(fwr_size, bwr_size);
                if (size) {
#ifdef IO_DEBUG
                    printf("[%d/%d] nc mem nc file writing loc = %lld sz = %lld\n",
                           rank, nprocs, (long long) off, (long long) size);
#endif
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_lseek_a, 0, NULL);
#endif
                    /* check the seek itself; testing the stale `err` from a
                     * previous write() would miss a failed lseek() */
                    err_lseek = lseek(fd->fd_sys, off, SEEK_SET);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_lseek_b, 0, NULL);
#endif
                    if (err_lseek == -1) err_flag = 1;
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_a, 0, NULL);
#endif
                    ADIOI_Assert(size == (size_t) size);
                    ADIOI_Assert(off == (off_t) off);
                    err = write(fd->fd_sys, ((char *) buf) + indx, size);
#ifdef ADIOI_MPE_LOGGING
                    MPE_Log_event(ADIOI_MPE_write_b, 0, NULL);
#endif
                    if (err == -1) err_flag = 1;
                }

                new_fwr_size = fwr_size;
                new_bwr_size = bwr_size;

                if (size == fwr_size) {
                    /* reached end of contiguous block in file */
                    if (j < (flat_file->count - 1)) j++;
                    else {
                        j = 0;
                        n_filetypes++;
                    }

                    off = disp + flat_file->indices[j] +
                                   n_filetypes*(ADIO_Offset)filetype_extent;

                    new_fwr_size = flat_file->blocklens[j];
                    if (size != bwr_size) {
                        indx += size;
                        new_bwr_size -= size;
                    }
                }

                if (size == bwr_size) {
                    /* reached end of contiguous block in memory */

                    k = (k + 1)%flat_buf->count;
                    buf_count++;
                    indx = buftype_extent*(buf_count/flat_buf->count) +
                        flat_buf->indices[k];
                    new_bwr_size = flat_buf->blocklens[k];
                    if (size != fwr_size) {
                        off += size;
                        new_fwr_size -= size;
                    }
                }
                num += size;
                fwr_size = new_fwr_size;
                bwr_size = new_bwr_size;
            }
        }

        if (file_ptr_type == ADIO_INDIVIDUAL) fd->fp_ind = off;
        if (err_flag) {
            *error_code = MPIO_Err_create_code(MPI_SUCCESS,
                                               MPIR_ERR_RECOVERABLE, myname,
                                               __LINE__, MPI_ERR_IO, "**io",
                                               "**io %s", strerror(errno));
        }
        else *error_code = MPI_SUCCESS;
    }

    fd->fp_sys_posn = -1;   /* set it to null. */

#ifdef HAVE_STATUS_SET_BYTES
    MPIR_Status_set_bytes(status, datatype, bufsize);
/* This is a temporary way of filling in status. The right way is to
   keep track of how much data was actually written by ADIOI_BUFFERED_WRITE. */
#endif

    if (!buftype_is_contig) ADIOI_Delete_flattened(datatype);
}
407