/*
 *  Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
 *                          University Research and Technology
 *                          Corporation.  All rights reserved.
 *  Copyright (c) 2004-2016 The University of Tennessee and The University
 *                          of Tennessee Research Foundation.  All rights
 *                          reserved.
 *  Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
 *                          University of Stuttgart.  All rights reserved.
 *  Copyright (c) 2004-2005 The Regents of the University of California.
 *                          All rights reserved.
 *  Copyright (c) 2008-2019 University of Houston. All rights reserved.
 *  Copyright (c) 2018      Research Organization for Information Science
 *                          and Technology (RIST). All rights reserved.
 *  $COPYRIGHT$
 *
 *  Additional copyrights may follow
 *
 *  $HEADER$
 */

#include "ompi_config.h"

#include "ompi/communicator/communicator.h"
#include "ompi/info/info.h"
#include "ompi/file/file.h"
#include "ompi/mca/fs/fs.h"
#include "ompi/mca/fs/base/base.h"
#include "ompi/mca/fcoll/fcoll.h"
#include "ompi/mca/fcoll/base/base.h"
#include "ompi/mca/fbtl/fbtl.h"
#include "ompi/mca/fbtl/base/base.h"

#include "common_ompio.h"
#include "common_ompio_request.h"
#include "common_ompio_buffer.h"
#include <unistd.h>
#include <math.h>


/* Read and write routines are split into two interfaces.
**   The
**   mca_io_ompio_file_read/write[_at]
**
** routines are the ones registered with the ompio modules.
** The
**
** mca_common_ompio_file_read/write[_at]
**
** routines are used e.g. from the shared file pointer modules.
** The main difference is that the first one takes an ompi_file_t
** as a file pointer argument, while the second uses the ompio internal
** ompio_file_t structure.
*/
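
/* Illustrative sketch (not part of this file): the io/ompio wrapper that is
** registered with MPI resolves the ompio-internal file handle from the
** ompi_file_t and forwards to the common routine, roughly along these lines.
** The member names used here (f_io_selected_data, ompio_fh) and the omission
** of any locking are simplifying assumptions for illustration only.
**
**   int mca_io_ompio_file_read (ompi_file_t *fp, void *buf, int count,
**                               struct ompi_datatype_t *datatype,
**                               ompi_status_public_t *status)
**   {
**       mca_common_ompio_data_t *data;
**       data = (mca_common_ompio_data_t *) fp->f_io_selected_data;
**       return mca_common_ompio_file_read (&data->ompio_fh, buf, count,
**                                          datatype, status);
**   }
*/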

int mca_common_ompio_file_read (ompio_file_t *fh,
                                void *buf,
                                int count,
                                struct ompi_datatype_t *datatype,
                                ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS;

    size_t total_bytes_read = 0;       /* total bytes that have been read */
    size_t bytes_per_cycle = 0;        /* total read in each cycle by each process */
    int index = 0;
    int cycles = 0;

    uint32_t iov_count = 0;
    struct iovec *decoded_iov = NULL;

    size_t max_data=0, real_bytes_read=0;
    size_t spc=0;
    ssize_t ret_code=0;
    int i = 0; /* index into the decoded iovec of the buffer */
    int j = 0; /* index into the file view iovec */

    if (fh->f_amode & MPI_MODE_WRONLY) {
//      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
        ret = MPI_ERR_ACCESS;
        return ret;
    }

    if ( 0 == count ) {
        if ( MPI_STATUS_IGNORE != status ) {
            status->_ucount = 0;
        }
        return ret;
    }

    bool need_to_copy = false;
    opal_convertor_t convertor;
#if OPAL_CUDA_SUPPORT
    int is_gpu, is_managed;
    mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
    if ( is_gpu && !is_managed ) {
        need_to_copy = true;
    }
#endif

    if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
         !(datatype == &ompi_mpi_byte.dt  ||
           datatype == &ompi_mpi_char.dt   )) {
        /* A copy is only needed if one of these conditions holds:
           1. the buffer is an unmanaged CUDA buffer (checked above), or
           2. the data representation is anything other than 'native' AND
              the datatype is not byte or char (i.e. it requires some actual
              work to be done, e.g. for external32).
        */
        need_to_copy = true;
    }

    if ( need_to_copy ) {
        char *tbuf=NULL;

        OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
    }
    else {
        mca_common_ompio_decode_datatype (fh,
                                          datatype,
                                          count,
                                          buf,
                                          &max_data,
                                          fh->f_mem_convertor,
                                          &decoded_iov,
                                          &iov_count);
    }

    if ( 0 < max_data && 0 == fh->f_iov_count ) {
        if ( MPI_STATUS_IGNORE != status ) {
            status->_ucount = 0;
        }
        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
        return OMPI_SUCCESS;
    }

    if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size)) {
        bytes_per_cycle = max_data;
    }
    else {
        bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size);
    }
    cycles = ceil((double)max_data/bytes_per_cycle);
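    /* Illustrative arithmetic (numbers are examples only): with
       max_data = 10 MB of user data and a cycle_buffer_size of 4 MB,
       ceil(10/4) = 3 cycles are needed; the last cycle reads the
       remaining 2 MB. */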

#if 0
    printf ("Bytes per Cycle: %zu   Cycles: %d max_data: %zu\n", bytes_per_cycle, cycles, max_data);
#endif

    j = fh->f_index_in_file_view;

    for (index = 0; index < cycles; index++) {

        mca_common_ompio_build_io_array ( fh,
                                          index,
                                          cycles,
                                          bytes_per_cycle,
                                          max_data,
                                          iov_count,
                                          decoded_iov,
                                          &i,
                                          &j,
                                          &total_bytes_read,
                                          &spc,
                                          &fh->f_io_array,
                                          &fh->f_num_of_io_entries);

        if (fh->f_num_of_io_entries) {
            ret_code = fh->f_fbtl->fbtl_preadv (fh);
            if ( 0 <= ret_code ) {
                real_bytes_read += (size_t)ret_code;
            }
        }

        fh->f_num_of_io_entries = 0;
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }
    }

    if ( need_to_copy ) {
        size_t pos=0;

        opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
        opal_convertor_cleanup (&convertor);
        mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
    }

    if (NULL != decoded_iov) {
        free (decoded_iov);
        decoded_iov = NULL;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = real_bytes_read;
    }

    return ret;
}

int mca_common_ompio_file_read_at (ompio_file_t *fh,
                                   OMPI_MPI_OFFSET_TYPE offset,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE prev_offset;

    mca_common_ompio_file_get_position (fh, &prev_offset );

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_read (fh,
                                      buf,
                                      count,
                                      datatype,
                                      status);

    // An explicit offset file operation is not supposed to modify
    // the internal file pointer, so reset the pointer
    // to the previous value.
    mca_common_ompio_set_explicit_offset (fh, prev_offset);

    return ret;
}


int mca_common_ompio_file_iread (ompio_file_t *fh,
                                 void *buf,
                                 int count,
                                 struct ompi_datatype_t *datatype,
                                 ompi_request_t **request)
{
    int ret = OMPI_SUCCESS;
    mca_ompio_request_t *ompio_req=NULL;
    size_t spc=0;

    if (fh->f_amode & MPI_MODE_WRONLY) {
//      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
        ret = MPI_ERR_ACCESS;
        return ret;
    }

    mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);

    if ( 0 == count ) {
        ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        ompio_req->req_ompi.req_status._ucount = 0;
        ompi_request_complete (&ompio_req->req_ompi, false);
        *request = (ompi_request_t *) ompio_req;

        return OMPI_SUCCESS;
    }

    if ( NULL != fh->f_fbtl->fbtl_ipreadv ) {
        // This fbtl has support for non-blocking operations

        size_t total_bytes_read = 0;       /* total bytes that have been read */
        uint32_t iov_count = 0;
        struct iovec *decoded_iov = NULL;

        size_t max_data = 0;
        int i = 0; /* index into the decoded iovec of the buffer */
        int j = 0; /* index into the file view iovec */

        bool need_to_copy = false;

#if OPAL_CUDA_SUPPORT
        int is_gpu, is_managed;
        mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
        if ( is_gpu && !is_managed ) {
            need_to_copy = true;
        }
#endif

        if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
             !(datatype == &ompi_mpi_byte.dt  ||
               datatype == &ompi_mpi_char.dt   )) {
            /* A copy is only needed if one of these conditions holds:
               1. the buffer is an unmanaged CUDA buffer (checked above), or
               2. the data representation is anything other than 'native' AND
                  the datatype is not byte or char (i.e. it requires some actual
                  work to be done, e.g. for external32).
            */
            need_to_copy = true;
        }

        if ( need_to_copy ) {
            char *tbuf=NULL;

            OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count);

            ompio_req->req_tbuf = tbuf;
            ompio_req->req_size = max_data;
        }
        else {
            mca_common_ompio_decode_datatype (fh,
                                              datatype,
                                              count,
                                              buf,
                                              &max_data,
                                              fh->f_mem_convertor,
                                              &decoded_iov,
                                              &iov_count);
        }

        if ( 0 < max_data && 0 == fh->f_iov_count ) {
            ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
            ompio_req->req_ompi.req_status._ucount = 0;
            ompi_request_complete (&ompio_req->req_ompi, false);
            *request = (ompi_request_t *) ompio_req;
            if (NULL != decoded_iov) {
                free (decoded_iov);
                decoded_iov = NULL;
            }

            return OMPI_SUCCESS;
        }

        // Non-blocking operations have to occur in a single cycle
        j = fh->f_index_in_file_view;

        mca_common_ompio_build_io_array ( fh,
                                          0,         // index
                                          1,         // no. of cycles
                                          max_data,  // setting bytes per cycle to match data
                                          max_data,
                                          iov_count,
                                          decoded_iov,
                                          &i,
                                          &j,
                                          &total_bytes_read,
                                          &spc,
                                          &fh->f_io_array,
                                          &fh->f_num_of_io_entries);

        if (fh->f_num_of_io_entries) {
            fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
        }

        mca_common_ompio_register_progress ();

        fh->f_num_of_io_entries = 0;
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }

        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
    }
    else {
        // This fbtl does not support non-blocking operations
        ompi_status_public_t status;
        ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);

        ompio_req->req_ompi.req_status.MPI_ERROR = ret;
        ompio_req->req_ompi.req_status._ucount = status._ucount;
        ompi_request_complete (&ompio_req->req_ompi, false);
    }

    *request = (ompi_request_t *) ompio_req;
    return ret;
}


int mca_common_ompio_file_iread_at (ompio_file_t *fh,
                                    OMPI_MPI_OFFSET_TYPE offset,
                                    void *buf,
                                    int count,
                                    struct ompi_datatype_t *datatype,
                                    ompi_request_t **request)
{
    int ret = OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE prev_offset;
    mca_common_ompio_file_get_position (fh, &prev_offset );

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_iread (fh,
                                       buf,
                                       count,
                                       datatype,
                                       request);

    /* An explicit offset file operation is not supposed to modify
    ** the internal file pointer, so reset the pointer
    ** to the previous value.
    ** It is OK to reset the position already here, although
    ** the operation might still be pending/ongoing, since
    ** the entire array of <offset, length, memaddress> entries has
    ** already been constructed in the file_iread operation.
    */
    mca_common_ompio_set_explicit_offset (fh, prev_offset);

    return ret;
}


/* Infrastructure for collective operations */
int mca_common_ompio_file_read_all (ompio_file_t *fh,
                                    void *buf,
                                    int count,
                                    struct ompi_datatype_t *datatype,
                                    ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS;


    if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
         !(datatype == &ompi_mpi_byte.dt  ||
           datatype == &ompi_mpi_char.dt   )) {
        /* No need to check for a GPU buffer for collective I/O.
           Most algorithms copy data from the aggregators, and send/recv
           to/from GPU buffers work if ompi was compiled with GPU support.

           If the individual fcoll component is used, there are no aggregators
           in that concept. However, since it calls common_ompio_file_read,
           CUDA buffers are handled by that routine.

           Thus, we only check that
           1. the data representation is anything other than 'native', and
           2. the datatype is not byte or char (i.e. it requires some actual
              work to be done, e.g. for external32).
        */
        size_t pos=0, max_data=0;
        char *tbuf=NULL;
        opal_convertor_t convertor;
        struct iovec *decoded_iov = NULL;
        uint32_t iov_count = 0;

        OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
        ret = fh->f_fcoll->fcoll_file_read_all (fh,
                                                decoded_iov->iov_base,
                                                decoded_iov->iov_len,
                                                MPI_BYTE,
                                                status);
        opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );

        opal_convertor_cleanup (&convertor);
        mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
    }
    else {
        ret = fh->f_fcoll->fcoll_file_read_all (fh,
                                                buf,
                                                count,
                                                datatype,
                                                status);
    }
    return ret;
}

int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
                                       OMPI_MPI_OFFSET_TYPE offset,
                                       void *buf,
                                       int count,
                                       struct ompi_datatype_t *datatype,
                                       ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE prev_offset;
    mca_common_ompio_file_get_position (fh, &prev_offset );

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_read_all (fh,
                                          buf,
                                          count,
                                          datatype,
                                          status);

    mca_common_ompio_set_explicit_offset (fh, prev_offset);
    return ret;
}

int mca_common_ompio_file_iread_all (ompio_file_t *fp,
                                     void *buf,
                                     int count,
                                     struct ompi_datatype_t *datatype,
                                     ompi_request_t **request)
{
    int ret = OMPI_SUCCESS;

    if ( NULL != fp->f_fcoll->fcoll_file_iread_all ) {
        ret = fp->f_fcoll->fcoll_file_iread_all (fp,
                                                 buf,
                                                 count,
                                                 datatype,
                                                 request);
    }
    else {
        /* This fcoll component does not support non-blocking
           collective I/O operations. We fake it with
           individual non-blocking I/O operations. */
        ret = mca_common_ompio_file_iread ( fp, buf, count, datatype, request );
    }

    return ret;
}

int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
                                        OMPI_MPI_OFFSET_TYPE offset,
                                        void *buf,
                                        int count,
                                        struct ompi_datatype_t *datatype,
                                        ompi_request_t **request)
{
    int ret = OMPI_SUCCESS;
    OMPI_MPI_OFFSET_TYPE prev_offset;

    mca_common_ompio_file_get_position (fp, &prev_offset );
    mca_common_ompio_set_explicit_offset (fp, offset);

    ret = mca_common_ompio_file_iread_all (fp,
                                           buf,
                                           count,
                                           datatype,
                                           request);

    mca_common_ompio_set_explicit_offset (fp, prev_offset);
    return ret;
}


int mca_common_ompio_set_explicit_offset (ompio_file_t *fh,
                                          OMPI_MPI_OFFSET_TYPE offset)
{
    size_t i = 0;
    size_t k = 0;

    if ( fh->f_view_size > 0 ) {
        /* starting offset of the current copy of the file view */
        fh->f_offset = (fh->f_view_extent *
                        ((offset*fh->f_etype_size) / fh->f_view_size)) + fh->f_disp;

        /* number of bytes used within the current copy of the file view */
        fh->f_total_bytes = (offset*fh->f_etype_size) % fh->f_view_size;
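        /* Worked example (illustrative numbers only): with an etype size of
           4 bytes, a view size of 1024 bytes, a view extent of 2048 bytes
           and a displacement of 0, an offset of 300 etypes gives
           f_offset = 2048 * ((300*4) / 1024) + 0 = 2048 and
           f_total_bytes = (300*4) % 1024 = 176. */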
        i = fh->f_total_bytes;

        /* Initialize the block id and the starting offset of the current block
           within the current copy of the file view to zero */
        fh->f_index_in_file_view = 0;
        fh->f_position_in_file_view = 0;

        /* determine the block id that the offset is located in and
           the starting offset of that block */
        k = fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
        while (i >= k) {
            fh->f_position_in_file_view = k;
            fh->f_index_in_file_view++;
            k += fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
        }
    }

    return OMPI_SUCCESS;
}