1 #if HAVE_CONFIG_H
2 #   include "config.h"
3 #endif
4 
5 /** @file
6  **********************************************************************\
7  ELementary I/O (ELIO) disk operations for parallel I/O libraries
8  Authors: Jarek Nieplocha (PNNL) and Jace Mogill (ANL)
9  *
10  * DISCLAIMER
11  *
12  * This material was prepared as an account of work sponsored by an
13  * agency of the United States Government.  Neither the United States
14  * Government nor the United States Department of Energy, nor Battelle,
15  * nor any of their employees, MAKES ANY WARRANTY, EXPRESS OR IMPLIED, OR
16  * ASSUMES ANY LEGAL LIABILITY OR RESPONSIBILITY FOR THE ACCURACY,
17  * COMPLETENESS, OR USEFULNESS OF ANY INFORMATION, APPARATUS, PRODUCT,
18  * SOFTWARE, OR PROCESS DISCLOSED, OR REPRESENTS THAT ITS USE WOULD NOT
19  * INFRINGE PRIVATELY OWNED RIGHTS.
20  *
21  * ACKNOWLEDGMENT
22  *
23  * This software and its documentation were produced with United States
24  * Government support under Contract Number DE-AC06-76RLO-1830 awarded by
25  * the United States Department of Energy.  The United States Government
26  * retains a paid-up non-exclusive, irrevocable worldwide license to
27  * reproduce, prepare derivative works, perform publicly and display
28  * publicly by or for the US Government, including the right to
29  * distribute to other US Government contractors.
30  */
31 #ifdef USE_LUSTRE
32 #include <lustre/lustre_user.h> /* for O_LOV_DELAY_CREATE, LL_IOC_LOV_SETSTRIPE */
33 #include <linux/lustre_idl.h> /* for struct lov_mds_md, LOV_MAGIC */
34 #include <sys/ioctl.h> /* for ioctl */
35 #endif
36 
37 #include "eliop.h"
38 
39 #if defined(WIN32)
40 #undef ERROR
41 #endif
42 
43 #include "../sf/coms.h"
44 
45 #if defined(CRAY) && defined(__crayx1)
46 #undef CRAY
47 #endif
48 
49 #if  defined(AIX) || defined(DECOSF) || defined(SGI64) || defined(CRAY) || defined(LINUXAIO)
50      /* systems with Asynchronous I/O */
51 #else
52 #    ifndef NOAIO
53 #      define NOAIO
54 #    endif
55 #endif
56 
57 /****************** Internal Constants and Parameters **********************/
58 
/* Number of entries in the aio_req[] table and the cb_fout[] control blocks,
 * i.e. the most asynchronous requests tracked at one time. */
#define  MAX_AIO_REQ  4
/* Sentinel stored in aio_req[] to mark a slot as free. */
#define  NULL_AIO    -123456
/* Permission bits handed to OPEN when elio_open may create the file. */
#define  FOPEN_MODE 0644
/* NOTE(review): not referenced anywhere in the visible part of this file. */
#define  MAX_ATTEMPTS 10
63 
64 
65 #ifndef NOAIO
66 #   define AIO 1
67 #endif
68 
69 
70 #ifdef FFIO
71 #  define WRITE  ffwrite
72 #  define WRITEA ffwritea
73 #  define READ   ffread
74 #  define READA  ffreada
75 #  define CLOSE  ffclose
76 #  define SEEK   ffseek
77 #  define OPEN   ffopens
78 #  define DEFARG FULL
79 #else
80 #  define WRITE  write
81 #  define WRITEA writea
82 #  define READ   read
83 #  define READA  reada
84 #  define CLOSE  close
85 #  define SEEK   lseek
86 #  define OPEN   open
87 #  define DEFARG 0
88 #endif
89 
90 
91 #ifdef WIN32
92 #define ELIO_FSYNC _commit
93 #else
94 #include <unistd.h>
95 #define ELIO_FSYNC fsync
96 #endif
97 
98 /* structure to emulate control block in Posix AIO */
99 #if defined (CRAY)
100 #   if defined(FFIO)
101        typedef struct { struct ffsw stat; int filedes; }io_status_t;
102 #   else
103 #      include <sys/iosw.h>
104        typedef struct { struct iosw stat; int filedes; }io_status_t;
105 #   endif
106     io_status_t cb_fout[MAX_AIO_REQ];
107     io_status_t *cb_fout_arr[MAX_AIO_REQ];
108 
109 #elif defined(AIO)
110 #   include <aio.h>
111 #   if defined(AIX)
112 #      define INPROGRESS EINPROG
113 #   else
114 #      define INPROGRESS EINPROGRESS
115 #   endif
116     struct aiocb          cb_fout[MAX_AIO_REQ];
117 #ifndef AIX
118     const
119 #endif
120            struct aiocb   *cb_fout_arr[MAX_AIO_REQ];
121 #endif
122 
123 #ifndef INPROGRESS
124 #   define INPROGRESS 1
125 #endif
126 
/* Table of outstanding asynchronous request ids; a slot holding NULL_AIO is
 * free, any other value is the id handed back to the caller. */
static long           aio_req[MAX_AIO_REQ]; /* array for AIO requests */
/* Non-zero until elio_init() has run once. */
static int            first_elio_init = 1;  /* initialization status */
/* Set through elio_errors_fatal(). */
int                   _elio_Errors_Fatal=0; /* sets mode of handling errors */
130 
131 
132 /****************************** Internal Macros *****************************/
/*
 * AIO_LOOKUP(aio_i): set aio_i to the index of the first free slot in
 * aio_req[] (a slot holding NULL_AIO), or to MAX_AIO_REQ when every slot is
 * taken.  Without AIO support it always yields MAX_AIO_REQ, which callers
 * treat as "table full".
 *
 * The bound is tested BEFORE the slot is read so that a full table does not
 * read aio_req[MAX_AIO_REQ], one element past the end of the array.
 */
#if defined(AIO)
#  define AIO_LOOKUP(aio_i) {\
      (aio_i) = 0;\
      while((aio_i) < MAX_AIO_REQ && aio_req[(aio_i)] != NULL_AIO) (aio_i)++;\
}
#else
#  define AIO_LOOKUP(aio_i) aio_i = MAX_AIO_REQ
#endif
141 
/*
 * SYNC_EMULATE(op): perform an asynchronous request synchronously by calling
 * elio_<op>(fd, offset, buf, bytes).
 *
 * Relies on the locals req_id, stat, fd, offset, buf and bytes being in scope
 * at the point of use.  Marks the request as ELIO_DONE, then leaves stat == 0
 * when the full byte count was transferred; otherwise invokes ELIO_ERROR with
 * the value returned by the blocking call.
 *
 * Expands to several statements and is not wrapped in a block, so it must be
 * used as a statement of its own inside braces.
 */
#define SYNC_EMULATE(op) *req_id = ELIO_DONE; \
  if((stat= elio_ ## op (fd, offset, buf, bytes)) != bytes ){ \
       ELIO_ERROR(stat,0);  \
  }else \
       stat    = 0;
147 
/*
 * PARIO_MIN(a,b): smaller of the two arguments (each argument may be
 * evaluated twice).
 *
 * Guarded on its own name: the previous guard tested MIN, so PARIO_MIN was
 * left undefined whenever some header had already defined MIN.
 */
#ifndef PARIO_MIN
#define PARIO_MIN(a,b) (((a) <= (b)) ? (a) : (b))
#endif
151 
152 /*
153  * Offsets bigger than ABSURDLY_LARGE generate a SEEKFAIL.
154  * The maximum no. of extents permitted for a file is MAX_EXTENT.
155  */
156 
157 #if defined(_LARGE_FILES) || defined(_LARGEFILE_SOURCE) || defined(_LARGEFILE64_SOURCE) || _FILE_OFFSET_BITS+0 == 64 || SIZEOF_VOIDP == 8
158 #   define LARGE_FILES
159 #endif
160 
161 #define MAX_EXTENT 127
162 #ifdef LARGE_FILES
163 #define ABSURDLY_LARGE 1e14
164 #else
165 #define ABSURDLY_LARGE (MAX_EXTENT*2147483648.0)
166 #endif
167 
168 /*****************************************************************************/
169 
elio_max_file_size(Fd_t fd)170 static Off_t elio_max_file_size(Fd_t fd)
171      /*
172       * Return the maximum size permitted for this PHYSICAL file.
173       * Presently not file dependent.
174       */
175 {
176 #ifdef LARGE_FILES
177   return ABSURDLY_LARGE;
178 #else
179   return (2047.0*1024.0*1024.0);        /* 2 GB - 1 MB */
180 #endif
181 }
182 
static Fd_t elio_get_next_extent(Fd_t fd)
     /*
      * Return the descriptor of the file that forms the next extent of
      * this file.  If that extent has not been opened yet it is opened
      * with elio_open (same type and mode as fd) and linked into fd->next.
      *
      * The extent file is named after the base file with "xNNN" appended,
      * NNN being the zero-padded extent number; when fd is itself an
      * extent, its own 4-character suffix is replaced.
      *
      * Returns 0 when MAX_EXTENT has been reached, when the generated
      * name does not fit into ELIO_FILENAME_MAX bytes, or when elio_open
      * fails.
      */
{
  Fd_t next_fd = (Fd_t) fd->next;

  if (!next_fd) {
    /* Eventually need to replace this with user controllable naming
     * and combine with similar logic in delete routine.
     */
    char fname[ELIO_FILENAME_MAX];
    size_t len;

    if (fd->extent >= MAX_EXTENT)
      return 0;

    len = strlen(fd->name);
    if (fd->extent && len >= 4) len -= 4;   /* drop this extent's own "xNNN" */

    /* Need room for the base name, 'x', three digits and the terminator.
     * The name used to be copied and extended without any bound. */
    if (len + 5 > sizeof(fname))
      return 0;

    memcpy(fname, fd->name, len);
    sprintf(fname+len,"x%3.3d",fd->extent+1);
    /*printf("Opening extent %d with name '%s'\n",fd->extent+1,fname);*/

    if ((next_fd = elio_open(fname, fd->type, fd->mode))) {
      next_fd->extent = fd->extent + 1;
      fd->next = (struct fd_struct *) next_fd;
    }
  }
  return next_fd;
}
212 
elio_errors_fatal(int onoff)213 void elio_errors_fatal(int onoff)
214 {
215     _elio_Errors_Fatal = onoff;
216 }
217 
218 
219 /*\ Blocking Write
220  *    - returns number of bytes written or error code (<0) if failed
221 \*/
elio_write(Fd_t fd,Off_t doffset,const void * buf,Size_t bytes)222 Size_t elio_write(Fd_t fd, Off_t  doffset, const void* buf, Size_t bytes)
223 {
224   off_t offset;
225   Size_t stat, bytes_to_write = bytes;
226   Size_t nextbytes;
227 
228   if (doffset >= ABSURDLY_LARGE)
229     ELIO_ERROR(SEEKFAIL,0);
230 
231   /* Follow the linked list of extents down until we hit the file
232      that contains the offset */
233   if (doffset >= elio_max_file_size(fd)) {
234     Fd_t next_fd = elio_get_next_extent(fd);
235     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
236     doffset -= elio_max_file_size(fd);
237     return elio_write(next_fd, doffset, buf, bytes);
238   }
239 
240   /* Figure out if the write continues onto the next extent */
241   offset = (off_t) doffset;
242   nextbytes = 0;
243   if ((doffset+bytes_to_write) >= elio_max_file_size(fd)) {
244     nextbytes = bytes_to_write;
245     bytes_to_write = (Size_t) (elio_max_file_size(fd)-doffset);
246     nextbytes -= bytes_to_write;
247   }
248   /*printf("TRYING TO WRITE AT doffset=%f offset=%lu bw=%lu nb=%lu\n", doffset, offset,
249     bytes_to_write, nextbytes);*/
250 
251   /* Write to this extent */
252 
253 #ifdef PABLO
254   int pablo_code = PABLO_elio_write;
255   PABLO_start( pablo_code );
256 #endif
257 
258   if(offset != SEEK(fd->fd,offset,SEEK_SET)) ELIO_ERROR(SEEKFAIL,0);
259 
260   while (bytes_to_write) {
261     stat = WRITE(fd->fd, buf, bytes_to_write);
262     if ((stat == -1) && ((errno == EINTR) || (errno == EAGAIN))) {
263       ; /* interrupted write should be restarted */
264     } else if (stat > 0) {
265       bytes_to_write -= stat;
266       buf = stat + (char*)buf; /*advance pointer by # bytes written*/
267     } else {
268       ELIO_ERROR(WRITFAIL, stat);
269     }
270   }
271 
272   /* Only get here if all has gone OK */
273 
274 #ifdef PABLO
275   PABLO_end(pablo_code);
276 #endif
277 
278   /* Write to next extent(s) ... relies on incrementing of buf */
279   if (nextbytes) {
280     Fd_t next_fd = elio_get_next_extent(fd);
281     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
282     stat = elio_write(next_fd, (Off_t) 0, buf, nextbytes);
283     if (stat != nextbytes)
284       ELIO_ERROR(WRITFAIL, stat);
285   }
286 
287   return bytes;
288 }
289 
/*\ Fill in control block number reqn for an asynchronous transfer of bytes
 *  bytes between buf and file fd at offset doffset, and publish it in
 *  cb_fout_arr[reqn].
 *    - on CRAY the file is positioned here and SEEKFAIL is returned if the
 *      seek does not land on the requested offset
 *    - without AIO support nothing is done
 *    - returns ELIO_OK otherwise
\*/
int elio_set_cb(Fd_t fd, Off_t doffset, int reqn, void *buf, Size_t bytes)
{
#if defined(AIO)
    off_t offset = (off_t) doffset;

#   if defined(CRAY)
       if(SEEK(fd->fd, offset, SEEK_SET) != offset) return (SEEKFAIL);
       cb_fout[reqn].filedes = fd->fd;
       cb_fout_arr[reqn]     = &cb_fout[reqn];
#   else
       cb_fout_arr[reqn]        = &cb_fout[reqn];
       cb_fout[reqn].aio_buf    = buf;
       cb_fout[reqn].aio_nbytes = bytes;
       cb_fout[reqn].aio_offset = offset;
#      if defined(AIX)
         cb_fout[reqn].aio_whence = SEEK_SET;
#      else
         cb_fout[reqn].aio_fildes = fd->fd;
         cb_fout[reqn].aio_sigevent.sigev_notify = SIGEV_NONE;
#      endif
#   endif
#endif

    return ELIO_OK;
}
313 
314 
315 /*\ Asynchronous Write: returns 0 if succeded or err code if failed
316 \*/
elio_awrite(Fd_t fd,Off_t doffset,const void * buf,Size_t bytes,io_request_t * req_id)317 int elio_awrite(Fd_t fd, Off_t doffset, const void* buf, Size_t bytes, io_request_t * req_id)
318 {
319   off_t offset;
320   Size_t stat;
321 #ifdef AIO
322   int    aio_i;
323 #endif
324 
325   if (doffset >= ABSURDLY_LARGE)
326     ELIO_ERROR(SEEKFAIL,0);
327 
328   /* Follow the linked list of extents down until we hit the file
329      that contains the offset */
330   if (doffset >= elio_max_file_size(fd)) {
331     Fd_t next_fd = elio_get_next_extent(fd);
332     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
333     doffset -= elio_max_file_size(fd);
334     return elio_awrite(next_fd, doffset, buf, bytes, req_id);
335   }
336 
337   /* Figure out if the write continues onto the next extent
338    * ... if so then force the entire request to be done synchronously
339    * so that we don't have to manage multiple async requests */
340 
341   if ((doffset+((Off_t) bytes)) >= elio_max_file_size(fd)) {
342     *req_id = ELIO_DONE;
343     if (elio_write(fd, doffset, buf, bytes) != bytes)
344       return -1;
345     else
346       return 0;
347   }
348 
349   offset = (off_t) doffset;
350 
351 #ifdef PABLO
352   int pablo_code = PABLO_elio_awrite;
353   PABLO_start( pablo_code );
354 #endif
355 
356   *req_id = ELIO_DONE;
357 
358 #ifdef AIO
359    AIO_LOOKUP(aio_i);
360 
361    /* blocking io when request table is full */
362    if(aio_i >= MAX_AIO_REQ){
363 #     if defined(DEBUG) && defined(ASYNC)
364          fprintf(stderr, "elio_awrite: Warning- asynch overflow\n");
365 #     endif
366       SYNC_EMULATE(write);
367    } else {
368       int rc;
369       *req_id = (io_request_t) aio_i;
370       if((rc=elio_set_cb(fd, offset, aio_i, (void*) buf, bytes)))
371                                                  ELIO_ERROR(rc,0);
372 
373 #    if defined(CRAY)
374        rc = WRITEA(fd->fd, (char*)buf, bytes, &cb_fout[aio_i].stat, DEFARG);
375        stat = (rc < 0)? -1 : 0;
376 #    elif defined(AIX)
377 #       if !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
378        stat = aio_write(fd->fd, cb_fout + aio_i);
379 #       endif
380 #    else
381        stat = aio_write(cb_fout+aio_i);
382 #    endif
383      aio_req[aio_i] = *req_id;
384   }
385 
386 #else
387       /* call blocking write when AIO not available */
388       SYNC_EMULATE(write);
389 #endif
390 
391   if(stat ==-1) ELIO_ERROR(AWRITFAIL, 0);
392 
393 #ifdef PABLO
394   PABLO_end(pablo_code);
395 #endif
396 
397   return((int)stat);
398 }
399 
400 
401 /*\ Truncate the file at the specified length.
402 \*/
elio_truncate(Fd_t fd,Off_t dlength)403 int elio_truncate(Fd_t fd, Off_t dlength)
404 {
405   off_t length = (off_t) dlength;
406 #ifdef WIN32
407 #   define ftruncate _chsize
408 #endif
409 
410 #ifdef PABLO
411     int pablo_code = PABLO_elio_truncate;
412     PABLO_start( pablo_code );
413 #endif
414     if(dlength >= elio_max_file_size(fd)){
415       Fd_t next_fd = elio_get_next_extent(fd);
416       dlength -= elio_max_file_size(fd);
417 #       if defined(DEBUG)
418       printf(stderr," calling ftruncate with length = %f \n", dlength);
419 #endif
420       return elio_truncate(next_fd, dlength);
421     }
422     (void) SEEK(fd->fd, 0L, SEEK_SET);
423     if (ftruncate(fd->fd, length))
424     return TRUNFAIL;
425     else {
426     return ELIO_OK;
427     }
428 #ifdef PABLO
429     PABLO_end(pablo_code);
430 #endif
431 }
432 
433 
434 /*\ Return in length the length of the file
435 \*/
elio_length(Fd_t fd,Off_t * dlength)436 int elio_length(Fd_t fd, Off_t *dlength)
437 {
438   off_t length;
439   int status;
440 
441   /* Add up the lengths of any extents */
442   if (fd->next) {
443     status = elio_length((Fd_t) fd->next, dlength);
444     *dlength += elio_max_file_size(fd);
445     return status;
446   }
447   else {
448 #ifdef PABLO
449     int pablo_code = PABLO_elio_length;
450     PABLO_start( pablo_code );
451 #endif
452 
453     if ((length = SEEK(fd->fd, (off_t) 0, SEEK_END)) != -1)
454       status = ELIO_OK;
455     else
456       status = SEEKFAIL;
457 
458 #ifdef PABLO
459     PABLO_end(pablo_code);
460 #endif
461 
462     *dlength = (Off_t) length;
463     return status;
464   }
465 }
466 
467 
468 /*\ Blocking Read
469  *      - returns number of bytes read or error code (<0) if failed
470 \*/
elio_read(Fd_t fd,Off_t doffset,void * buf,Size_t bytes)471 Size_t elio_read(Fd_t fd, Off_t doffset, void* buf, Size_t bytes)
472 {
473 off_t offset;
474 Size_t stat, bytes_to_read = bytes;
475 Size_t nextbytes;
476 int    attempt=0;
477 
478  if (doffset >= ABSURDLY_LARGE)
479     ELIO_ERROR(SEEKFAIL,0);
480 
481   /* Follow the linked list of extents down until we hit the file
482      that contains the offset */
483   if (doffset >= elio_max_file_size(fd)) {
484     Fd_t next_fd = elio_get_next_extent(fd);
485     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
486     doffset -= elio_max_file_size(fd);
487     return elio_read(next_fd, doffset, buf, bytes);
488   }
489 
490   /* Figure out if the read continues onto the next extent */
491   offset = (off_t) doffset;
492   nextbytes = 0;
493   if ((doffset+bytes_to_read) >= elio_max_file_size(fd)) {
494     nextbytes = bytes_to_read;
495     bytes_to_read = (Size_t) (elio_max_file_size(fd)-doffset);
496     nextbytes -= bytes_to_read;
497   }
498 
499 
500   /* Read from this physical file */
501 
502 #ifdef PABLO
503   int pablo_code = PABLO_elio_read;
504   PABLO_start( pablo_code );
505 #endif
506 
507   if(offset != SEEK(fd->fd,offset,SEEK_SET)) ELIO_ERROR(SEEKFAIL,0);
508 
509   while (bytes_to_read) {
510     stat = READ(fd->fd, buf, bytes_to_read);
511     if(stat==0){
512       ELIO_ERROR(EOFFAIL, stat);
513     } else if ((stat == -1) && ((errno == EINTR) || (errno == EAGAIN))) {
514       ; /* interrupted read should be restarted */
515     } else if (stat > 0) {
516       bytes_to_read -= stat;
517       buf = stat + (char*)buf; /*advance pointer by # bytes read*/
518     } else {
519       ELIO_ERROR(READFAIL, stat);
520     }
521     attempt++;
522   }
523 
524   /* Only get here if all went OK */
525 
526 #ifdef PABLO
527   PABLO_end(pablo_code);
528 #endif
529 
530   /* Read from next extent(s) ... relies on incrementing of buf */
531   if (nextbytes) {
532     Fd_t next_fd = elio_get_next_extent(fd);
533     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
534     stat = elio_read(next_fd, (Off_t) 0, buf, nextbytes);
535     if (stat != nextbytes)
536       ELIO_ERROR(READFAIL, stat);
537   }
538 
539 
540   return bytes;
541 }
542 
543 
544 
545 /*\ Asynchronous Read: returns 0 if succeded or -1 if failed
546 \*/
elio_aread(Fd_t fd,Off_t doffset,void * buf,Size_t bytes,io_request_t * req_id)547 int elio_aread(Fd_t fd, Off_t doffset, void* buf, Size_t bytes, io_request_t * req_id)
548 {
549   off_t offset = (off_t) doffset;
550   Size_t stat;
551 #ifdef AIO
552   int    aio_i;
553 #endif
554 #ifdef CRAY
555   int rc;
556 #endif
557 
558   if (doffset >= ABSURDLY_LARGE)
559     ELIO_ERROR(SEEKFAIL,0);
560 
561   /* Follow the linked list of extents down until we hit the file
562      that contains the offset */
563   if (doffset >= elio_max_file_size(fd)) {
564     Fd_t next_fd = elio_get_next_extent(fd);
565     if (!next_fd) ELIO_ERROR(OPENFAIL,0);
566     doffset -= elio_max_file_size(fd);
567     return elio_aread(next_fd, doffset, buf, bytes, req_id);
568   }
569 
570   /* Figure out if the read continues onto the next extent
571    * ... if so then force the entire request to be done synchronously
572    * so that we don't have to manage multiple async requests */
573 
574   if ((doffset+((Off_t) bytes)) >= elio_max_file_size(fd)) {
575     *req_id = ELIO_DONE;
576     if (elio_read(fd, doffset, buf, bytes) != bytes)
577       return -1;
578     else
579       return 0;
580   }
581 
582   offset = (off_t) doffset;
583 
584 #ifdef PABLO
585   int pablo_code = PABLO_elio_aread;
586   PABLO_start( pablo_code );
587 #endif
588 
589   *req_id = ELIO_DONE;
590 
591 #ifdef AIO
592     AIO_LOOKUP(aio_i);
593 
594     /* blocking io when request table is full */
595     if(aio_i >= MAX_AIO_REQ){
596 #       if defined(DEBUG)
597            fprintf(stderr, "elio_read: Warning- asynch overflow\n");
598 #       endif
599         SYNC_EMULATE(read);
600 
601     } else {
602 
603        *req_id = (io_request_t) aio_i;
604         if((stat=elio_set_cb(fd, offset, aio_i, (void*) buf, bytes)))
605                                                  ELIO_ERROR((int)stat,0);
606 #       if defined(CRAY)
607           rc = READA(fd->fd, buf, bytes, &cb_fout[aio_i].stat, DEFARG);
608           stat = (rc < 0)? -1 : 0;
609 #       elif defined(AIX)
610 #if    !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
611           stat = aio_read(fd->fd, cb_fout+aio_i);
612 #endif
613 #       else
614           stat = aio_read(cb_fout+aio_i);
615 #       endif
616         aio_req[aio_i] = *req_id;
617     }
618 #else
619 
620     /* call blocking write when AIO not available */
621     SYNC_EMULATE(read);
622 
623 #endif
624 
625     if(stat ==-1) ELIO_ERROR(AWRITFAIL, 0);
626 
627 #ifdef PABLO
628     PABLO_end(pablo_code);
629 #endif
630 
631     return((int)stat);
632 }
633 
634 
635 /*\ Wait for asynchronous I/O operation to complete. Invalidate id.
636 \*/
elio_wait(io_request_t * req_id)637 int elio_wait(io_request_t *req_id)
638 {
639   int  aio_i=0;
640   int  rc;
641 
642   rc=0; /* just to remove the compiler warning */
643 #ifdef PABLO
644   int pablo_code = PABLO_elio_wait;
645   PABLO_start( pablo_code );
646 #endif
647 
648   if(*req_id != ELIO_DONE ) {
649 
650 #    ifdef AIO
651 #      if defined(CRAY)
652 
653 #        if defined(FFIO)
654          {
655             struct ffsw dumstat, *prdstat=&(cb_fout[*req_id].stat);
656             fffcntl(cb_fout[*req_id].filedes, FC_RECALL, prdstat, &dumstat);
657             if (FFSTAT(*prdstat) == FFERR) ELIO_ERROR(SUSPFAIL,0);
658          }
659 #        else
660          {
661             struct iosw *statlist[1];
662             statlist[0] = &(cb_fout[*req_id].stat);
663             recall(cb_fout[*req_id].filedes, 1, statlist);
664          }
665 #        endif
666 
667 #      elif defined(AIX)
668 #         if    !defined(AIX52) && !defined(_AIO_AIX_SOURCE)
669               do {    /* I/O can be interrupted on SP through rcvncall ! */
670                    rc =(int)aio_suspend(1, cb_fout_arr+(int)*req_id);
671               } while(rc == -1 && errno == EINTR);
672 #         endif
673 
674 #  else
675       if((int)aio_suspend((const struct aiocb *const*)(cb_fout_arr+(int)*req_id), 1, NULL) != 0) rc =-1;
676 #  endif
677       if(rc ==-1) ELIO_ERROR(SUSPFAIL,0);
678 
679 #  if defined(DECOSF)
680       /* on DEC aio_return is required to clean internal data structures */
681       if(aio_return(cb_fout+(int)*req_id) == -1) ELIO_ERROR(RETUFAIL,0);
682 #  endif
683 #endif
684 
685       while(aio_req[aio_i] != *req_id && aio_i < MAX_AIO_REQ) aio_i++;
686       if(aio_i >= MAX_AIO_REQ) ELIO_ERROR(HANDFAIL, aio_i);
687 
688       aio_req[aio_i] = NULL_AIO;
689       *req_id = ELIO_DONE;
690    }
691 
692 #ifdef PABLO
693    PABLO_end(pablo_code);
694 #endif
695 
696    return ELIO_OK;
697 }
698 
699 
700 
701 /*\ Check if asynchronous I/O operation completed. If yes, invalidate id.
702 \*/
elio_probe(io_request_t * req_id,int * status)703 int elio_probe(io_request_t *req_id, int* status)
704 {
705   int    errval=-1;
706   int    aio_i = 0;
707 
708 #ifdef PABLO
709   int pablo_code = PABLO_elio_probe;
710   PABLO_start( pablo_code );
711 #endif
712 
713   if(*req_id == ELIO_DONE){
714       *status = ELIO_DONE;
715   } else {
716 
717 #ifdef AIO
718 #    if defined(CRAY)
719 
720 #     if defined(FFIO)
721       {
722          struct ffsw dumstat, *prdstat=&(cb_fout[*req_id].stat);
723          fffcntl(cb_fout[*req_id].filedes, FC_ASPOLL, prdstat, &dumstat);
724          errval = (FFSTAT(*prdstat) == 0) ? INPROGRESS: 0;
725       }
726 #     else
727 
728          errval = ( IO_DONE(cb_fout[*req_id].stat) == 0)? INPROGRESS: 0;
729 
730 #     endif
731 
732 #   elif defined(AIX)
733       errval = aio_error(cb_fout[(int)*req_id].aio_handle);
734 #   else
735       errval = aio_error(cb_fout+(int)*req_id);
736 #   endif
737 #endif
738       switch (errval) {
739       case 0:
740           while(aio_req[aio_i] != *req_id && aio_i < MAX_AIO_REQ) aio_i++;
741           if(aio_i >= MAX_AIO_REQ) ELIO_ERROR(HANDFAIL, aio_i);
742 
743       *req_id = ELIO_DONE;
744       *status = ELIO_DONE;
745       aio_req[aio_i] = NULL_AIO;
746       break;
747       case INPROGRESS:
748       *status = ELIO_PENDING;
749       break;
750       default:
751           return PROBFAIL;
752       }
753   }
754 
755 #ifdef PABLO
756   PABLO_end(pablo_code);
757 #endif
758 
759   return ELIO_OK;
760 }
761 
762 
763 #if defined(CRAY) && defined(FFIO)
cray_part_info(char * dirname,long * pparts,long * sparts)764 static int cray_part_info(char *dirname,long *pparts,long *sparts)
765 {
766   struct statfs stats;
767   long temp,count=0;
768 
769   if(statfs(dirname, &stats, sizeof(struct statfs), 0) == -1) return -1;
770 
771   temp = stats.f_priparts;
772   while(temp != 0){
773       count++;
774       temp <<= 1;
775   }
776  *pparts = count;
777 
778  if(stats.f_secparts != 0){
779 
780     temp = (stats.f_secparts << count);
781     count = 0;
782     while(temp != 0){
783            count++;
784            temp <<= 1;
785     }
786     *sparts = count;
787  }
788  return ELIO_OK;
789 
790 }
791 
792 #endif
793 
794 
795 /*\ Noncollective File Open
796 \*/
elio_open(const char * fname,int type,int mode)797 Fd_t  elio_open(const char* fname, int type, int mode)
798 {
799   Fd_t fd=NULL;
800   stat_t statinfo;
801   int ptype=0, rc;
802   char dirname[ELIO_FILENAME_MAX];
803 
804   /*
805     Create a file for writing to in lustre with
806     a specified pagesize and stripe.
807     pagesize = 1048576;
808     lustre_stripe_count = 32;
809     are good choices.
810   */
811 #ifdef USE_LUSTRE
812   struct lov_mds_md stripecfg;
813   int    lustre_file;
814   int  lustre_stripe_count;
815   int  pagesize;
816   pagesize = 1048576;
817   lustre_stripe_count = 32;
818 #endif
819 
820 #ifdef PABLO
821   int pablo_code = PABLO_elio_open;
822   PABLO_start( pablo_code );
823 #endif
824 
825   if(first_elio_init) elio_init();
826 
827    switch(type){
828      case ELIO_W:  ptype = O_CREAT | O_TRUNC | O_WRONLY;
829                    break;
830      case ELIO_R:  ptype = O_RDONLY;
831                    break;
832      case ELIO_RW: ptype = O_CREAT | O_RDWR;
833                    break;
834      default:
835                    ELIO_ERROR_NULL(MODEFAIL, type);
836    }
837 
838 #if defined(WIN32)
839    ptype |= O_BINARY;
840 #endif
841 
842   if((fd = (Fd_t ) malloc(sizeof(fd_struct)) ) == NULL)
843                    ELIO_ERROR_NULL(ALOCFAIL, 0);
844 
845   if( (rc = elio_dirname(fname, dirname, ELIO_FILENAME_MAX)) != ELIO_OK) {
846                    free(fd);
847                    ELIO_ERROR_NULL(rc, 0);
848   }
849 
850   if( (rc = elio_stat(dirname, &statinfo)) != ELIO_OK) {
851                    free(fd);
852                    ELIO_ERROR_NULL(rc, 0);
853   }
854 
855   fd->fs = statinfo.fs;
856   fd->mode = mode;
857   fd->type = type;
858   fd->extent = 0;
859   fd->next = NULL;
860 
861 #ifdef USE_LUSTRE
862   lustre_file = (strncmp(fname,"/dtemp",6) == 0) && (access(fname, F_OK) != 0) && (ME() == 0);
863   if (lustre_file) {
864     ptype = ptype | O_LOV_DELAY_CREATE ;
865   }
866 #endif
867 
868 #if defined(CRAY) && defined(FFIO)
869   {
870     struct ffsw ffstat;
871     long pparts, sparts, cbits, cblocks;
872     extern long _MPP_MY_PE;
873     char *ffio_str="cache:256"; /*  intern I/O buffer/cache 256*4096 bytes */
874                                 /*  JN: we do not want read-ahead write-behind*/
875 
876     if(cray_part_info(dirname,&pparts,&sparts) != ELIO_OK){
877                    free(fd);
878                    ELIO_ERROR_NULL(STATFAIL, 0);
879     }
880 
881     ptype |= ( O_BIG | O_PLACE | O_RAW );
882     cbits = (sparts != 0) ? 1 : 0;
883 
884     if( sparts != 0) {
885 
886       /* stripe is set so we only select secondary partitions with cbits */
887       if(mode == ELIO_SHARED){
888          cbits = ~((~0L)<<PARIO_MIN(32,sparts)); /* use all secondary partitions */
889          cblocks = 100;
890       }else{
891          cbits = 1 << (_MPP_MY_PE%sparts);  /* round robin over s part */
892       }
893 
894       cbits <<= pparts;        /* move us out of the primary partitions */
895 
896      }
897 
898 
899 /*     printf ("parts=%d cbits = %X\n",sparts,cbits);*/
900 
901      if(mode == ELIO_SHARED)
902       fd->fd = OPEN(fname, ptype, FOPEN_MODE, cbits, cblocks, &ffstat, NULL);
903      else
904       fd->fd = OPEN(fname, ptype, FOPEN_MODE, 0L   , 0      , &ffstat, ffio_str);
905 
906   }
907 #else
908   fd->fd = OPEN(fname, ptype, FOPEN_MODE );
909 #endif
910 
911   if( (int)fd->fd == -1) {
912                    free(fd);
913                    ELIO_ERROR_NULL(OPENFAIL, 0);
914   }
915 
916   fd->name = strdup(fname);
917 
918 #ifdef USE_LUSTRE
919     if (lustre_file) {
920       stripecfg.lmm_magic = LOV_MAGIC;
921       stripecfg.lmm_pattern = 0; /* Only available option for now. */
922       stripecfg.lmm_stripe_size = pagesize; /* Stripe size in bytes. */
923       stripecfg.lmm_stripe_count  = lustre_stripe_count;
924       if (ioctl((int)fd->fd, LL_IOC_LOV_SETSTRIPE, &stripecfg) < 0) {
925         fprintf(stderr,
926               "fp_create_out_filefp: Error: unable to stripe %s file.\n"
927               "error was %s\n",
928               fname,strerror(errno));
929         fflush(stderr);
930         free(fd);
931         ELIO_ERROR_NULL(OPENFAIL, 0);
932       }
933     } /* end if (luster_file) (is in /dtemp) */
934 #endif
935 
936 #ifdef PABLO
937   PABLO_end(pablo_code);
938 #endif
939 
940   return(fd);
941 }
942 
943 /*\ Close File
944 \*/
elio_close(Fd_t fd)945 int elio_close(Fd_t fd)
946 {
947     int status = ELIO_OK;
948 #ifdef PABLO
949     pablo_code = PABLO_elio_close;
950     PABLO_start( pablo_code );
951 #endif
952 
953     if (fd->next)
954       status = elio_close((Fd_t) fd->next);
955 
956     /*printf("Closing extent %d name %s\n", fd->extent, fd->name);*/
957     if(CLOSE(fd->fd)==-1 || (status != ELIO_OK))
958       ELIO_ERROR(CLOSFAIL, 0);
959 
960     free(fd->name);
961     free(fd);
962 
963 #ifdef PABLO
964     PABLO_end(pablo_code);
965 #endif
966     return ELIO_OK;
967 }
968 
969 
970 
971 /*\ Close File
972 \*/
elio_fsync(Fd_t fd)973 int elio_fsync(Fd_t fd)
974 {
975     int status = ELIO_OK;
976 
977 #ifdef ELIO_FSYNC
978     if (fd->next)
979       status = elio_fsync((Fd_t) fd->next);
980 
981     /* printf("syncing extent %d name %s\n", fd->extent, fd->name); */
982     /*   if(ELIO_FSYNC(fd->fd)==-1 || (status != ELIO_OK)) */
983 #ifndef WIN32
984     sync();
985 #endif
986     if(ELIO_FSYNC(fd->fd)==-1 )
987       ELIO_ERROR(FSYNCFAIL, 0);
988 #endif
989 
990     return ELIO_OK;
991 }
992 
993 
994 /*\ Delete File
995 \*/
elio_delete(const char * filename)996 int elio_delete(const char* filename)
997 {
998     int rc;
999 
1000     if (access(filename, F_OK) != 0) /* Succeed if the file does not exist */
1001       return ELIO_OK;
1002 
1003 #ifdef PABLO
1004     int pablo_code = PABLO_elio_delete;
1005     PABLO_start( pablo_code );
1006 #endif
1007 
1008     rc = unlink(filename);
1009 
1010     /* Remeber the first rc ... now delete possible extents until
1011        one fails */
1012 
1013     {
1014       int extent;
1015       for (extent=1; extent<MAX_EXTENT; extent++) {
1016     char fname[ELIO_FILENAME_MAX];
1017     sprintf(fname,"%sx%3.3d",filename,extent);
1018     /*printf("Deleting extent %d with name '%s'\n",extent,fname);*/
1019     if (unlink(fname)) break;
1020       }
1021     }
1022 
1023     if(rc ==-1) ELIO_ERROR(DELFAIL,0);
1024 
1025 #ifdef PABLO
1026     PABLO_end(pablo_code);
1027 #endif
1028     return(ELIO_OK);
1029 }
1030 
1031 
1032 
1033 /*\ Initialize ELIO
1034 \*/
elio_init(void)1035 void elio_init(void)
1036 {
1037   if(first_elio_init) {
1038 #     if defined(ASYNC)
1039            int i;
1040            for(i=0; i < MAX_AIO_REQ; i++)
1041          aio_req[i] = NULL_AIO;
1042 #     endif
1043       first_elio_init = 0;
1044   }
1045 }
1046 
1047 
1048 /*\ Return Error String Associated with Given Error Code
1049 \*/
elio_errmsg(int code,char * msg)1050 void elio_errmsg(int code, char *msg)
1051 {
1052      if(code==ELIO_OK){
1053          (void) strcpy(msg, ">OK");
1054          return;
1055      }
1056      else if(code == ELIO_PENDING_ERR) code = elio_pending_error;
1057 
1058      if(code<OFFSET || code >OFFSET+ERRLEN) *msg=(char)0;
1059      else (void) strcpy(msg, errtable[-OFFSET + code]);
1060 }
1061 
1062 
/* Error code that elio_errmsg substitutes when it is asked about
 * ELIO_PENDING_ERR; starts out as UNKNFAIL. */
int elio_pending_error=UNKNFAIL;

/* Message texts, looked up by elio_errmsg as errtable[code - OFFSET].
 * The order of the entries therefore has to follow the numbering of the
 * error codes. */
char *errtable[ERRLEN] ={
">Unable to Seek",
">Write Failed",
">Asynchronous Write Failed",
">Read Failed",
">Asynchronous Read Failed",
">Suspend Failed",
">I/O Request Handle not in Table",
">Incorrect File Mode",
">Unable to Determine Directory",
">Stat For Specified File or Directory Failed",
">Open Failed",
">Unable To Allocate Internal Data Structure",
">Unsupported Feature",
">Unlink Failed",
">Close Failed",
">Operation Interrupted Too Many Times",
">AIO Return Failed",
">Name String too Long",
">Unable to Determine Filesystem Type",
">Numeric Conversion Error",
">Incorrect Filesystem/Device Type",
">Error in Probe",
">Unable to Truncate",
">End of File",
">Fsync Failed",
""};
1092