1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2011 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008-2020 University of Houston. All rights reserved.
13 * Copyright (c) 2015-2018 Research Organization for Information Science
14 * and Technology (RIST). All rights reserved.
15 * $COPYRIGHT$
16 *
17 * Additional copyrights may follow
18 *
19 * $HEADER$
20 */
21
22 #include "ompi_config.h"
23 #include "fbtl_posix.h"
24
25 #include "mpi.h"
26 #include <unistd.h>
27 #include <limits.h>
28 #include "ompi/constants.h"
29 #include "ompi/mca/fbtl/fbtl.h"
30
31
32 static ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh);
33 static ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh);
34
/*
 * Read the file regions described by fh->f_io_array (f_num_of_io_entries
 * entries) from fh->fd into each entry's memory_address.
 *
 * With more than one entry, chooses between the data-sieving path (one large
 * read into a temporary buffer, then copy-out) and the generic (p)readv path,
 * based on the average entry length and the average distance between
 * consecutive entry offsets. A single entry is read with one locked pread().
 *
 * Returns the number of bytes read (may be less than requested at end of
 * file), 0 when there are no entries, or an OMPI error code (OMPI_ERROR, or
 * whatever the multi-entry helpers return, e.g. OMPI_ERR_OUT_OF_RESOURCE).
 */
ssize_t mca_fbtl_posix_preadv (ompio_file_t *fh )
{
    ssize_t ret_code = 0;
    struct flock lock;
    int ret;
    int saved_errno;

    if (NULL == fh->f_io_array) {
        return OMPI_ERROR;
    }

    /* No entries: nothing to read, and f_io_array[0] must not be touched. */
    if ( fh->f_num_of_io_entries < 1 ) {
        return 0;
    }

    if ( fh->f_num_of_io_entries > 1 ) {
        bool do_data_sieving = true;
        size_t avg_gap_size = 0;
        size_t avg_block_size = 0;
        off_t prev_offset = (off_t)fh->f_io_array[0].offset;
        int i;

        /* Note: the "gap" accumulated here is the distance between the start
           offsets of consecutive entries (it includes the previous entry's
           length), not the size of the hole between them. Entry 0 adds 0. */
        for ( i = 0; i < fh->f_num_of_io_entries; i++ ) {
            avg_block_size += fh->f_io_array[i].length;
            avg_gap_size   += (size_t)((off_t)fh->f_io_array[i].offset - prev_offset);
            prev_offset     = (off_t)fh->f_io_array[i].offset;
        }
        avg_block_size = avg_block_size / fh->f_num_of_io_entries;
        avg_gap_size   = avg_gap_size / fh->f_num_of_io_entries;

        /* Data sieving only when enabled (mca_fbtl_posix_read_datasieving)
           and both averages are non-degenerate and within the limits. */
        if ( false == mca_fbtl_posix_read_datasieving ||
             0 == avg_gap_size ||
             avg_block_size > mca_fbtl_posix_max_block_size ||
             avg_gap_size > mca_fbtl_posix_max_gap_size ) {
            do_data_sieving = false;
        }

        if ( do_data_sieving ) {
            return mca_fbtl_posix_preadv_datasieving (fh);
        }
        return mca_fbtl_posix_preadv_generic (fh);
    }

    /* Exactly one entry: lock its range and read it directly. */
    ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, (off_t)fh->f_io_array[0].offset,
                                (off_t)fh->f_io_array[0].length, OMPIO_LOCK_ENTIRE_REGION );
    /* NOTE(review): only positive return values are treated as failure; if
       mca_fbtl_posix_lock() propagates fcntl()'s -1 on error this check never
       fires -- confirm the helper's return convention. */
    if ( 0 < ret ) {
        opal_output(1, "mca_fbtl_posix_preadv: error in mca_fbtl_posix_lock() ret=%d: %s",
                    ret, strerror(errno));
        /* Just in case some part of the lock worked */
        mca_fbtl_posix_unlock ( &lock, fh);
        return OMPI_ERROR;
    }

    ret_code = pread(fh->fd, fh->f_io_array[0].memory_address, fh->f_io_array[0].length,
                     (off_t)fh->f_io_array[0].offset );
    /* Capture errno before the unlock call, which may overwrite it. */
    saved_errno = errno;
    mca_fbtl_posix_unlock ( &lock, fh );
    if ( ret_code == -1 ) {
        opal_output(1, "mca_fbtl_posix_preadv: error in (p)read(v):%s", strerror(saved_errno));
        return OMPI_ERROR;
    }

    return ret_code;
}
99
/*
 * Data-sieving read: instead of one read per entry, read the whole file range
 * spanning a run of consecutive entries into a temporary buffer with a single
 * locked pread(), then copy each requested piece out of that buffer.
 *
 * Entries are grouped into chunks [startindex, endindex) whose span (from the
 * first entry's offset to the end of the last entry) stays within
 * mca_fbtl_posix_max_tmpbuf_size bytes. A chunk always contains at least one
 * entry, so a single entry larger than that limit is read on its own (with a
 * buffer of its size) instead of stalling the chunking loop or indexing
 * before the start of f_io_array.
 *
 * NOTE(review): positions are computed relative to the chunk's first offset,
 * so entries are presumably sorted by non-decreasing offset -- confirm with
 * the caller.
 *
 * Returns the number of bytes copied into the user buffers (short if the file
 * ends inside a chunk), OMPI_ERR_OUT_OF_RESOURCE if the temporary buffer
 * cannot be allocated, or OMPI_ERROR on a lock or read failure.
 */
static ssize_t mca_fbtl_posix_preadv_datasieving (ompio_file_t *fh)
{
    size_t start, end, len;
    size_t bufsize = 0;
    int ret, i, j;
    int saved_errno;
    ssize_t bytes_read = 0, ret_code = 0;
    struct flock lock;
    char *temp_buf = NULL;

    int startindex = 0;
    int endindex = 0;

    while ( endindex < fh->f_num_of_io_entries ) {
        startindex = endindex;
        start = (size_t)fh->f_io_array[startindex].offset;

        /* The first entry of a chunk is always taken; then extend the chunk
           while the total span still fits in the temporary-buffer limit. */
        endindex = startindex + 1;
        for ( j = startindex + 1; j < fh->f_num_of_io_entries; j++ ) {
            size_t slen = ((size_t)fh->f_io_array[j].offset + fh->f_io_array[j].length) - start;
            if ( slen > mca_fbtl_posix_max_tmpbuf_size ) {
                break;
            }
            endindex = j + 1;
        }

        end = (size_t)fh->f_io_array[endindex-1].offset + fh->f_io_array[endindex-1].length;
        len = end - start;

        /* Grow the temporary buffer only when this chunk needs more room than
           any previous one; otherwise reuse it. */
        if ( len > bufsize ) {
            free ( temp_buf );
            temp_buf = (char *) malloc ( len );
            if ( NULL == temp_buf ) {
                opal_output(1, "OUT OF MEMORY\n");
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            bufsize = len;
        }

        // Read the entire block.
        ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, start, len, OMPIO_LOCK_ENTIRE_REGION );
        /* NOTE(review): only positive return values are treated as failure;
           confirm mca_fbtl_posix_lock()'s return convention. */
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in mca_fbtl_posix_lock() ret=%d: %s",
                        ret, strerror(errno));
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            free ( temp_buf );
            return OMPI_ERROR;
        }

        ret_code = pread (fh->fd, temp_buf, len, start);
        /* Capture errno before the unlock call, which may overwrite it. */
        saved_errno = errno;
        mca_fbtl_posix_unlock ( &lock, fh);
        if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_preadv_datasieving: error in (p)read(v):%s", strerror(saved_errno));
            free ( temp_buf );
            return OMPI_ERROR;
        }

        // Copy out the elements that were requested.
        for ( i = startindex; i < endindex; i++ ) {
            size_t pos = (size_t)fh->f_io_array[i].offset - start;
            size_t num_bytes;

            /* Entries that start beyond what was actually read (EOF inside
               the chunk) receive nothing. */
            if ( (ssize_t)pos > ret_code ) {
                break;
            }
            num_bytes = fh->f_io_array[i].length;
            if ( ((ssize_t)pos + (ssize_t)num_bytes) > ret_code ) {
                /* Short read: copy only the bytes that were returned. */
                num_bytes = ret_code - (ssize_t)pos;
            }

            memcpy ( fh->f_io_array[i].memory_address, temp_buf + pos, num_bytes );
            bytes_read += num_bytes;
        }
    }

    free ( temp_buf );
    return bytes_read;
}
195
/*
 * Generic read path: walks fh->f_io_array and coalesces runs of entries whose
 * file ranges are back-to-back (entry i ends exactly where entry i+1 starts)
 * into one iovec list of at most IOV_MAX elements. Each run is locked with
 * OMPIO_LOCK_SELECTIVE over its full file range and read with a single
 * preadv() (or lseek() + readv() when HAVE_PREADV is not defined).
 *
 * Returns the total number of bytes read, stopping early when a read returns
 * 0 (end of file); OMPI_ERR_OUT_OF_RESOURCE if the iovec array cannot be
 * allocated or grown; OMPI_ERROR on a lock, seek or read failure.
 */
static ssize_t mca_fbtl_posix_preadv_generic (ompio_file_t *fh )
{
    ssize_t bytes_read=0, ret_code=0;
    struct iovec *iov = NULL;
    struct flock lock;
    int ret, i;
    int saved_errno;

    int block=1;
    int iov_count = 0;
    OMPI_MPI_OFFSET_TYPE iov_offset = 0;
    off_t total_length, end_offset=0;

    iov = (struct iovec *) malloc (OMPIO_IOVEC_INITIAL_SIZE * sizeof (struct iovec));
    if (NULL == iov) {
        opal_output(1, "OUT OF MEMORY\n");
        return OMPI_ERR_OUT_OF_RESOURCE;
    }

    for (i=0 ; i<fh->f_num_of_io_entries ; i++) {
        if (0 == iov_count) {
            /* Start a new run with entry i. */
            iov[iov_count].iov_base = fh->f_io_array[i].memory_address;
            iov[iov_count].iov_len = fh->f_io_array[i].length;
            iov_offset = (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset;
            end_offset = (off_t)fh->f_io_array[i].offset + (off_t)fh->f_io_array[i].length;
            iov_count ++;
        }

        /* Make room before entry i+1 is possibly appended below. */
        if (OMPIO_IOVEC_INITIAL_SIZE*block <= iov_count) {
            struct iovec *new_iov;

            block ++;
            new_iov = (struct iovec *)realloc
                (iov, OMPIO_IOVEC_INITIAL_SIZE * block *
                 sizeof(struct iovec));
            if (NULL == new_iov) {
                /* realloc() leaves the old array allocated on failure. */
                opal_output(1, "OUT OF MEMORY\n");
                free (iov);
                return OMPI_ERR_OUT_OF_RESOURCE;
            }
            iov = new_iov;
        }

        if (fh->f_num_of_io_entries != i+1) {
            if (((((OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i].offset +
                   (ptrdiff_t)fh->f_io_array[i].length) ==
                  (OMPI_MPI_OFFSET_TYPE)(intptr_t)fh->f_io_array[i+1].offset)) &&
                (iov_count < IOV_MAX ) ){
                iov[iov_count].iov_base =
                    fh->f_io_array[i+1].memory_address;
                iov[iov_count].iov_len = fh->f_io_array[i+1].length;
                /* The run now ends where entry i+1 ends, so the lock taken
                   below must extend that far as well. */
                end_offset = (off_t)fh->f_io_array[i+1].offset + (off_t)fh->f_io_array[i+1].length;
                iov_count ++;
                continue;
            }
        }

        /* The run is complete: lock its whole range and read it. */
        total_length = (end_offset - (off_t)iov_offset );

        ret = mca_fbtl_posix_lock ( &lock, fh, F_RDLCK, iov_offset, total_length, OMPIO_LOCK_SELECTIVE );
        /* NOTE(review): only positive return values are treated as failure;
           confirm mca_fbtl_posix_lock()'s return convention. */
        if ( 0 < ret ) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in mca_fbtl_posix_lock() ret=%d: %s", ret, strerror(errno));
            free (iov);
            /* Just in case some part of the lock worked */
            mca_fbtl_posix_unlock ( &lock, fh);
            return OMPI_ERROR;
        }
#if defined(HAVE_PREADV)
        ret_code = preadv (fh->fd, iov, iov_count, iov_offset);
#else
        if (-1 == lseek (fh->fd, iov_offset, SEEK_SET)) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in lseek:%s", strerror(errno));
            free(iov);
            mca_fbtl_posix_unlock ( &lock, fh );
            return OMPI_ERROR;
        }
        ret_code = readv (fh->fd, iov, iov_count);
#endif
        /* Capture errno before the unlock call, which may overwrite it. */
        saved_errno = errno;
        mca_fbtl_posix_unlock ( &lock, fh );
        if ( 0 < ret_code ) {
            bytes_read+=ret_code;
        }
        else if ( ret_code == -1 ) {
            opal_output(1, "mca_fbtl_posix_preadv_generic: error in (p)readv:%s", strerror(saved_errno));
            free(iov);
            return OMPI_ERROR;
        }
        else if ( 0 == ret_code ){
            /* end of file reached, no point in continue reading; */
            break;
        }
        iov_count = 0;
    }

    free (iov);
    return bytes_read;
}
288