1 /*
2  * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3  *                         University Research and Technology
4  *                         Corporation.  All rights reserved.
5  * Copyright (c) 2004-2006 The University of Tennessee and The University
6  *                         of Tennessee Research Foundation.  All rights
7  *                         reserved.
8  * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9  *                         University of Stuttgart.  All rights reserved.
10  * Copyright (c) 2004-2005 The Regents of the University of California.
11  *                         All rights reserved.
12  * Copyright (c) 2008-2021 University of Houston. All rights reserved.
13  * Copyright (c) 2018      Cisco Systems, Inc.  All rights reserved
14  * Copyright (c) 2018      Research Organization for Information Science
15  *                         and Technology (RIST). All rights reserved.
16  * $COPYRIGHT$
17  *
18  * Additional copyrights may follow
19  *
20  * $HEADER$
21  *
22  * These symbols are in a file by themselves to provide nice linker
 * semantics. Since linkers generally pull in symbols by object files,
24  * keeping these symbols as the only symbols in this file prevents
25  * utility programs such as "ompi_info" from having to import entire
26  * modules just to query their version and parameters
27  */
28 
#include "ompi_config.h"
#include "mpi.h"

#include <limits.h>
#include <unistd.h>
#include <sys/uio.h>
#if HAVE_AIO_H
#include <aio.h>
#endif
37 
/* Cap on the number of simultaneously active aio requests (default 2048);
 * mca_fbtl_posix_module_init() refreshes it from sysconf(_SC_AIO_MAX)
 * when AIO support is compiled in. */
int fbtl_posix_max_aio_active_reqs = 2048;
39 
40 #include "ompi/mca/fbtl/fbtl.h"
41 #include "ompi/mca/fbtl/posix/fbtl_posix.h"
42 
43 /*
44  * *******************************************************************
45  * ************************ actions structure ************************
46  * *******************************************************************
47  */
48 static mca_fbtl_base_module_1_0_0_t posix =  {
49     mca_fbtl_posix_module_init,     /* initalise after being selected */
50     mca_fbtl_posix_module_finalize, /* close a module on a communicator */
51     mca_fbtl_posix_preadv,          /* blocking read */
52 #if defined (FBTL_POSIX_HAVE_AIO)
53     mca_fbtl_posix_ipreadv,         /* non-blocking read*/
54 #else
55     NULL,                           /* non-blocking read */
56 #endif
57     mca_fbtl_posix_pwritev,         /* blocking write */
58 #if defined (FBTL_POSIX_HAVE_AIO)
59     mca_fbtl_posix_ipwritev,        /* non-blocking write */
60     mca_fbtl_posix_progress,        /* module specific progress */
61     mca_fbtl_posix_request_free     /* free module specific data items on the request */
62 #else
63     NULL,                           /* non-blocking write */
64     NULL,                           /* module specific progress */
65     NULL                            /* free module specific data items on the request */
66 #endif
67 };
68 /*
69  * *******************************************************************
70  * ************************* structure ends **************************
71  * *******************************************************************
72  */
73 
mca_fbtl_posix_component_init_query(bool enable_progress_threads,bool enable_mpi_threads)74 int mca_fbtl_posix_component_init_query(bool enable_progress_threads,
75                                       bool enable_mpi_threads) {
76     /* Nothing to do */
77 
78    return OMPI_SUCCESS;
79 }
80 
81 struct mca_fbtl_base_module_1_0_0_t *
mca_fbtl_posix_component_file_query(ompio_file_t * fh,int * priority)82 mca_fbtl_posix_component_file_query (ompio_file_t *fh, int *priority) {
83    *priority = mca_fbtl_posix_priority;
84 
85    if (UFS == fh->f_fstype) {
86        if (*priority < 50) {
87            *priority = 50;
88        }
89    }
90 
91    return &posix;
92 }
93 
/*
 * Counterpart of mca_fbtl_posix_component_file_query().  The query step only
 * sets a priority and returns a static table, so there is nothing to undo
 * when this module is not selected.  Always returns OMPI_SUCCESS.
 */
int mca_fbtl_posix_component_file_unquery (ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}
101 
/*
 * Initialise the module after it has been selected for `file`.
 *
 * With AIO support compiled in, refresh the global cap
 * fbtl_posix_max_aio_active_reqs from sysconf(_SC_AIO_MAX).
 * - sysconf() returns -1 when the limit is indeterminate or the query fails;
 *   the current cap is kept in that case.
 * - A zero limit is ignored as well, since a cap of 0 would presumably
 *   prevent any aio request from being posted.
 * - Limits larger than INT_MAX are clamped instead of being narrowed with an
 *   implementation-defined conversion.
 *
 * Always returns OMPI_SUCCESS.
 */
int mca_fbtl_posix_module_init (ompio_file_t *file)
{
    (void) file;

#if defined (FBTL_POSIX_HAVE_AIO)
    long val = sysconf(_SC_AIO_MAX);

    if ( 0 < val ) {
        fbtl_posix_max_aio_active_reqs = ( val > INT_MAX ) ? INT_MAX : (int) val;
    }
#endif
    return OMPI_SUCCESS;
}
112 
113 
/*
 * Finalize the module for `file`.  mca_fbtl_posix_module_init() allocates
 * nothing per file, so there is nothing to release; always OMPI_SUCCESS.
 */
int mca_fbtl_posix_module_finalize (ompio_file_t *file)
{
    (void) file;
    return OMPI_SUCCESS;
}
117 
#if defined (FBTL_POSIX_HAVE_AIO)
/*
 * Finish `req` with an error status.  The number of bytes transferred before
 * the failure is still reported through _ucount.
 */
static void fbtl_posix_progress_fail (mca_ompio_request_t *req,
                                      mca_fbtl_posix_request_data_t *data)
{
    req->req_ompi.req_status.MPI_ERROR = OMPI_ERROR;
    req->req_ompi.req_status._ucount   = data->aio_total_len;
}

/*
 * Lock `total_length` bytes starting at `start_offset`, storing the lock in
 * data->aio_lock.  Reads take F_RDLCK and writes take F_WRLCK; any other
 * request type takes no lock.
 *
 * Returns OMPI_SUCCESS, or OMPI_ERROR after releasing whatever part of the
 * lock may have been acquired.
 */
static int fbtl_posix_progress_lock (mca_fbtl_posix_request_data_t *data,
                                     off_t start_offset, off_t total_length)
{
    int lock_type;
    int ret_code;

    if ( FBTL_POSIX_READ == data->aio_req_type ) {
        lock_type = F_RDLCK;
    }
    else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
        lock_type = F_WRLCK;
    }
    else {
        return OMPI_SUCCESS;
    }

    ret_code = mca_fbtl_posix_lock( &data->aio_lock, data->aio_fh, lock_type,
                                    start_offset, total_length, OMPIO_LOCK_ENTIRE_REGION );
    /* NOTE(review): only positive return values are treated as failures here.
     * Confirm this against mca_fbtl_posix_lock()'s return convention;
     * fcntl() itself reports failure as -1. */
    if ( 0 < ret_code ) {
        opal_output(1, "mca_fbtl_posix_progress: error in mca_fbtl_posix_lock() %d", ret_code);
        /* Just in case some part of the lock actually succeeded. */
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
        return OMPI_ERROR;
    }
    return OMPI_SUCCESS;
}

/*
 * Post data->aio_reqs[idx] with aio_read() or aio_write(), according to the
 * request type.  On failure the current lock is released and OMPI_ERROR is
 * returned.
 */
static int fbtl_posix_progress_submit (mca_fbtl_posix_request_data_t *data, int idx)
{
    if ( FBTL_POSIX_READ == data->aio_req_type ) {
        if ( -1 == aio_read(&data->aio_reqs[idx]) ) {
            opal_output(1, "mca_fbtl_posix_progress: error in aio_read()");
            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
            return OMPI_ERROR;
        }
    }
    else if ( FBTL_POSIX_WRITE == data->aio_req_type ) {
        if ( -1 == aio_write(&data->aio_reqs[idx]) ) {
            opal_output(1, "mca_fbtl_posix_progress: error in aio_write()");
            mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
            return OMPI_ERROR;
        }
    }
    return OMPI_SUCCESS;
}

/*
 * Re-post the unfinished tail of data->aio_reqs[idx] after only `done`
 * (0 < done < aio_nbytes) bytes were transferred.
 */
static int fbtl_posix_progress_resubmit (mca_fbtl_posix_request_data_t *data,
                                         int idx, ssize_t done)
{
    int ret_code;

    data->aio_reqs[idx].aio_offset += done;
    data->aio_reqs[idx].aio_buf     = (char*)data->aio_reqs[idx].aio_buf + done;
    data->aio_reqs[idx].aio_nbytes -= done;
    data->aio_reqs[idx].aio_reqprio = 0;
    data->aio_reqs[idx].aio_sigevent.sigev_notify = SIGEV_NONE;
    data->aio_req_status[idx]       = EINPROGRESS;

    /* NOTE(review): this re-lock overwrites data->aio_lock with the range of
     * this single request, so later unlocks through data->aio_lock cover only
     * this sub-range -- confirm that is intended. */
    ret_code = fbtl_posix_progress_lock (data, data->aio_reqs[idx].aio_offset,
                                         (off_t) data->aio_reqs[idx].aio_nbytes);
    if ( OMPI_SUCCESS != ret_code ) {
        return ret_code;
    }

    /* The lock is kept until the request completes, for reads as well as for
     * writes, matching how batch locks are held. */
    return fbtl_posix_progress_submit (data, idx);
}

/*
 * Advance the active window [aio_first_active_req, aio_last_active_req) to
 * the next batch of at most aio_req_chunks requests.  Then lock the byte range
 * from the first request's offset to the end of the last request, and post
 * every request in the window.
 */
static int fbtl_posix_progress_post_next_batch (mca_fbtl_posix_request_data_t *data)
{
    int i, ret_code;
    off_t start_offset, end_offset;

    data->aio_first_active_req = data->aio_last_active_req;
    if ( (data->aio_req_count - data->aio_last_active_req) > data->aio_req_chunks ) {
        data->aio_last_active_req += data->aio_req_chunks;
    }
    else {
        data->aio_last_active_req = data->aio_req_count;
    }

    /* presumably requests are ordered by file offset, so this range covers
     * the whole batch -- TODO confirm with the request setup code */
    start_offset = data->aio_reqs[data->aio_first_active_req].aio_offset;
    end_offset   = data->aio_reqs[data->aio_last_active_req-1].aio_offset +
                   data->aio_reqs[data->aio_last_active_req-1].aio_nbytes;

    ret_code = fbtl_posix_progress_lock (data, start_offset, end_offset - start_offset);
    if ( OMPI_SUCCESS != ret_code ) {
        return ret_code;
    }

    for ( i = data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
        ret_code = fbtl_posix_progress_submit (data, i);
        if ( OMPI_SUCCESS != ret_code ) {
            return ret_code;
        }
    }
    return OMPI_SUCCESS;
}
#endif

/*
 * Progress a non-blocking fbtl_posix request.
 *
 * Polls the aio requests in the active window.  Short transfers are
 * resubmitted for their remaining bytes.  Once every request of a full batch
 * has finished, the next batch is locked and posted.
 *
 * Returns true when the request is finished and req_status has been filled
 * in:
 *   - MPI_ERROR = OMPI_SUCCESS when all operations completed;
 *   - MPI_ERROR = OMPI_ERROR when any operation, lock or submission failed.
 * In both cases _ucount holds the bytes transferred.  Returns false while
 * operations are still pending (and always when AIO support is not compiled
 * in).
 */
bool mca_fbtl_posix_progress ( mca_ompio_request_t *req)
{
    bool ret=false;
#if defined (FBTL_POSIX_HAVE_AIO)
    int i, lcount=0;   /* lcount: finished requests in the active window */
    mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;

    for ( i=data->aio_first_active_req; i < data->aio_last_active_req; i++ ) {
        ssize_t nbytes;

        if ( EINPROGRESS != data->aio_req_status[i] ) {
            /* already finished during an earlier call */
            lcount++;
            continue;
        }

        data->aio_req_status[i] = aio_error ( &data->aio_reqs[i] );
        if ( EINPROGRESS == data->aio_req_status[i] ) {
            /* not yet done */
            continue;
        }
        if ( 0 != data->aio_req_status[i] ) {
            /* an error occurred. Mark the request done, but
               set an error code in the status */
            fbtl_posix_progress_fail (req, data);
            return true;
        }

        /* aio_error() returned 0, so the operation is complete and
         * aio_return() yields the number of bytes transferred. */
        nbytes = aio_return ( &data->aio_reqs[i] );
        if ( 0 > nbytes ) {
            fbtl_posix_progress_fail (req, data);
            return true;
        }
        data->aio_total_len += nbytes;

        if ( 0 < nbytes && data->aio_reqs[i].aio_nbytes != (size_t)nbytes ) {
            /* Partial completion: post the remaining bytes again */
            if ( OMPI_SUCCESS != fbtl_posix_progress_resubmit (data, i, nbytes) ) {
                fbtl_posix_progress_fail (req, data);
                return true;
            }
        }
        else {
            /* Fully transferred, or nothing transferred (for a read: end of
             * file).  Resubmitting a 0-byte result would loop forever. */
            data->aio_open_reqs--;
            lcount++;
        }
    }

    if ( (lcount == data->aio_req_chunks) && (0 != data->aio_open_reqs) ) {
        /* release the lock of the previous operations */
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );

        /* post the next batch of operations */
        if ( OMPI_SUCCESS != fbtl_posix_progress_post_next_batch (data) ) {
            fbtl_posix_progress_fail (req, data);
            return true;
        }
    }

    if ( 0 == data->aio_open_reqs ) {
        /* all pending operations are finished for this request */
        req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        req->req_ompi.req_status._ucount = data->aio_total_len;
        mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );
        ret = true;
    }
#endif
    return ret;
}
264 
/*
 * Release the fbtl_posix data attached to an ompio request.
 *
 * Drops the file lock recorded in data->aio_lock, then frees the aiocb
 * array, the status array and the data structure, and clears req->req_data.
 * Does nothing when req->req_data is NULL (or without AIO support).
 */
void mca_fbtl_posix_request_free ( mca_ompio_request_t *req)
{
#if defined (FBTL_POSIX_HAVE_AIO)
    mca_fbtl_posix_request_data_t *data=(mca_fbtl_posix_request_data_t *)req->req_data;

    if ( NULL == data ) {
        return;
    }

    mca_fbtl_posix_unlock ( &data->aio_lock, data->aio_fh );

    /* NOTE(review): when the progress function finished the request with an
     * error, other aio operations of the active batch may still be in
     * flight.  Freeing their aiocbs here without aio_cancel()/aio_suspend()
     * would be unsafe -- confirm callers guarantee quiescence. */
    free ( data->aio_reqs );        /* free(NULL) is a no-op */
    free ( data->aio_req_status );
    free ( data );
    req->req_data = NULL;
#endif
}
284