1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2006 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008-2021 University of Houston. All rights reserved.
13 * Copyright (c) 2018 Cisco Systems, Inc. All rights reserved
14 * Copyright (c) 2018 Research Organization for Information Science
15 * and Technology (RIST). All rights reserved.
16 * $COPYRIGHT$
17 *
18 * Additional copyrights may follow
19 *
20 * $HEADER$
21 *
22 * These symbols are in a file by themselves to provide nice linker
 * semantics. Since linkers generally pull in symbols by object files,
24 * keeping these symbols as the only symbols in this file prevents
25 * utility programs such as "ompi_info" from having to import entire
26 * modules just to query their version and parameters
27 */
28
#include "ompi_config.h"
#include "mpi.h"

#include <limits.h>
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
37
/* Number of aio requests that may be active at the same time.  Starts at
 * 2048; mca_fbtl_posix_module_init() may replace it with the value reported
 * by sysconf(_SC_AIO_MAX). */
int fbtl_posix_max_aio_active_reqs = 2048;
39
40 #include "ompi/mca/fbtl/fbtl.h"
41 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
42
43 /*
44 * *******************************************************************
45 * ************************ actions structure ************************
46 * *******************************************************************
47 */
48 static mca_fbtl_base_module_1_0_0_t posix = {
49 mca_fbtl_posix_module_init, /* initalise after being selected */
50 mca_fbtl_posix_module_finalize, /* close a module on a communicator */
51 mca_fbtl_posix_preadv, /* blocking read */
52 #if defined (FBTL_POSIX_HAVE_AIO)
53 mca_fbtl_posix_ipreadv, /* non-blocking read*/
54 #else
55 NULL, /* non-blocking read */
56 #endif
57 mca_fbtl_posix_pwritev, /* blocking write */
58 #if defined (FBTL_POSIX_HAVE_AIO)
59 mca_fbtl_posix_ipwritev, /* non-blocking write */
60 mca_fbtl_posix_progress, /* module specific progress */
61 mca_fbtl_posix_request_free /* free module specific data items on the request */
62 #else
63 NULL, /* non-blocking write */
64 NULL, /* module specific progress */
65 NULL /* free module specific data items on the request */
66 #endif
67 };
68 /*
69 * *******************************************************************
70 * ************************* structure ends **************************
71 * *******************************************************************
72 */
73
mca_fbtl_posix_component_init_query(bool enable_progress_threads,bool enable_mpi_threads)74 int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
75 bool enable_mpi_threads) {
76 /* Nothing to do */
77
78 return OMPI_SUCCESS;
79 }
80
81 struct mca_fbtl_base_module_1_0_0_t *
mca_fbtl_posix_component_file_query(ompio_file_t * fh,int * priority)82 mca_fbtl_posix_component_file_query (ompio_file_t *fh, int *priority) {
83 *priority = mca_fbtl_posix_priority;
84
85 if (UFS == fh->f_fstype) {
86 if (*priority < 50) {
87 *priority = 50;
88 }
89 }
90
91 return &posix;
92 }
93
/*
 * Undo mca_fbtl_posix_component_file_query() when this module ends up not
 * being selected for @file.  The query allocates nothing, so there is
 * nothing to release; the hook exists for future use and always succeeds.
 */
int mca_fbtl_posix_component_file_unquery (ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}
101
/*
 * Per-file module initialisation.
 *
 * When AIO support is compiled in, fbtl_posix_max_aio_active_reqs is
 * replaced by the system limit reported by sysconf(_SC_AIO_MAX).  The
 * current value is kept when sysconf reports no determinate limit (-1) or a
 * non-positive value; a limit larger than INT_MAX is clamped so that the
 * long -> int conversion cannot wrap to a garbage/negative value.
 *
 * @file  file being opened (unused)
 * @return OMPI_SUCCESS
 */
int mca_fbtl_posix_module_init (ompio_file_t *file)
{
#if defined (FBTL_POSIX_HAVE_AIO)
    long val = sysconf(_SC_AIO_MAX);

    /* -1: limit indeterminate or unsupported -> keep the current value.
       0 would not be a usable number of active requests, so ignore it. */
    if ( 0 < val ) {
        fbtl_posix_max_aio_active_reqs = (val > INT_MAX) ? INT_MAX : (int)val;
    }
#endif
    (void) file;
    return OMPI_SUCCESS;
}
112
113
/*
 * Per-file module teardown.  mca_fbtl_posix_module_init() keeps no per-file
 * state, so there is nothing to release and the call always succeeds.
 */
int mca_fbtl_posix_module_finalize (ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}
117
#if defined (FBTL_POSIX_HAVE_AIO)
/*
 * Complete @req with an error.  The number of bytes transferred so far is
 * stored in the status so the caller can see how far the operation got.
 * Always returns true ("request is done"), so callers can simply
 * `return mca_fbtl_posix_progress_fail (req, data);`.
 */
static bool mca_fbtl_posix_progress_fail (mca_ompio_request_t *req,
                                          mca_fbtl_posix_request_data_t *data)
{
    req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
    req->req_ompi.req_status._ucount = data->aio_total_len;
    return true;
}

/*
 * Submit control block @idx with aio_read() or aio_write(), depending on
 * the request type (nothing is submitted for any other type).
 * Returns 0 on success, -1 if the submission failed (a message is logged).
 */
static int mca_fbtl_posix_progress_post (mca_fbtl_posix_request_data_t *data,
                                         int idx)
{
    if ( FBTL_POSIX_READ == data->aio_req_type ) {
        if ( -1 == aio_read(&data->aio_reqs[idx]) ) {
            opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
            return -1;
        }
    }
    else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
        if ( -1 == aio_write(&data->aio_reqs[idx]) ) {
            opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
            return -1;
        }
    }
    return 0;
}
#endif

/*
 * Progress a non-blocking read or write request.  The request is split into
 * data->aio_req_count aio control blocks that are submitted in batches of
 * at most data->aio_req_chunks ([aio_first_active_req, aio_last_active_req)
 * is the active batch).
 *
 * Every still-running block of the active batch is polled with aio_error().
 * Finished blocks are accounted in data->aio_total_len.  Partially finished
 * blocks are resubmitted for their remainder.  Once the whole batch is done,
 * its lock is released and the next batch is locked and submitted.
 *
 * Returns true once the request is complete.  req->req_ompi.req_status then
 * holds MPI_ERROR (OMPI_SUCCESS or OMPI_ERROR) and the number of bytes
 * transferred in _ucount.  Returns false while work is still pending, and
 * always false when AIO support is not compiled in.
 */
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
{
    bool ret=false;
#if defined (FBTL_POSIX_HAVE_AIO)
    int i=0, lcount=0, ret_code=0;
    mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;
    off_t start_offset, end_offset, total_length;

    for (i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
        ssize_t done;

        if ( EINPROGRESS != data->aio_req_status[i] ) {
            /* This block already finished during an earlier call. */
            lcount++;
            continue;
        }

        data->aio_req_status[i] = aio_error ( &data->aio_reqs[i]);
        if ( EINPROGRESS == data->aio_req_status[i] ) {
            /* not yet done */
            continue;
        }
        if ( 0 != data->aio_req_status[i] ) {
            /* An error occurred. Mark the request done, but set an error
               code in the status. */
            return mca_fbtl_posix_progress_fail (req, data);
        }

        /* aio_error() returned 0, i.e. the operation completed without
           error, so aio_return() yields the number of bytes read/written. */
        done = aio_return (&data->aio_reqs[i]);
        data->aio_total_len += done;

        if ( 0 == done || data->aio_reqs[i].aio_nbytes == (size_t)done ) {
            /* Block finished.  A zero-byte transfer (e.g. a read at end of
               file) cannot make further progress; resubmitting it would
               return 0 again forever, so it counts as finished too. */
            data->aio_open_reqs--;
            lcount++;
            continue;
        }

        /* Partial completion: resubmit the remainder.  The remainder lies
           inside the span of this block, which is part of the region locked
           when the batch was posted, so no additional lock is taken here.
           Re-locking would overwrite data->aio_lock, and the later batch
           unlock would then release only this sub-region. */
        data->aio_reqs[i].aio_offset += done;
        data->aio_reqs[i].aio_buf = (char*)data->aio_reqs[i].aio_buf + done;
        data->aio_reqs[i].aio_nbytes -= done;
        data->aio_reqs[i].aio_reqprio = 0;
        data->aio_reqs[i].aio_sigevent.sigev_notify = SIGEV_NONE;
        data->aio_req_status[i] = EINPROGRESS;
        if ( 0 != mca_fbtl_posix_progress_post (data, i) ) {
            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
            return mca_fbtl_posix_progress_fail (req, data);
        }
    }

    if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs )) {
        /* release the lock of the previous operations */
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );

        /* post the next batch of operations */
        data->aio_first_active_req = data->aio_last_active_req;
        if ( (data->aio_req_count-data->aio_last_active_req) > data->aio_req_chunks ) {
            data->aio_last_active_req += data->aio_req_chunks;
        }
        else {
            data->aio_last_active_req = data->aio_req_count;
        }

        /* Lock the whole span covered by the new batch (control blocks are
           assumed to be ordered by offset). */
        start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
        end_offset = data->aio_reqs[data->aio_last_active_req-1].aio_offset + data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;
        total_length = (end_offset - start_offset);

        if ( FBTL_POSIX_READ == data->aio_req_type ) {
            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_RDLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
        }
        else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
            ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, F_WRLCK, start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
        }
        /* 0 means success; any other value (including -1) is a failure. */
        if ( 0 != ret_code ) {
            opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
            /* Just in case some part of the lock actually succeeded. */
            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
            return mca_fbtl_posix_progress_fail (req, data);
        }

        for ( i=data->aio_first_active_req; i< data->aio_last_active_req; i++ ) {
            if ( 0 != mca_fbtl_posix_progress_post (data, i) ) {
                mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
                return mca_fbtl_posix_progress_fail (req, data);
            }
        }
    }

    if ( 0 == data->aio_open_reqs ) {
        /* all pending operations are finished for this request */
        req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        req->req_ompi.req_status._ucount = data->aio_total_len;
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
        ret = true;
    }
#else
    (void) req;
#endif
    return ret;
}
264
/*
 * Release the fbtl-specific data attached to an ompio request.
 *
 * If req->req_data is set, the lock recorded in it is passed to
 * mca_fbtl_posix_unlock(), the control-block and status arrays and the data
 * container itself are freed, and req->req_data is reset to NULL.  Without
 * AIO support this is a no-op.
 */
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
{
#if defined (FBTL_POSIX_HAVE_AIO)
    mca_fbtl_posix_request_data_t *rdata = (mca_fbtl_posix_request_data_t *) req->req_data;

    if ( NULL == rdata ) {
        return;
    }

    mca_fbtl_posix_unlock ( &rdata->aio_lock, rdata->aio_fh );

    /* free(NULL) is a no-op, so the arrays need no separate NULL checks. */
    free ( rdata->aio_reqs );
    free ( rdata->aio_req_status );
    free ( rdata );
    req->req_data = NULL;
#endif
}
284