1 /*
2 * Copyright (c) 2004-2005 The Trustees of Indiana University and Indiana
3 * University Research and Technology
4 * Corporation. All rights reserved.
5 * Copyright (c) 2004-2016 The University of Tennessee and The University
6 * of Tennessee Research Foundation. All rights
7 * reserved.
8 * Copyright (c) 2004-2005 High Performance Computing Center Stuttgart,
9 * University of Stuttgart. All rights reserved.
10 * Copyright (c) 2004-2005 The Regents of the University of California.
11 * All rights reserved.
12 * Copyright (c) 2008-2019 University of Houston. All rights reserved.
13 * Copyright (c) 2018 Research Organization for Information Science
14 * and Technology (RIST). All rights reserved.
15 * $COPYRIGHT$
16 *
17 * Additional copyrights may follow
18 *
19 * $HEADER$
20 */
21
22 #include "ompi_config.h"
23
24 #include "ompi/communicator/communicator.h"
25 #include "ompi/info/info.h"
26 #include "ompi/file/file.h"
27 #include "ompi/mca/fs/fs.h"
28 #include "ompi/mca/fs/base/base.h"
29 #include "ompi/mca/fcoll/fcoll.h"
30 #include "ompi/mca/fcoll/base/base.h"
31 #include "ompi/mca/fbtl/fbtl.h"
32 #include "ompi/mca/fbtl/base/base.h"
33
34 #include "common_ompio.h"
35 #include "common_ompio_request.h"
36 #include "common_ompio_buffer.h"
37 #include <unistd.h>
38 #include <math.h>
39
40
41 /* Read and write routines are split into two interfaces.
42 ** The
43 ** mca_io_ompio_file_read/write[_at]
44 **
45 ** routines are the ones registered with the ompio modules.
46 ** The
47 **
48 ** mca_common_ompio_file_read/write[_at]
49 **
** routines are used e.g. from the shared file pointer modules.
51 ** The main difference is, that the first one takes an ompi_file_t
52 ** as a file pointer argument, while the second uses the ompio internal
53 ** ompio_file_t structure.
54 */
55
/**
 * Blocking individual read on the ompio-internal file handle.
 *
 * Decodes (buf, count, datatype) into an iovec describing the user buffer,
 * then performs the read in cycles of at most cycle_buffer_size bytes:
 * each cycle builds an <offset, length, address> I/O array from the current
 * file view and hands it to the fbtl's blocking preadv.
 *
 * @param fh        ompio-internal file handle (ompio_file_t, NOT ompi_file_t)
 * @param buf       destination buffer; may be a device buffer if compiled
 *                  with CUDA support
 * @param count     number of datatype elements to read
 * @param datatype  MPI datatype describing the buffer layout
 * @param status    on success, _ucount is set to the number of bytes read
 *                  (ignored if MPI_STATUS_IGNORE)
 *
 * @return OMPI_SUCCESS, or MPI_ERR_ACCESS if the file was opened write-only.
 */
int mca_common_ompio_file_read (ompio_file_t *fh,
                                void *buf,
                                int count,
                                struct ompi_datatype_t *datatype,
                                ompi_status_public_t *status)
{
    int ret = OMPI_SUCCESS;

    size_t total_bytes_read = 0;       /* total bytes that have been read*/
    size_t bytes_per_cycle = 0;        /* total read in each cycle by each process*/
    int index = 0;
    int cycles = 0;

    uint32_t iov_count = 0;
    struct iovec *decoded_iov = NULL;

    size_t max_data=0, real_bytes_read=0;
    size_t spc=0;
    ssize_t ret_code=0;
    int i = 0; /* index into the decoded iovec of the buffer */
    int j = 0; /* index into the file view iovec */

    /* Reading a file opened with MPI_MODE_WRONLY is an access error. */
    if (fh->f_amode & MPI_MODE_WRONLY){
//      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
        ret = MPI_ERR_ACCESS;
        return ret;
    }

    /* Zero-element reads complete immediately with a zero byte count. */
    if ( 0 == count ) {
        if ( MPI_STATUS_IGNORE != status ) {
            status->_ucount = 0;
        }
        return ret;
    }

    /* Determine whether the read has to go through an intermediate
       (host/packed) buffer instead of reading straight into buf. */
    bool need_to_copy = false;
    opal_convertor_t convertor;
#if OPAL_CUDA_SUPPORT
    int is_gpu, is_managed;
    mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
    if ( is_gpu && !is_managed ) {
        need_to_copy = true;
    }
#endif

    if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
         !(datatype == &ompi_mpi_byte.dt ||
           datatype == &ompi_mpi_char.dt )) {
        /* only need to copy if any of these conditions are given:
           1. buffer is an unmanaged CUDA buffer (checked above).
           2. Datarepresentation is anything other than 'native' and
           3. datatype is not byte or char (i.e it does require some actual
              work to be done e.g. for external32.
        */
        need_to_copy = true;
    }

    if ( need_to_copy ) {
        char *tbuf=NULL;

        /* NOTE: this macro allocates tbuf, prepares 'convertor' and fills
           max_data, decoded_iov and iov_count as side effects. */
        OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
    }
    else {
        /* Read directly into the user buffer: decode the datatype into
           an iovec over buf. */
        mca_common_ompio_decode_datatype (fh,
                                          datatype,
                                          count,
                                          buf,
                                          &max_data,
                                          fh->f_mem_convertor,
                                          &decoded_iov,
                                          &iov_count);
    }

    /* An empty file view means there is nothing to read even though the
       request was non-trivial; report zero bytes and clean up. */
    if ( 0 < max_data && 0 == fh->f_iov_count ) {
        if ( MPI_STATUS_IGNORE != status ) {
            status->_ucount = 0;
        }
        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
        return OMPI_SUCCESS;
    }

    /* cycle_buffer_size == -1 means "unlimited": do it in one cycle. */
    if ( -1 == OMPIO_MCA_GET(fh, cycle_buffer_size )) {
        bytes_per_cycle = max_data;
    }
    else {
        bytes_per_cycle = OMPIO_MCA_GET(fh, cycle_buffer_size);
    }
    cycles = ceil((double)max_data/bytes_per_cycle);

#if 0
    printf ("Bytes per Cycle: %d Cycles: %d max_data:%d \n",bytes_per_cycle, cycles, max_data);
#endif

    j = fh->f_index_in_file_view;

    for (index = 0; index < cycles; index++) {

        /* i, j, total_bytes_read and spc carry the iteration state
           across cycles. */
        mca_common_ompio_build_io_array ( fh,
                                          index,
                                          cycles,
                                          bytes_per_cycle,
                                          max_data,
                                          iov_count,
                                          decoded_iov,
                                          &i,
                                          &j,
                                          &total_bytes_read,
                                          &spc,
                                          &fh->f_io_array,
                                          &fh->f_num_of_io_entries);

        if (fh->f_num_of_io_entries) {
            ret_code = fh->f_fbtl->fbtl_preadv (fh);
            if ( 0<= ret_code ) {
                /* negative return codes (errors) are not accumulated;
                   NOTE(review): they are also not propagated to 'ret' —
                   confirm this matches the fbtl error contract. */
                real_bytes_read+=(size_t)ret_code;
            }
        }

        /* The I/O array is rebuilt from scratch every cycle. */
        fh->f_num_of_io_entries = 0;
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }
    }

    if ( need_to_copy ) {
        size_t pos=0;

        /* Unpack from the intermediate buffer into the user buffer,
           then release the intermediate buffer. */
        opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );
        opal_convertor_cleanup (&convertor);
        mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
    }

    if (NULL != decoded_iov) {
        free (decoded_iov);
        decoded_iov = NULL;
    }

    if ( MPI_STATUS_IGNORE != status ) {
        status->_ucount = real_bytes_read;
    }

    return ret;
}
203
/**
 * Blocking read at an explicit offset.
 *
 * Temporarily positions the file handle at 'offset', performs a regular
 * blocking individual read, and then restores the previous position:
 * explicit-offset operations must not modify the individual file pointer.
 *
 * @return the result of mca_common_ompio_file_read.
 */
int mca_common_ompio_file_read_at (ompio_file_t *fh,
                                   OMPI_MPI_OFFSET_TYPE offset,
                                   void *buf,
                                   int count,
                                   struct ompi_datatype_t *datatype,
                                   ompi_status_public_t * status)
{
    OMPI_MPI_OFFSET_TYPE saved_position;
    int ret;

    /* Remember where the individual file pointer currently is. */
    mca_common_ompio_file_get_position (fh, &saved_position);

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_read (fh, buf, count, datatype, status);

    /* An explicit offset file operation is not supposed to modify the
       internal file pointer, so reset it to the saved value. */
    mca_common_ompio_set_explicit_offset (fh, saved_position);

    return ret;
}
230
231
/**
 * Non-blocking individual read on the ompio-internal file handle.
 *
 * Allocates an ompio request and, if the fbtl provides ipreadv, builds
 * a single I/O array covering the whole transfer and posts it
 * non-blockingly. Otherwise the operation is executed with the blocking
 * read and the request is completed immediately.
 *
 * @param fh        ompio-internal file handle
 * @param buf       destination buffer
 * @param count     number of datatype elements to read
 * @param datatype  MPI datatype describing the buffer layout
 * @param request   on return, the (possibly already completed) request
 *
 * @return OMPI_SUCCESS, or MPI_ERR_ACCESS if the file was opened write-only.
 */
int mca_common_ompio_file_iread (ompio_file_t *fh,
                                 void *buf,
                                 int count,
                                 struct ompi_datatype_t *datatype,
                                 ompi_request_t **request)
{
    int ret = OMPI_SUCCESS;
    mca_ompio_request_t *ompio_req=NULL;
    size_t spc=0;

    /* Reading a file opened with MPI_MODE_WRONLY is an access error. */
    if (fh->f_amode & MPI_MODE_WRONLY){
//      opal_output(10, "Improper use of FILE Mode, Using WRONLY for Read!\n");
        ret = MPI_ERR_ACCESS;
        return ret;
    }

    mca_common_ompio_request_alloc ( &ompio_req, MCA_OMPIO_REQUEST_READ);

    /* Zero-element reads complete the request immediately. */
    if ( 0 == count ) {
        ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
        ompio_req->req_ompi.req_status._ucount = 0;
        ompi_request_complete (&ompio_req->req_ompi, false);
        *request = (ompi_request_t *) ompio_req;

        return OMPI_SUCCESS;
    }

    if ( NULL != fh->f_fbtl->fbtl_ipreadv ) {
        // This fbtl has support for non-blocking operations

        size_t total_bytes_read = 0;       /* total bytes that have been read*/
        uint32_t iov_count = 0;
        struct iovec *decoded_iov = NULL;

        size_t max_data = 0;
        int i = 0; /* index into the decoded iovec of the buffer */
        int j = 0; /* index into the file view iovec */

        /* Determine whether the read has to go through an intermediate
           (host/packed) buffer instead of reading straight into buf. */
        bool need_to_copy = false;

#if OPAL_CUDA_SUPPORT
        int is_gpu, is_managed;
        mca_common_ompio_check_gpu_buf ( fh, buf, &is_gpu, &is_managed);
        if ( is_gpu && !is_managed ) {
            need_to_copy = true;
        }
#endif

        if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
             !(datatype == &ompi_mpi_byte.dt ||
               datatype == &ompi_mpi_char.dt )) {
            /* only need to copy if any of these conditions are given:
               1. buffer is an unmanaged CUDA buffer (checked above).
               2. Datarepresentation is anything other than 'native' and
               3. datatype is not byte or char (i.e it does require some actual
                  work to be done e.g. for external32.
            */
            need_to_copy = true;
        }

        if ( need_to_copy ) {
            char *tbuf=NULL;

            /* NOTE: this macro allocates tbuf, prepares the request's
               convertor and fills max_data, decoded_iov and iov_count as
               side effects. The intermediate buffer is stored on the
               request so it can be unpacked/released at completion time. */
            OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&ompio_req->req_convertor,max_data,decoded_iov,iov_count);

            ompio_req->req_tbuf = tbuf;
            ompio_req->req_size = max_data;
        }
        else {
            /* Read directly into the user buffer. */
            mca_common_ompio_decode_datatype (fh,
                                              datatype,
                                              count,
                                              buf,
                                              &max_data,
                                              fh->f_mem_convertor,
                                              &decoded_iov,
                                              &iov_count);
        }

        /* An empty file view means nothing can be read: complete the
           request with zero bytes and clean up. */
        if ( 0 < max_data && 0 == fh->f_iov_count ) {
            ompio_req->req_ompi.req_status.MPI_ERROR = OMPI_SUCCESS;
            ompio_req->req_ompi.req_status._ucount = 0;
            ompi_request_complete (&ompio_req->req_ompi, false);
            *request = (ompi_request_t *) ompio_req;
            if (NULL != decoded_iov) {
                free (decoded_iov);
                decoded_iov = NULL;
            }

            return OMPI_SUCCESS;
        }

        // Non-blocking operations have to occur in a single cycle
        j = fh->f_index_in_file_view;

        mca_common_ompio_build_io_array ( fh,
                                          0, // index
                                          1, // no. of cyces
                                          max_data, // setting bytes per cycle to match data
                                          max_data,
                                          iov_count,
                                          decoded_iov,
                                          &i,
                                          &j,
                                          &total_bytes_read,
                                          &spc,
                                          &fh->f_io_array,
                                          &fh->f_num_of_io_entries);

        if (fh->f_num_of_io_entries) {
            /* Post the whole I/O array non-blockingly; completion is
               driven by the progress engine registered below. */
            fh->f_fbtl->fbtl_ipreadv (fh, (ompi_request_t *) ompio_req);
        }

        mca_common_ompio_register_progress ();

        /* The fbtl has taken what it needs; the I/O array and the decoded
           iovec can be released already. */
        fh->f_num_of_io_entries = 0;
        if (NULL != fh->f_io_array) {
            free (fh->f_io_array);
            fh->f_io_array = NULL;
        }

        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
    }
    else {
        // This fbtl does not support non-blocking operations
        /* Fall back to a blocking read and complete the request here. */
        ompi_status_public_t status;
        ret = mca_common_ompio_file_read (fh, buf, count, datatype, &status);

        ompio_req->req_ompi.req_status.MPI_ERROR = ret;
        ompio_req->req_ompi.req_status._ucount = status._ucount;
        ompi_request_complete (&ompio_req->req_ompi, false);
    }

    *request = (ompi_request_t *) ompio_req;
    return ret;
}
371
372
/**
 * Non-blocking read at an explicit offset.
 *
 * Temporarily positions the file handle at 'offset', posts a regular
 * non-blocking individual read, and then restores the previous position.
 * Restoring the pointer before the request completes is safe: the whole
 * <offset, length, memaddress> I/O array has already been constructed
 * inside the file_iread operation.
 *
 * @return the result of mca_common_ompio_file_iread.
 */
int mca_common_ompio_file_iread_at (ompio_file_t *fh,
                                    OMPI_MPI_OFFSET_TYPE offset,
                                    void *buf,
                                    int count,
                                    struct ompi_datatype_t *datatype,
                                    ompi_request_t **request)
{
    OMPI_MPI_OFFSET_TYPE saved_position;
    int ret;

    /* Remember where the individual file pointer currently is. */
    mca_common_ompio_file_get_position (fh, &saved_position);

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_iread (fh, buf, count, datatype, request);

    /* An explicit offset file operation is not supposed to modify the
       internal file pointer, so reset it to the saved value. */
    mca_common_ompio_set_explicit_offset (fh, saved_position);

    return ret;
}
403
404
405 /* Infrastructure for collective operations */
/**
 * Blocking collective read.
 *
 * Delegates to the selected fcoll component. For non-'native' data
 * representations (other than byte/char datatypes) the data is first read
 * as raw bytes into an intermediate buffer and then unpacked into the
 * user buffer through a convertor (e.g. for external32).
 *
 * @param fh        ompio-internal file handle
 * @param buf       destination buffer
 * @param count     number of datatype elements to read
 * @param datatype  MPI datatype describing the buffer layout
 * @param status    filled by the fcoll component (or ignored)
 *
 * @return the fcoll component's return code.
 */
int mca_common_ompio_file_read_all (ompio_file_t *fh,
                                    void *buf,
                                    int count,
                                    struct ompi_datatype_t *datatype,
                                    ompi_status_public_t * status)
{
    int ret = OMPI_SUCCESS;


    if ( !( fh->f_flags & OMPIO_DATAREP_NATIVE ) &&
         !(datatype == &ompi_mpi_byte.dt ||
           datatype == &ompi_mpi_char.dt )) {
        /* No need to check for GPU buffer for collective I/O.
           Most algorithms copy data from aggregators, and send/recv
           to/from GPU buffers works if ompi was compiled was GPU support.

           If the individual fcoll component is used: there are no aggregators
           in that concept. However, since they call common_ompio_file_write,
           CUDA buffers are handled by that routine.

           Thus, we only check for
           1. Datarepresentation is anything other than 'native' and
           2. datatype is not byte or char (i.e it does require some actual
              work to be done e.g. for external32.
        */
        size_t pos=0, max_data=0;
        char *tbuf=NULL;
        opal_convertor_t convertor;
        struct iovec *decoded_iov = NULL;
        uint32_t iov_count = 0;

        /* NOTE: this macro allocates tbuf, prepares 'convertor' and fills
           max_data, decoded_iov and iov_count as side effects. */
        OMPIO_PREPARE_READ_BUF(fh,buf,count,datatype,tbuf,&convertor,max_data,decoded_iov,iov_count);
        /* Read the raw external-representation bytes into the
           intermediate buffer.
           NOTE(review): iov_len (size_t) is passed as the int 'count'
           argument — confirm transfers here cannot exceed INT_MAX. */
        ret = fh->f_fcoll->fcoll_file_read_all (fh,
                                                decoded_iov->iov_base,
                                                decoded_iov->iov_len,
                                                MPI_BYTE,
                                                status);
        /* Convert/unpack from the intermediate buffer into the user
           buffer, then release the intermediate buffer. */
        opal_convertor_unpack (&convertor, decoded_iov, &iov_count, &pos );

        opal_convertor_cleanup (&convertor);
        mca_common_ompio_release_buf (fh, decoded_iov->iov_base);
        if (NULL != decoded_iov) {
            free (decoded_iov);
            decoded_iov = NULL;
        }
    }
    else {
        /* Native representation: the fcoll component reads directly into
           the user buffer. */
        ret = fh->f_fcoll->fcoll_file_read_all (fh,
                                                buf,
                                                count,
                                                datatype,
                                                status);
    }
    return ret;
}
461
/**
 * Blocking collective read at an explicit offset.
 *
 * Temporarily positions the file handle at 'offset', performs a regular
 * collective read, and then restores the previous position: explicit-offset
 * operations must not modify the individual file pointer.
 *
 * @return the result of mca_common_ompio_file_read_all.
 */
int mca_common_ompio_file_read_at_all (ompio_file_t *fh,
                                       OMPI_MPI_OFFSET_TYPE offset,
                                       void *buf,
                                       int count,
                                       struct ompi_datatype_t *datatype,
                                       ompi_status_public_t * status)
{
    OMPI_MPI_OFFSET_TYPE saved_position;
    int ret;

    /* Remember where the individual file pointer currently is. */
    mca_common_ompio_file_get_position (fh, &saved_position);

    mca_common_ompio_set_explicit_offset (fh, offset);
    ret = mca_common_ompio_file_read_all (fh, buf, count, datatype, status);

    /* Restore the file pointer saved on entry. */
    mca_common_ompio_set_explicit_offset (fh, saved_position);

    return ret;
}
483
/**
 * Non-blocking collective read.
 *
 * Uses the fcoll component's native non-blocking collective if it
 * provides one; otherwise fakes it with an individual non-blocking read.
 *
 * @return the return code of the underlying read operation.
 */
int mca_common_ompio_file_iread_all (ompio_file_t *fp,
                                     void *buf,
                                     int count,
                                     struct ompi_datatype_t *datatype,
                                     ompi_request_t **request)
{
    if ( NULL == fp->f_fcoll->fcoll_file_iread_all ) {
        /* This fcoll component does not support non-blocking collective
           I/O operations; fall back to an individual non-blocking read. */
        return mca_common_ompio_file_iread (fp, buf, count, datatype, request);
    }

    return fp->f_fcoll->fcoll_file_iread_all (fp, buf, count, datatype, request);
}
508
/**
 * Non-blocking collective read at an explicit offset.
 *
 * Temporarily positions the file handle at 'offset', posts a regular
 * non-blocking collective read, and then restores the previous position.
 *
 * @return the result of mca_common_ompio_file_iread_all.
 */
int mca_common_ompio_file_iread_at_all (ompio_file_t *fp,
                                        OMPI_MPI_OFFSET_TYPE offset,
                                        void *buf,
                                        int count,
                                        struct ompi_datatype_t *datatype,
                                        ompi_request_t **request)
{
    OMPI_MPI_OFFSET_TYPE saved_position;
    int ret;

    /* Remember where the individual file pointer currently is. */
    mca_common_ompio_file_get_position (fp, &saved_position);

    mca_common_ompio_set_explicit_offset (fp, offset);
    ret = mca_common_ompio_file_iread_all (fp, buf, count, datatype, request);

    /* Restore the file pointer saved on entry. */
    mca_common_ompio_set_explicit_offset (fp, saved_position);

    return ret;
}
531
532
/**
 * Position the file handle at an explicit offset (in etypes).
 *
 * Translates 'offset' (counted in etypes relative to the file view) into
 * the handle's internal position fields: the starting file offset of the
 * file-view copy containing the target byte, the number of bytes consumed
 * within that copy, and the index/start of the decoded-iov block the
 * target byte falls into.
 *
 * A handle whose view size is zero is left untouched.
 *
 * @return OMPI_SUCCESS (always).
 */
int mca_common_ompio_set_explicit_offset (ompio_file_t *fh,
                                          OMPI_MPI_OFFSET_TYPE offset)
{
    size_t bytes_into_view;
    size_t block_end;

    if ( 0 == fh->f_view_size ) {
        return OMPI_SUCCESS;
    }

    /* Starting offset of the copy of the file view containing the
       target byte. */
    fh->f_offset = (fh->f_view_extent *
                    ((offset*fh->f_etype_size) / fh->f_view_size)) + fh->f_disp;

    /* Number of bytes used within the current copy of the file view. */
    fh->f_total_bytes = (offset*fh->f_etype_size) % fh->f_view_size;
    bytes_into_view = fh->f_total_bytes;

    /* Start the search at the first block of the view copy. */
    fh->f_index_in_file_view = 0;
    fh->f_position_in_file_view = 0;

    /* Walk the decoded iov until the block containing the target byte
       is found; block_end is the cumulative length up to and including
       the current block. */
    block_end = fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
    while (bytes_into_view >= block_end) {
        fh->f_position_in_file_view = block_end;
        fh->f_index_in_file_view++;
        block_end += fh->f_decoded_iov[fh->f_index_in_file_view].iov_len;
    }

    return OMPI_SUCCESS;
}
567