1 #if HAVE_CONFIG_H
2 # include "config.h"
3 #endif
4
5 /** @file
6 **********************************************************************\
7 ELementary I/O (ELIO) disk operations for parallel I/O libraries
8 Authors: Jarek Nieplocha (PNNL) and Jace Mogill (ANL)
9 *
10 * DISCLAIMER
11 *
12 * This material was prepared as an account of work sponsored by an
13 * agency of the United States Government. Neither the United States
14 * Government nor the United States Department of Energy, nor Battelle,
15 * nor any of their employees, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
16 * ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE ACCURACY,
17 * COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, APPARATUS, PRODUCT,
18 * SOFTWARE, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT
19 * INFRINGE PRIVATELY OWNED RIGHTS.
20 *
21 * ACKNOWLEDGMENT
22 *
23 * This software and its documentation were produced with United States
24 * Government support under Contract Number DE-AC06-76RLO-1830 awarded by
25 * the United States Department of Energy. The United States Government
26 * retains a paid-up non-exclusive, irrevocable worldwide license to
27 * reproduce, prepare derivative works, perform publicly and display
28 * publicly by or for the US Government, including the right to
29 * distribute to other US Government contractors.
30 */
31 #ifdef USE_LUSTRE
32 #include <lustre/lustre_user.h> /* for O_LOV_DELAY_CREATE, LL_IOC_LOV_SETSTRIPE */
33 #include <linux/lustre_idl.h> /* for struct lov_mds_md, LOV_MAGIC */
34 #include <sys/ioctl.h> /* for ioctl */
35 #endif
36
37 #include "eliop.h"
38
39 #if defined(WIN32)
40 #undef ERROR
41 #endif
42
43 #include "../sf/coms.h"
44
/* On the Cray X1 (__crayx1) the CRAY symbol is removed, so none of the
 * CRAY-specific branches further down in this file are compiled there. */
#if defined(CRAY) && defined(__crayx1)
#undef CRAY
#endif

/* NOAIO is forced on for every platform that is not listed in this test. */
#if defined(AIX) || defined(DECOSF) || defined(SGI64) || defined(CRAY) || defined(LINUXAIO)
/* systems with Asynchronous I/O */
#else
#  ifndef NOAIO
#    define NOAIO
#  endif
#endif

/****************** Internal Constants and Parameters **********************/

/* MAX_AIO_REQ  : number of slots in the asynchronous request tables
 * NULL_AIO     : value stored in aio_req[] to mark a slot as free
 * FOPEN_MODE   : permission bits handed to OPEN() by elio_open
 * MAX_ATTEMPTS : no use is visible in this part of the file */
#define MAX_AIO_REQ 4
#define NULL_AIO -123456
#define FOPEN_MODE 0644
#define MAX_ATTEMPTS 10


/* AIO is the positive form of "NOAIO is not set"; the code below tests AIO. */
#ifndef NOAIO
#  define AIO 1
#endif


/* Map the generic I/O verbs used in this file onto either the FFIO calls
 * or the plain system calls.  DEFARG is the trailing argument passed to
 * the WRITEA/READA calls. */
#ifdef FFIO
#  define WRITE ffwrite
#  define WRITEA ffwritea
#  define READ ffread
#  define READA ffreada
#  define CLOSE ffclose
#  define SEEK ffseek
#  define OPEN ffopens
#  define DEFARG FULL
#else
#  define WRITE write
#  define WRITEA writea
#  define READ read
#  define READA reada
#  define CLOSE close
#  define SEEK lseek
#  define OPEN open
#  define DEFARG 0
#endif


/* ELIO_FSYNC names the call used by elio_fsync to flush one descriptor. */
#ifdef WIN32
#define ELIO_FSYNC _commit
#else
#include <unistd.h>
#define ELIO_FSYNC fsync
#endif
97
/* structure to emulate control block in Posix AIO */
/* cb_fout[] holds one control block per request slot and cb_fout_arr[]
 * holds a pointer to each of them (filled in by elio_set_cb). */
#if defined (CRAY)
#  if defined(FFIO)
typedef struct { struct ffsw stat; int filedes; }io_status_t;
#  else
#    include <sys/iosw.h>
typedef struct { struct iosw stat; int filedes; }io_status_t;
#  endif
io_status_t cb_fout[MAX_AIO_REQ];
io_status_t *cb_fout_arr[MAX_AIO_REQ];

#elif defined(AIO)
#  include <aio.h>
/* INPROGRESS is the "still running" value elio_probe compares against */
#  if defined(AIX)
#    define INPROGRESS EINPROG
#  else
#    define INPROGRESS EINPROGRESS
#  endif
struct aiocb cb_fout[MAX_AIO_REQ];
#ifndef AIX
const
#endif
struct aiocb *cb_fout_arr[MAX_AIO_REQ];
#endif

/* fallback so that elio_probe's switch still compiles without AIO */
#ifndef INPROGRESS
#  define INPROGRESS 1
#endif

static long aio_req[MAX_AIO_REQ]; /* array for AIO requests */
static int first_elio_init = 1; /* initialization status */
int _elio_Errors_Fatal=0; /* sets mode of handling errors */
130
131
/****************************** Internal Macros *****************************/

/*
 * AIO_LOOKUP(aio_i): set aio_i to the index of the first slot of aio_req[]
 * that holds NULL_AIO, or to MAX_AIO_REQ when no slot is free.  The index
 * is tested BEFORE the table is read so the scan never touches
 * aio_req[MAX_AIO_REQ].  Without AIO, aio_i is always MAX_AIO_REQ.
 */
#if defined(AIO)
#  define AIO_LOOKUP(aio_i) {\
    aio_i = 0;\
    while(aio_i < MAX_AIO_REQ && aio_req[aio_i] != NULL_AIO) aio_i++;\
}
#else
#  define AIO_LOOKUP(aio_i) aio_i = MAX_AIO_REQ
#endif

/*
 * SYNC_EMULATE(op): perform an asynchronous request with the blocking call
 * elio_<op>.  Relies on the caller's locals fd, offset, buf, bytes, stat
 * and req_id.  Marks the request as ELIO_DONE; stat becomes 0 when all
 * bytes were transferred, otherwise ELIO_ERROR is raised with the value
 * returned by elio_<op>.  Wrapped in do/while(0) so that it behaves as a
 * single statement.
 */
#define SYNC_EMULATE(op) do { \
    *req_id = ELIO_DONE; \
    if((stat= elio_ ## op (fd, offset, buf, bytes)) != bytes ){ \
        ELIO_ERROR(stat,0); \
    }else \
        stat = 0; \
} while(0)

/*
 * PARIO_MIN(a,b): smaller of the two arguments (each argument may be
 * evaluated twice).  Guarded on its own name: guarding on MIN would leave
 * PARIO_MIN undefined whenever a system header provides MIN.
 */
#ifndef PARIO_MIN
#define PARIO_MIN(a,b) (((a) <= (b)) ? (a) : (b))
#endif
151
/*
 * Offsets bigger than ABSURDLY_LARGE generate a SEEKFAIL.
 * The maximum no. of extents permitted for a file is MAX_EXTENT.
 *
 * LARGE_FILES is set when any of the large-file feature macros is defined,
 * when _FILE_OFFSET_BITS is 64, or when SIZEOF_VOIDP is 8.  It selects
 * both the value of ABSURDLY_LARGE here and the per-file size limit
 * returned by elio_max_file_size().
 */

#if defined(_LARGE_FILES) || defined(_LARGEFILE_SOURCE) || defined(_LARGEFILE64_SOURCE) || _FILE_OFFSET_BITS+0 == 64 || SIZEOF_VOIDP == 8
#  define LARGE_FILES
#endif

#define MAX_EXTENT 127
#ifdef LARGE_FILES
#define ABSURDLY_LARGE 1e14
#else
/* without large-file support: MAX_EXTENT times 2^31 bytes */
#define ABSURDLY_LARGE (MAX_EXTENT*2147483648.0)
#endif
167
168 /*****************************************************************************/
169
elio_max_file_size(Fd_t fd)170 static Off_t elio_max_file_size(Fd_t fd)
171 /*
172 * Return the maximum size permitted for this PHYSICAL file.
173 * Presently not file dependent.
174 */
175 {
176 #ifdef LARGE_FILES
177 return ABSURDLY_LARGE;
178 #else
179 return (2047.0*1024.0*1024.0); /* 2 GB - 1 MB */
180 #endif
181 }
182
/*
 * Return the descriptor of the extent that follows fd, opening it on
 * first use.
 *
 * The extent file name is the base name with "xNNN" appended, where NNN is
 * the three-digit extent number; when fd is itself an extent, its own
 * 4-character suffix is replaced.  A freshly opened extent is linked into
 * fd->next so that later calls return it directly.
 *
 * Returns 0 when no further extent is allowed (fd->extent >= MAX_EXTENT),
 * when fd has no usable name, when the extent name would not fit into
 * ELIO_FILENAME_MAX bytes, or when elio_open fails.
 */
static Fd_t elio_get_next_extent(Fd_t fd)
{
    Fd_t next_fd = (Fd_t) fd->next;

    if (!next_fd) {
        /* Eventually need to replace this with user controllable naming
         * and combine with similar logic in delete routine.
         */
        char   fname[ELIO_FILENAME_MAX];
        size_t len;

        if (fd->extent >= MAX_EXTENT)
            return 0;
        if (!fd->name)
            return 0;

        len = strlen(fd->name);
        if (fd->extent) {
            /* an extent's name ends in "xNNN": strip it */
            if (len < 4)
                return 0;
            len -= 4;
        }

        /* base + "xNNN" + NUL must fit; this bound also covers the full
         * current name, so the strcpy below cannot overflow fname */
        if (len + 4 >= ELIO_FILENAME_MAX)
            return 0;

        strcpy(fname, fd->name);
        /* extent+1 is at most MAX_EXTENT (127), i.e. exactly three digits */
        sprintf(fname+len,"x%3.3d",fd->extent+1);
        /*printf("Opening extent %d with name '%s'\n",fd->extent+1,fname);*/
        if ((next_fd = elio_open(fname, fd->type, fd->mode))) {
            next_fd->extent = fd->extent + 1;
            fd->next = (struct fd_struct *) next_fd;
        }
    }
    return next_fd;
}
212
elio_errors_fatal(int onoff)213 void elio_errors_fatal(int onoff)
214 {
215 _elio_Errors_Fatal = onoff;
216 }
217
218
219 /*\ Blocking Write
220 * - returns number of bytes written or error code (<0) if failed
221 \*/
elio_write(Fd_t fd,Off_t doffset,const void * buf,Size_t bytes)222 Size_t elio_write(Fd_t fd, Off_t doffset, const void* buf, Size_t bytes)
223 {
224 off_t offset;
225 Size_t stat, bytes_to_write = bytes;
226 Size_t nextbytes;
227
228 if (doffset >= ABSURDLY_LARGE)
229 ELIO_ERROR(SEEKFAIL,0);
230
231 /* Follow the linked list of extents down until we hit the file
232 that contains the offset */
233 if (doffset >= elio_max_file_size(fd)) {
234 Fd_t next_fd = elio_get_next_extent(fd);
235 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
236 doffset -= elio_max_file_size(fd);
237 return elio_write(next_fd, doffset, buf, bytes);
238 }
239
240 /* Figure out if the write continues onto the next extent */
241 offset = (off_t) doffset;
242 nextbytes = 0;
243 if ((doffset+bytes_to_write) >= elio_max_file_size(fd)) {
244 nextbytes = bytes_to_write;
245 bytes_to_write = (Size_t) (elio_max_file_size(fd)-doffset);
246 nextbytes -= bytes_to_write;
247 }
248 /*printf("TRYING TO WRITE AT doffset=%f offset=%lu bw=%lu nb=%lu\n", doffset, offset,
249 bytes_to_write, nextbytes);*/
250
251 /* Write to this extent */
252
253 #ifdef PABLO
254 int pablo_code = PABLO_elio_write;
255 PABLO_start( pablo_code );
256 #endif
257
258 if(offset != SEEK(fd->fd,offset,SEEK_SET)) ELIO_ERROR(SEEKFAIL,0);
259
260 while (bytes_to_write) {
261 stat = WRITE(fd->fd, buf, bytes_to_write);
262 if ((stat == -1) && ((errno == EINTR) || (errno == EAGAIN))) {
263 ; /* interrupted write should be restarted */
264 } else if (stat > 0) {
265 bytes_to_write -= stat;
266 buf = stat + (char*)buf; /*advance pointer by # bytes written*/
267 } else {
268 ELIO_ERROR(WRITFAIL, stat);
269 }
270 }
271
272 /* Only get here if all has gone OK */
273
274 #ifdef PABLO
275 PABLO_end(pablo_code);
276 #endif
277
278 /* Write to next extent(s) ... relies on incrementing of buf */
279 if (nextbytes) {
280 Fd_t next_fd = elio_get_next_extent(fd);
281 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
282 stat = elio_write(next_fd, (Off_t) 0, buf, nextbytes);
283 if (stat != nextbytes)
284 ELIO_ERROR(WRITFAIL, stat);
285 }
286
287 return bytes;
288 }
289
/*
 * Fill in control block number reqn for an asynchronous transfer of
 * `bytes` bytes between buf and offset doffset of fd, and record its
 * address in cb_fout_arr[reqn].
 *
 * On CRAY the file is positioned with SEEK instead and SEEKFAIL is
 * returned when that fails.  Without AIO nothing is done.  Returns
 * ELIO_OK otherwise.  reqn is used as an index without range checking.
 */
int elio_set_cb(Fd_t fd, Off_t doffset, int reqn, void *buf, Size_t bytes)
{
#if defined(AIO)
    off_t offset = (off_t) doffset;

#  if defined(CRAY)
    if (SEEK(fd->fd, offset, SEEK_SET) != offset)
        return (SEEKFAIL);
    cb_fout[reqn].filedes = fd->fd;
    cb_fout_arr[reqn]     = &cb_fout[reqn];
#  else
    cb_fout_arr[reqn]        = &cb_fout[reqn];
    cb_fout[reqn].aio_nbytes = bytes;
    cb_fout[reqn].aio_buf    = buf;
    cb_fout[reqn].aio_offset = offset;
#    if defined(AIX)
    cb_fout[reqn].aio_whence = SEEK_SET;
#    else
    cb_fout[reqn].aio_fildes = fd->fd;
    cb_fout[reqn].aio_sigevent.sigev_notify = SIGEV_NONE;
#    endif
#  endif
#endif
    return ELIO_OK;
}
313
314
315 /*\ Asynchronous Write: returns 0 if succeded or err code if failed
316 \*/
elio_awrite(Fd_t fd,Off_t doffset,const void * buf,Size_t bytes,io_request_t * req_id)317 int elio_awrite(Fd_t fd, Off_t doffset, const void* buf, Size_t bytes, io_request_t * req_id)
318 {
319 off_t offset;
320 Size_t stat;
321 #ifdef AIO
322 int aio_i;
323 #endif
324
325 if (doffset >= ABSURDLY_LARGE)
326 ELIO_ERROR(SEEKFAIL,0);
327
328 /* Follow the linked list of extents down until we hit the file
329 that contains the offset */
330 if (doffset >= elio_max_file_size(fd)) {
331 Fd_t next_fd = elio_get_next_extent(fd);
332 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
333 doffset -= elio_max_file_size(fd);
334 return elio_awrite(next_fd, doffset, buf, bytes, req_id);
335 }
336
337 /* Figure out if the write continues onto the next extent
338 * ... if so then force the entire request to be done synchronously
339 * so that we don't have to manage multiple async requests */
340
341 if ((doffset+((Off_t) bytes)) >= elio_max_file_size(fd)) {
342 *req_id = ELIO_DONE;
343 if (elio_write(fd, doffset, buf, bytes) != bytes)
344 return -1;
345 else
346 return 0;
347 }
348
349 offset = (off_t) doffset;
350
351 #ifdef PABLO
352 int pablo_code = PABLO_elio_awrite;
353 PABLO_start( pablo_code );
354 #endif
355
356 *req_id = ELIO_DONE;
357
358 #ifdef AIO
359 AIO_LOOKUP(aio_i);
360
361 /* blocking io when request table is full */
362 if(aio_i >= MAX_AIO_REQ){
363 # if defined(DEBUG) && defined(ASYNC)
364 fprintf(stderr, "elio_awrite: Warning- asynch overflow\n");
365 # endif
366 SYNC_EMULATE(write);
367 } else {
368 int rc;
369 *req_id = (io_request_t) aio_i;
370 if((rc=elio_set_cb(fd, offset, aio_i, (void*) buf, bytes)))
371 ELIO_ERROR(rc,0);
372
373 # if defined(CRAY)
374 rc = WRITEA(fd->fd, (char*)buf, bytes, &cb_fout[aio_i].stat, DEFARG);
375 stat = (rc < 0)? -1 : 0;
376 # elif defined(AIX)
377 # if !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
378 stat = aio_write(fd->fd, cb_fout + aio_i);
379 # endif
380 # else
381 stat = aio_write(cb_fout+aio_i);
382 # endif
383 aio_req[aio_i] = *req_id;
384 }
385
386 #else
387 /* call blocking write when AIO not available */
388 SYNC_EMULATE(write);
389 #endif
390
391 if(stat ==-1) ELIO_ERROR(AWRITFAIL, 0);
392
393 #ifdef PABLO
394 PABLO_end(pablo_code);
395 #endif
396
397 return((int)stat);
398 }
399
400
401 /*\ Truncate the file at the specified length.
402 \*/
elio_truncate(Fd_t fd,Off_t dlength)403 int elio_truncate(Fd_t fd, Off_t dlength)
404 {
405 off_t length = (off_t) dlength;
406 #ifdef WIN32
407 # define ftruncate _chsize
408 #endif
409
410 #ifdef PABLO
411 int pablo_code = PABLO_elio_truncate;
412 PABLO_start( pablo_code );
413 #endif
414 if(dlength >= elio_max_file_size(fd)){
415 Fd_t next_fd = elio_get_next_extent(fd);
416 dlength -= elio_max_file_size(fd);
417 # if defined(DEBUG)
418 printf(stderr," calling ftruncate with length = %f \n", dlength);
419 #endif
420 return elio_truncate(next_fd, dlength);
421 }
422 (void) SEEK(fd->fd, 0L, SEEK_SET);
423 if (ftruncate(fd->fd, length))
424 return TRUNFAIL;
425 else {
426 return ELIO_OK;
427 }
428 #ifdef PABLO
429 PABLO_end(pablo_code);
430 #endif
431 }
432
433
434 /*\ Return in length the length of the file
435 \*/
elio_length(Fd_t fd,Off_t * dlength)436 int elio_length(Fd_t fd, Off_t *dlength)
437 {
438 off_t length;
439 int status;
440
441 /* Add up the lengths of any extents */
442 if (fd->next) {
443 status = elio_length((Fd_t) fd->next, dlength);
444 *dlength += elio_max_file_size(fd);
445 return status;
446 }
447 else {
448 #ifdef PABLO
449 int pablo_code = PABLO_elio_length;
450 PABLO_start( pablo_code );
451 #endif
452
453 if ((length = SEEK(fd->fd, (off_t) 0, SEEK_END)) != -1)
454 status = ELIO_OK;
455 else
456 status = SEEKFAIL;
457
458 #ifdef PABLO
459 PABLO_end(pablo_code);
460 #endif
461
462 *dlength = (Off_t) length;
463 return status;
464 }
465 }
466
467
468 /*\ Blocking Read
469 * - returns number of bytes read or error code (<0) if failed
470 \*/
elio_read(Fd_t fd,Off_t doffset,void * buf,Size_t bytes)471 Size_t elio_read(Fd_t fd, Off_t doffset, void* buf, Size_t bytes)
472 {
473 off_t offset;
474 Size_t stat, bytes_to_read = bytes;
475 Size_t nextbytes;
476 int attempt=0;
477
478 if (doffset >= ABSURDLY_LARGE)
479 ELIO_ERROR(SEEKFAIL,0);
480
481 /* Follow the linked list of extents down until we hit the file
482 that contains the offset */
483 if (doffset >= elio_max_file_size(fd)) {
484 Fd_t next_fd = elio_get_next_extent(fd);
485 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
486 doffset -= elio_max_file_size(fd);
487 return elio_read(next_fd, doffset, buf, bytes);
488 }
489
490 /* Figure out if the read continues onto the next extent */
491 offset = (off_t) doffset;
492 nextbytes = 0;
493 if ((doffset+bytes_to_read) >= elio_max_file_size(fd)) {
494 nextbytes = bytes_to_read;
495 bytes_to_read = (Size_t) (elio_max_file_size(fd)-doffset);
496 nextbytes -= bytes_to_read;
497 }
498
499
500 /* Read from this physical file */
501
502 #ifdef PABLO
503 int pablo_code = PABLO_elio_read;
504 PABLO_start( pablo_code );
505 #endif
506
507 if(offset != SEEK(fd->fd,offset,SEEK_SET)) ELIO_ERROR(SEEKFAIL,0);
508
509 while (bytes_to_read) {
510 stat = READ(fd->fd, buf, bytes_to_read);
511 if(stat==0){
512 ELIO_ERROR(EOFFAIL, stat);
513 } else if ((stat == -1) && ((errno == EINTR) || (errno == EAGAIN))) {
514 ; /* interrupted read should be restarted */
515 } else if (stat > 0) {
516 bytes_to_read -= stat;
517 buf = stat + (char*)buf; /*advance pointer by # bytes read*/
518 } else {
519 ELIO_ERROR(READFAIL, stat);
520 }
521 attempt++;
522 }
523
524 /* Only get here if all went OK */
525
526 #ifdef PABLO
527 PABLO_end(pablo_code);
528 #endif
529
530 /* Read from next extent(s) ... relies on incrementing of buf */
531 if (nextbytes) {
532 Fd_t next_fd = elio_get_next_extent(fd);
533 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
534 stat = elio_read(next_fd, (Off_t) 0, buf, nextbytes);
535 if (stat != nextbytes)
536 ELIO_ERROR(READFAIL, stat);
537 }
538
539
540 return bytes;
541 }
542
543
544
545 /*\ Asynchronous Read: returns 0 if succeded or -1 if failed
546 \*/
elio_aread(Fd_t fd,Off_t doffset,void * buf,Size_t bytes,io_request_t * req_id)547 int elio_aread(Fd_t fd, Off_t doffset, void* buf, Size_t bytes, io_request_t * req_id)
548 {
549 off_t offset = (off_t) doffset;
550 Size_t stat;
551 #ifdef AIO
552 int aio_i;
553 #endif
554 #ifdef CRAY
555 int rc;
556 #endif
557
558 if (doffset >= ABSURDLY_LARGE)
559 ELIO_ERROR(SEEKFAIL,0);
560
561 /* Follow the linked list of extents down until we hit the file
562 that contains the offset */
563 if (doffset >= elio_max_file_size(fd)) {
564 Fd_t next_fd = elio_get_next_extent(fd);
565 if (!next_fd) ELIO_ERROR(OPENFAIL,0);
566 doffset -= elio_max_file_size(fd);
567 return elio_aread(next_fd, doffset, buf, bytes, req_id);
568 }
569
570 /* Figure out if the read continues onto the next extent
571 * ... if so then force the entire request to be done synchronously
572 * so that we don't have to manage multiple async requests */
573
574 if ((doffset+((Off_t) bytes)) >= elio_max_file_size(fd)) {
575 *req_id = ELIO_DONE;
576 if (elio_read(fd, doffset, buf, bytes) != bytes)
577 return -1;
578 else
579 return 0;
580 }
581
582 offset = (off_t) doffset;
583
584 #ifdef PABLO
585 int pablo_code = PABLO_elio_aread;
586 PABLO_start( pablo_code );
587 #endif
588
589 *req_id = ELIO_DONE;
590
591 #ifdef AIO
592 AIO_LOOKUP(aio_i);
593
594 /* blocking io when request table is full */
595 if(aio_i >= MAX_AIO_REQ){
596 # if defined(DEBUG)
597 fprintf(stderr, "elio_read: Warning- asynch overflow\n");
598 # endif
599 SYNC_EMULATE(read);
600
601 } else {
602
603 *req_id = (io_request_t) aio_i;
604 if((stat=elio_set_cb(fd, offset, aio_i, (void*) buf, bytes)))
605 ELIO_ERROR((int)stat,0);
606 # if defined(CRAY)
607 rc = READA(fd->fd, buf, bytes, &cb_fout[aio_i].stat, DEFARG);
608 stat = (rc < 0)? -1 : 0;
609 # elif defined(AIX)
610 #if !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
611 stat = aio_read(fd->fd, cb_fout+aio_i);
612 #endif
613 # else
614 stat = aio_read(cb_fout+aio_i);
615 # endif
616 aio_req[aio_i] = *req_id;
617 }
618 #else
619
620 /* call blocking write when AIO not available */
621 SYNC_EMULATE(read);
622
623 #endif
624
625 if(stat ==-1) ELIO_ERROR(AWRITFAIL, 0);
626
627 #ifdef PABLO
628 PABLO_end(pablo_code);
629 #endif
630
631 return((int)stat);
632 }
633
634
635 /*\ Wait for asynchronous I/O operation to complete. Invalidate id.
636 \*/
elio_wait(io_request_t * req_id)637 int elio_wait(io_request_t *req_id)
638 {
639 int aio_i=0;
640 int rc;
641
642 rc=0; /* just to remove the compiler warning */
643 #ifdef PABLO
644 int pablo_code = PABLO_elio_wait;
645 PABLO_start( pablo_code );
646 #endif
647
648 if(*req_id != ELIO_DONE ) {
649
650 # ifdef AIO
651 # if defined(CRAY)
652
653 # if defined(FFIO)
654 {
655 struct ffsw dumstat, *prdstat=&(cb_fout[*req_id].stat);
656 fffcntl(cb_fout[*req_id].filedes, FC_RECALL, prdstat, &dumstat);
657 if (FFSTAT(*prdstat) == FFERR) ELIO_ERROR(SUSPFAIL,0);
658 }
659 # else
660 {
661 struct iosw *statlist[1];
662 statlist[0] = &(cb_fout[*req_id].stat);
663 recall(cb_fout[*req_id].filedes, 1, statlist);
664 }
665 # endif
666
667 # elif defined(AIX)
668 # if !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
669 do { /* I/O can be interrupted on SP through rcvncall ! */
670 rc =(int)aio_suspend(1, cb_fout_arr+(int)*req_id);
671 } while(rc == -1 && errno == EINTR);
672 # endif
673
674 # else
675 if((int)aio_suspend((const struct aiocb *const*)(cb_fout_arr+(int)*req_id), 1, NULL) != 0) rc =-1;
676 # endif
677 if(rc ==-1) ELIO_ERROR(SUSPFAIL,0);
678
679 # if defined(DECOSF)
680 /* on DEC aio_return is required to clean internal data structures */
681 if(aio_return(cb_fout+(int)*req_id) == -1) ELIO_ERROR(RETUFAIL,0);
682 # endif
683 #endif
684
685 while(aio_req[aio_i] != *req_id && aio_i < MAX_AIO_REQ) aio_i++;
686 if(aio_i >= MAX_AIO_REQ) ELIO_ERROR(HANDFAIL, aio_i);
687
688 aio_req[aio_i] = NULL_AIO;
689 *req_id = ELIO_DONE;
690 }
691
692 #ifdef PABLO
693 PABLO_end(pablo_code);
694 #endif
695
696 return ELIO_OK;
697 }
698
699
700
701 /*\ Check if asynchronous I/O operation completed. If yes, invalidate id.
702 \*/
elio_probe(io_request_t * req_id,int * status)703 int elio_probe(io_request_t *req_id, int* status)
704 {
705 int errval=-1;
706 int aio_i = 0;
707
708 #ifdef PABLO
709 int pablo_code = PABLO_elio_probe;
710 PABLO_start( pablo_code );
711 #endif
712
713 if(*req_id == ELIO_DONE){
714 *status = ELIO_DONE;
715 } else {
716
717 #ifdef AIO
718 # if defined(CRAY)
719
720 # if defined(FFIO)
721 {
722 struct ffsw dumstat, *prdstat=&(cb_fout[*req_id].stat);
723 fffcntl(cb_fout[*req_id].filedes, FC_ASPOLL, prdstat, &dumstat);
724 errval = (FFSTAT(*prdstat) == 0) ? INPROGRESS: 0;
725 }
726 # else
727
728 errval = ( IO_DONE(cb_fout[*req_id].stat) == 0)? INPROGRESS: 0;
729
730 # endif
731
732 # elif defined(AIX)
733 errval = aio_error(cb_fout[(int)*req_id].aio_handle);
734 # else
735 errval = aio_error(cb_fout+(int)*req_id);
736 # endif
737 #endif
738 switch (errval) {
739 case 0:
740 while(aio_req[aio_i] != *req_id && aio_i < MAX_AIO_REQ) aio_i++;
741 if(aio_i >= MAX_AIO_REQ) ELIO_ERROR(HANDFAIL, aio_i);
742
743 *req_id = ELIO_DONE;
744 *status = ELIO_DONE;
745 aio_req[aio_i] = NULL_AIO;
746 break;
747 case INPROGRESS:
748 *status = ELIO_PENDING;
749 break;
750 default:
751 return PROBFAIL;
752 }
753 }
754
755 #ifdef PABLO
756 PABLO_end(pablo_code);
757 #endif
758
759 return ELIO_OK;
760 }
761
762
#if defined(CRAY) && defined(FFIO)
/*
 * Derive two counts from the f_priparts and f_secparts fields that statfs
 * reports for dirname and store them in *pparts and *sparts.
 *
 * *sparts is set to 0 when f_secparts is 0, so callers always receive a
 * defined value.  Returns -1 when statfs fails (neither output is written)
 * and ELIO_OK otherwise.
 *
 * NOTE(review): both loops count how many left shifts it takes to clear
 * the value; confirm against the UNICOS statfs documentation that this is
 * the intended way to turn the partition bit masks into counts.
 */
static int cray_part_info(char *dirname,long *pparts,long *sparts)
{
    struct statfs stats;
    long temp,count=0;

    if(statfs(dirname, &stats, sizeof(struct statfs), 0) == -1) return -1;

    temp = stats.f_priparts;
    while(temp != 0){
        count++;
        temp <<= 1;
    }
    *pparts = count;

    /* default for file systems without secondary partitions */
    *sparts = 0;

    if(stats.f_secparts != 0){

        temp = (stats.f_secparts << count);
        count = 0;
        while(temp != 0){
            count++;
            temp <<= 1;
        }
        *sparts = count;
    }
    return ELIO_OK;

}

#endif
793
794
795 /*\ Noncollective File Open
796 \*/
elio_open(const char * fname,int type,int mode)797 Fd_t elio_open(const char* fname, int type, int mode)
798 {
799 Fd_t fd=NULL;
800 stat_t statinfo;
801 int ptype=0, rc;
802 char dirname[ELIO_FILENAME_MAX];
803
804 /*
805 Create a file for writing to in lustre with
806 a specified pagesize and stripe.
807 pagesize = 1048576;
808 lustre_stripe_count = 32;
809 are good choices.
810 */
811 #ifdef USE_LUSTRE
812 struct lov_mds_md stripecfg;
813 int lustre_file;
814 int lustre_stripe_count;
815 int pagesize;
816 pagesize = 1048576;
817 lustre_stripe_count = 32;
818 #endif
819
820 #ifdef PABLO
821 int pablo_code = PABLO_elio_open;
822 PABLO_start( pablo_code );
823 #endif
824
825 if(first_elio_init) elio_init();
826
827 switch(type){
828 case ELIO_W: ptype = O_CREAT | O_TRUNC | O_WRONLY;
829 break;
830 case ELIO_R: ptype = O_RDONLY;
831 break;
832 case ELIO_RW: ptype = O_CREAT | O_RDWR;
833 break;
834 default:
835 ELIO_ERROR_NULL(MODEFAIL, type);
836 }
837
838 #if defined(WIN32)
839 ptype |= O_BINARY;
840 #endif
841
842 if((fd = (Fd_t ) malloc(sizeof(fd_struct)) ) == NULL)
843 ELIO_ERROR_NULL(ALOCFAIL, 0);
844
845 if( (rc = elio_dirname(fname, dirname, ELIO_FILENAME_MAX)) != ELIO_OK) {
846 free(fd);
847 ELIO_ERROR_NULL(rc, 0);
848 }
849
850 if( (rc = elio_stat(dirname, &statinfo)) != ELIO_OK) {
851 free(fd);
852 ELIO_ERROR_NULL(rc, 0);
853 }
854
855 fd->fs = statinfo.fs;
856 fd->mode = mode;
857 fd->type = type;
858 fd->extent = 0;
859 fd->next = NULL;
860
861 #ifdef USE_LUSTRE
862 lustre_file = (strncmp(fname,"/dtemp",6) == 0) && (access(fname, F_OK) != 0) && (ME() == 0);
863 if (lustre_file) {
864 ptype = ptype | O_LOV_DELAY_CREATE ;
865 }
866 #endif
867
868 #if defined(CRAY) && defined(FFIO)
869 {
870 struct ffsw ffstat;
871 long pparts, sparts, cbits, cblocks;
872 extern long _MPP_MY_PE;
873 char *ffio_str="cache:256"; /* intern I/O buffer/cache 256*4096 bytes */
874 /* JN: we do not want read-ahead write-behind*/
875
876 if(cray_part_info(dirname,&pparts,&sparts) != ELIO_OK){
877 free(fd);
878 ELIO_ERROR_NULL(STATFAIL, 0);
879 }
880
881 ptype |= ( O_BIG | O_PLACE | O_RAW );
882 cbits = (sparts != 0) ? 1 : 0;
883
884 if( sparts != 0) {
885
886 /* stripe is set so we only select secondary partitions with cbits */
887 if(mode == ELIO_SHARED){
888 cbits = ~((~0L)<<PARIO_MIN(32,sparts)); /* use all secondary partitions */
889 cblocks = 100;
890 }else{
891 cbits = 1 << (_MPP_MY_PE%sparts); /* round robin over s part */
892 }
893
894 cbits <<= pparts; /* move us out of the primary partitions */
895
896 }
897
898
899 /* printf ("parts=%d cbits = %X\n",sparts,cbits);*/
900
901 if(mode == ELIO_SHARED)
902 fd->fd = OPEN(fname, ptype, FOPEN_MODE, cbits, cblocks, &ffstat, NULL);
903 else
904 fd->fd = OPEN(fname, ptype, FOPEN_MODE, 0L , 0 , &ffstat, ffio_str);
905
906 }
907 #else
908 fd->fd = OPEN(fname, ptype, FOPEN_MODE );
909 #endif
910
911 if( (int)fd->fd == -1) {
912 free(fd);
913 ELIO_ERROR_NULL(OPENFAIL, 0);
914 }
915
916 fd->name = strdup(fname);
917
918 #ifdef USE_LUSTRE
919 if (lustre_file) {
920 stripecfg.lmm_magic = LOV_MAGIC;
921 stripecfg.lmm_pattern = 0; /* Only available option for now. */
922 stripecfg.lmm_stripe_size = pagesize; /* Stripe size in bytes. */
923 stripecfg.lmm_stripe_count = lustre_stripe_count;
924 if (ioctl((int)fd->fd, LL_IOC_LOV_SETSTRIPE, &stripecfg) < 0) {
925 fprintf(stderr,
926 "fp_create_out_filefp: Error: unable to stripe %s file.\n"
927 "error was %s\n",
928 fname,strerror(errno));
929 fflush(stderr);
930 free(fd);
931 ELIO_ERROR_NULL(OPENFAIL, 0);
932 }
933 } /* end if (luster_file) (is in /dtemp) */
934 #endif
935
936 #ifdef PABLO
937 PABLO_end(pablo_code);
938 #endif
939
940 return(fd);
941 }
942
943 /*\ Close File
944 \*/
elio_close(Fd_t fd)945 int elio_close(Fd_t fd)
946 {
947 int status = ELIO_OK;
948 #ifdef PABLO
949 pablo_code = PABLO_elio_close;
950 PABLO_start( pablo_code );
951 #endif
952
953 if (fd->next)
954 status = elio_close((Fd_t) fd->next);
955
956 /*printf("Closing extent %d name %s\n", fd->extent, fd->name);*/
957 if(CLOSE(fd->fd)==-1 || (status != ELIO_OK))
958 ELIO_ERROR(CLOSFAIL, 0);
959
960 free(fd->name);
961 free(fd);
962
963 #ifdef PABLO
964 PABLO_end(pablo_code);
965 #endif
966 return ELIO_OK;
967 }
968
969
970
971 /*\ Close File
972 \*/
elio_fsync(Fd_t fd)973 int elio_fsync(Fd_t fd)
974 {
975 int status = ELIO_OK;
976
977 #ifdef ELIO_FSYNC
978 if (fd->next)
979 status = elio_fsync((Fd_t) fd->next);
980
981 /* printf("syncing extent %d name %s\n", fd->extent, fd->name); */
982 /* if(ELIO_FSYNC(fd->fd)==-1 || (status != ELIO_OK)) */
983 #ifndef WIN32
984 sync();
985 #endif
986 if(ELIO_FSYNC(fd->fd)==-1 )
987 ELIO_ERROR(FSYNCFAIL, 0);
988 #endif
989
990 return ELIO_OK;
991 }
992
993
994 /*\ Delete File
995 \*/
elio_delete(const char * filename)996 int elio_delete(const char* filename)
997 {
998 int rc;
999
1000 if (access(filename, F_OK) != 0) /* Succeed if the file does not exist */
1001 return ELIO_OK;
1002
1003 #ifdef PABLO
1004 int pablo_code = PABLO_elio_delete;
1005 PABLO_start( pablo_code );
1006 #endif
1007
1008 rc = unlink(filename);
1009
1010 /* Remeber the first rc ... now delete possible extents until
1011 one fails */
1012
1013 {
1014 int extent;
1015 for (extent=1; extent<MAX_EXTENT; extent++) {
1016 char fname[ELIO_FILENAME_MAX];
1017 sprintf(fname,"%sx%3.3d",filename,extent);
1018 /*printf("Deleting extent %d with name '%s'\n",extent,fname);*/
1019 if (unlink(fname)) break;
1020 }
1021 }
1022
1023 if(rc ==-1) ELIO_ERROR(DELFAIL,0);
1024
1025 #ifdef PABLO
1026 PABLO_end(pablo_code);
1027 #endif
1028 return(ELIO_OK);
1029 }
1030
1031
1032
1033 /*\ Initialize ELIO
1034 \*/
elio_init(void)1035 void elio_init(void)
1036 {
1037 if(first_elio_init) {
1038 # if defined(ASYNC)
1039 int i;
1040 for(i=0; i < MAX_AIO_REQ; i++)
1041 aio_req[i] = NULL_AIO;
1042 # endif
1043 first_elio_init = 0;
1044 }
1045 }
1046
1047
1048 /*\ Return Error String Associated with Given Error Code
1049 \*/
elio_errmsg(int code,char * msg)1050 void elio_errmsg(int code, char *msg)
1051 {
1052 if(code==ELIO_OK){
1053 (void) strcpy(msg, ">OK");
1054 return;
1055 }
1056 else if(code == ELIO_PENDING_ERR) code = elio_pending_error;
1057
1058 if(code<OFFSET || code >OFFSET+ERRLEN) *msg=(char)0;
1059 else (void) strcpy(msg, errtable[-OFFSET + code]);
1060 }
1061
1062
/* Error code that elio_errmsg substitutes when it is handed
 * ELIO_PENDING_ERR. */
int elio_pending_error=UNKNFAIL;

/* Message texts, indexed by (error code - OFFSET) in elio_errmsg.
 * The last entry is an empty string. */
char *errtable[ERRLEN] ={
">Unable to Seek",
">Write Failed",
">Asynchronous Write Failed",
">Read Failed",
">Asynchronous Read Failed",
">Suspend Failed",
">I/O Request Handle not in Table",
">Incorrect File Mode",
">Unable to Determine Directory",
">Stat For Specified File or Directory Failed",
">Open Failed",
">Unable To Allocate Internal Data Structure",
">Unsupported Feature",
">Unlink Failed",
">Close Failed",
">Operation Interrupted Too Many Times",
">AIO Return Failed",
">Name String too Long",
">Unable to Determine Filesystem Type",
">Numeric Conversion Error",
">Incorrect Filesystem/Device Type",
">Error in Probe",
">Unable to Truncate",
">End of File",
">Fsync Failed",
""};
1092