/*
 * Library for POSIX async read operations with hints.
 * Author: Don Capps
 * Company: Iozone
 * Date: 4/24/1998
 *
 * Two models are supported. The first model is a replacement for read() where
 * the async operations are performed and the requested data is bcopy()-ed
 * back into the user's buffer. The second model is a new version of read()
 * where the caller does not supply the address of the buffer but instead is
 * returned an address to the location of the data. The second model
 * eliminates a bcopy from the path.
 *
 * To use model #1:
 * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
 *    The fd is the file descriptor for the async operations.
 *    The direct_flag sets VX_DIRECT
 *
 * 2. Call async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
 *    Where:
 *    gc ............ is the pointer on the stack
 *    fd ............ is the file descriptor
 *    ubuffer ....... is the address of the user buffer.
 *    offset ........ is the offset in the file to begin reading
 *    size .......... is the size of the transfer.
 *    stride ........ is the distance, in size units, to space the async reads.
 *    max ........... is the max size of the file to be read.
 *    depth ......... is the number of async operations to perform.
 *
 * 3. Call end_async(gc) when finished.
 *    Where:
 *    gc ............ is the pointer on the stack.
 *
 * To use model #2:
 * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
 *    The fd is the file descriptor for the async operations.
 *    The direct_flag sets VX_DIRECT
 * 2. Call async_read_no_copy(gc, fd, &ubuffer, offset, size, stride, max, depth)
 *    Where:
 *    gc ............ is the pointer on the stack
 *    fd ............ is the file descriptor
 *    ubuffer ....... is the address of a pointer that will be filled in
 *                    by the async library.
 *    offset ........ is the offset in the file to begin reading
 *    size .......... is the size of the transfer.
 *    stride ........ is the distance, in size units, to space the async reads.
 *    max ........... is the max size of the file to be read.
 *    depth ......... is the number of async operations to perform.
 *
 * 3. Call async_release(gc) when finished with the data that was returned.
 *    This allows the async library to reuse the memory that was filled in
 *    and returned to the user.
 *
 * 4. Call end_async(gc) when finished.
 *    Where:
 *    gc ............ is the pointer on the stack.
 *
 * To use model #1: (WRITES)
 * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
 *    The fd is the file descriptor for the async operations.
 *
 * 2. Call async_write(gc, fd, ubuffer, size, offset, depth)
 *    Where:
 *    gc ............ is the pointer on the stack
 *    fd ............ is the file descriptor
 *    ubuffer ....... is the address of the user buffer.
 *    size .......... is the size of the transfer.
 *    offset ........ is the offset in the file to begin writing
 *    depth ......... is the number of async operations to perform.
 *
 * 3. Call end_async(gc) when finished.
 *    Where:
 *    gc ............ is the pointer on the stack.
 *
 * Notes:
 *    The intended use is to replace calls to read() with calls to
 *    async_read() and allow the user to make suggestions on
 *    what kind of async read-ahead would be nice to have.
 *    The first transfer requested is guaranteed to be complete
 *    before returning to the caller. The async operations will
 *    be started and will also be guaranteed to have completed
 *    if the next call specifies its first request to be one
 *    that was previously performed with an async operation.
 *
 *    The async_read_no_copy() function allows the async operations
 *    to return the data to the user and not have to perform
 *    a bcopy of the data back into the user-specified buffer
 *    location. This model is faster but assumes that the user
 *    application has been modified to work with this model.
 *
 *    The async_write() is intended to enhance the performance of
 *    initial writes to a file. This is the slowest case in the write
 *    path as it must perform meta-data allocations and wait.
 */
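
/*
 * Usage sketch (illustrative only, not part of the library): model #1,
 * a sequential read loop with read-ahead.  "fd", "filesize" and BUFSZ
 * are placeholders supplied by the caller.
 *
 *	struct cache *gc = NULL;
 *	char buf[BUFSZ];
 *	off64_t off;
 *
 *	async_init(&gc, fd, 0);			// 0: no VX_DIRECT hint
 *	for (off = 0; off < filesize; off += BUFSZ)
 *		async_read(gc, (long long)fd, buf, off, (long long)BUFSZ,
 *			1LL,			// stride: next sequential block
 *			(off64_t)filesize,	// max: do not schedule past EOF
 *			4LL);			// depth: keep 4 reads in flight
 *	end_async(gc);
 */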

#include <sys/types.h>
#include <aio.h>

#if defined(_LARGEFILE64_SOURCE) && !defined(__LP64__)
# define aio_error	aio_error64
# define aio_return	aio_return64
# define aio_read	aio_read64
# define aio_cancel	aio_cancel64
# define aio_write	aio_write64
#endif

#if defined(solaris) || defined(linux) || defined(SCO_Unixware_gcc) || defined(__NetBSD__)
#else
#include <sys/timers.h>
#endif
#include <sys/errno.h>
#include <unistd.h>
#ifndef bsd4_4
#include <malloc.h>
#endif
#ifdef VXFS
#include <sys/fs/vx_ioctl.h>
#endif

#if defined(OSFV5) || defined(linux)
#include <string.h>
#endif

#if defined(linux)
#include <unistd.h>
#include <stdio.h>
#include <stdlib.h>
#endif

#if (defined(solaris) && defined(__LP64__)) || defined(__s390x__) || defined(__FreeBSD__) || defined(__NetBSD__)
/* If we are building for 64-bit Solaris, all functions that return pointers
 * must be declared before they are used; otherwise the compiler will assume
 * that they return ints and the top 32 bits of the pointer will be lost,
 * causing segmentation faults.  The following includes take care of this.
 * It should be safe to add these for all other OSs too, but we're only
 * doing it for Solaris now in case another OS turns out to be a special case.
 */
#include <stdio.h>
#include <stdlib.h>
#include <strings.h> /* For the BSD string functions */
#endif

static void mbcopy(const char *source, char *dest, size_t len);


#if !defined(solaris) && !defined(off64_t) && !defined(_OFF64_T) && !defined(__off64_t_defined) && !defined(SCO_Unixware_gcc)
# if defined(bsd4_4)
typedef off_t off64_t;
# else
typedef long long off64_t;
# endif
#endif
#if defined(OSFV5)
#include <string.h>
#endif


extern long long page_size;
extern int one;
/*
 * Internal cache entries.  Each entry on the global
 * cache, pointed to by async_init(gc), will be of
 * this structure type.
 */
static const char version[] = "Libasync Version $Revision: 3.34 $";
struct cache_ent {
#if defined(_LARGEFILE64_SOURCE) && defined(__CrayX1__)
	aiocb64_t myaiocb;		/* For use in large file mode */
#elif defined(_LARGEFILE64_SOURCE) && !defined(__LP64__)
	struct aiocb64 myaiocb;		/* For use in large file mode */
#else
	struct aiocb myaiocb;
#endif
	long long fd;			/* File descriptor */
	long long size;			/* Size of the transfer */
	struct cache_ent *forward;	/* link to next element on cache list */
	struct cache_ent *back;		/* link to previous element on the cache list */
	long long direct;		/* flag to indicate if the buffer should be */
					/* de-allocated by library */
	char *real_address;		/* Real address to free */

	volatile void *oldbuf;		/* Used for firewall to prevent in-flight */
					/* accidents */
	int oldfd;			/* Used for firewall to prevent in-flight */
					/* accidents */
	size_t oldsize;			/* Used for firewall to prevent in-flight */
					/* accidents */
};

/*
 * Head of the cache list
 */
struct cache {
	struct cache_ent *head;		/* Head of cache list */
	struct cache_ent *tail;		/* Tail of cache list */
	struct cache_ent *inuse_head;	/* Head of in-use list */
	long long count;		/* How many elements on the cache list */
	struct cache_ent *w_head;	/* Head of write list */
	struct cache_ent *w_tail;	/* Tail of write list */
	long long w_count;		/* How many elements on the write list */
};

long long max_depth;
extern int errno;
static struct cache_ent *alloc_cache();
static struct cache_ent *incache();

#ifdef HAVE_ANSIC_C
void async_init(struct cache **,int, int);
int async_suspend(struct cache_ent *);
void end_async(struct cache *);
void takeoff_cache(struct cache *, struct cache_ent *);
void del_cache(struct cache *);
void putoninuse(struct cache *,struct cache_ent *);
void takeoffinuse(struct cache *);
struct cache_ent * allocate_write_buffer( struct cache *, long long , long long ,long long, long long, long long, long long, char *, char *);
void async_put_on_write_queue(struct cache *, struct cache_ent *);
void async_write_finish(struct cache *);
void async_wait_for_write(struct cache *);
int async_read(struct cache *, long long , char *, off64_t, long long, long long, off64_t, long long);
struct cache_ent * alloc_cache(struct cache *gc,long long fd,off64_t offset,long long size,long long op);
struct cache_ent * incache(struct cache *, long long, off64_t, long long);
int async_read_no_copy(struct cache *, long long, char **, off64_t, long long, long long, off64_t, long long);
void async_release(struct cache *gc);
size_t async_write(struct cache *,long long, char *, long long, off64_t, long long);
size_t async_write_no_copy(struct cache *gc,long long fd,char *buffer,long long size,long long offset,long long depth,char *free_addr);
#else
void async_init();
void end_async();
int async_suspend();
int async_read();
void async_release();
struct cache_ent *allocate_write_buffer();
size_t async_write();
void async_wait_for_write();
void async_put_on_write_queue();
void async_write_finish();
struct cache_ent * alloc_cache();
#endif

/* On Solaris _LP64 will be defined by <sys/types.h> if we're compiling
 * as a 64-bit binary.  Make sure that __LP64__ gets defined in this case,
 * too -- it should be defined on the compiler command line, but let's
 * not rely on this.
 */
#if defined(_LP64)
#if !defined(__LP64__)
#define __LP64__
#endif
#endif


/***********************************************/
/* Initialization routine to set up the library */
/***********************************************/
#ifdef HAVE_ANSIC_C
void async_init(struct cache **gc,int fd,int flag)
#else
void
async_init(gc,fd,flag)
struct cache **gc;
int fd;
int flag;
#endif
{
#ifdef VXFS
	if(flag)
		ioctl(fd,VX_SETCACHE,VX_DIRECT);
#endif
	if(*gc)
	{
		printf("Warning: async_init called twice?\n");
		return;
	}
	*gc=(struct cache *)malloc((size_t)sizeof(struct cache));
	if(*gc == 0)
	{
		printf("Malloc failed\n");
		exit(174);
	}
	bzero(*gc,sizeof(struct cache));
#if defined(__AIX__) || defined(SCO_Unixware_gcc)
	max_depth=500;
#else
	max_depth=sysconf(_SC_AIO_MAX);
#endif
}

/***********************************************/
/* Tear down routine to shut down the library  */
/***********************************************/
#ifdef HAVE_ANSIC_C
void end_async(struct cache *gc)
#else
void
end_async(gc)
struct cache *gc;
#endif
{
	if(gc == NULL)	/* Guard against a NULL handle before touching the cache */
		return;
	del_cache(gc);
	if(gc->w_head != NULL)
		async_write_finish(gc);

	free((void *)gc);
	gc = NULL;
}

/***********************************************/
/* Wait for a request to finish                */
/***********************************************/
#ifdef HAVE_ANSIC_C
int
async_suspend(struct cache_ent *ce)
#else
int
async_suspend(ce)
struct cache_ent *ce;
#endif
{
#ifdef _LARGEFILE64_SOURCE
#ifdef __LP64__
	const struct aiocb * const cblist[1] = {&ce->myaiocb};
#else
	const struct aiocb64 * const cblist[1] = {&ce->myaiocb};
#endif
#else
	const struct aiocb * const cblist[1] = {&ce->myaiocb};
#endif

#ifdef _LARGEFILE64_SOURCE
#ifdef __LP64__
	return aio_suspend(cblist, 1, NULL);
#else
	return aio_suspend64(cblist, 1, NULL);
#endif
#else
	return aio_suspend(cblist, 1, NULL);
#endif
}

/*************************************************************************
 * This routine is a generic async reader assist function.  It takes
 * the same calling parameters as read() but also extends the
 * interface to include:
 * stride ..... For the async reads, what is the distance, in size units,
 *              to space the reads.  Note: a stride of 0 indicates that
 *              you do not want any read-ahead.
 * max   ..... What is the maximum file offset for this operation.
 * depth ..... How much read-ahead do you want.
 *
 * The calls to this will guarantee to complete the read() operation
 * before returning to the caller.  The completion may occur in two
 * ways.  First, the operation may be completed by calling aio_read()
 * and then waiting for it to complete.  Second, the operation may be
 * completed by copying the data from a cache of previously completed
 * async operations.
 * In the event the read to be satisfied is not in the cache then a
 * series of async operations will be scheduled and then the first
 * async read will be completed.  In the event that the read() can be
 * satisfied from the cache then the data is copied back to the
 * user buffer and a series of async reads will be initiated.  If a
 * read is issued and the cache contains data and the read can not
 * be satisfied from the cache, then the cache is discarded, and
 * a new cache is constructed.
 * Note: All operations are aio_read().  The series will be issued
 *       as asyncs in the order requested.  After all are in flight
 *       then the code will wait for the mandatory first read.
 *************************************************************************/

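/*
 * Worked example of the read-ahead placement below (illustrative only):
 * with offset=0, size=64 KB, stride=2 and depth=3, the loop computes
 *
 *	r_offset = offset + (i+1) * (stride * size)	for i = 0..depth-1
 *	         = 128 KB, 256 KB, 384 KB
 *
 * i.e. every other 64 KB block ahead of the demanded read at offset 0.
 * Once the cache is warm (gc->count > 1) only the deepest slot
 * (i = depth-1) is issued per call, keeping "depth" reads in flight.
 */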
#ifdef HAVE_ANSIC_C
int async_read(struct cache *gc, long long fd, char *ubuffer, off64_t offset,
	long long size, long long stride, off64_t max, long long depth)
#else
int
async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
struct cache *gc;
long long fd;
char *ubuffer;
off64_t offset;
long long size;
long long stride;
off64_t max;
long long depth;
#endif
{
	off64_t a_offset,r_offset;
	long long a_size;
	struct cache_ent *ce,*first_ce=0;
	long long i;
	ssize_t retval=0;
	ssize_t ret;
	long long start = 0;
	long long del_read=0;

	a_offset=offset;
	a_size = size;
	/*
	 * Check to see if it can be completed from the cache
	 */
	if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
	{
		while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(ce);
		}
		if(ret)
		{
			printf("aio_error 1: ret %zd %d\n",ret,errno);
		}
		retval=aio_return(&ce->myaiocb);
		if(retval > 0)
		{
			mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
		}
		/* Signed compare; an unsigned compare would hide a -1 return */
		if(retval < (ssize_t)ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error1: ret %zd %d\n",retval,errno);
			printf("aio_return error1: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				ce->myaiocb.aio_fildes,
				(long long)ce->myaiocb.aio_offset,
				ce->myaiocb.aio_buf,
				ce->myaiocb.aio_nbytes,
				ce->myaiocb.aio_lio_opcode);
		}
		ce->direct=0;
		takeoff_cache(gc,ce);
	}else
	{
		/*
		 * Clear the cache and issue the first request async()
		 */
		del_cache(gc);
		del_read++;
		first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ);
again:
		ret=aio_read(&first_ce->myaiocb);
		if(ret!=0)
		{
			if(errno==EAGAIN)
				goto again;
			else
				printf("error returned from aio_read(). Ret %zd errno %d\n",ret,errno);
		}
	}
	if(stride==0)		/* User does not want read-ahead */
		goto out;
	if(a_offset<0)		/* Before beginning of file */
		goto out;
	if(a_offset+size>max)	/* After end of file */
		goto out;
	if(depth >=(max_depth-1))
		depth=max_depth-1;
	if(depth==0)
		goto out;
	if(gc->count > 1)
		start=depth-1;
	for(i=start;i<depth;i++)	/* Issue read-aheads for the depth specified */
	{
		r_offset=a_offset+((i+1)*(stride*a_size));
		if(r_offset<0)
			continue;
		if(r_offset+size > max)
			continue;
		if((ce=incache(gc,fd,r_offset,a_size)))
			continue;
		ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
		ret=aio_read(&ce->myaiocb);
		if(ret!=0)
		{
			takeoff_cache(gc,ce);
			break;
		}
	}
out:
	if(del_read)	/* Wait for the first read to complete */
	{
		while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(first_ce);
		}
		if(ret)
			printf("aio_error 2: ret %zd %d\n",ret,errno);
		retval=aio_return(&first_ce->myaiocb);
		if(retval < (ssize_t)first_ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error2: ret %zd %d\n",retval,errno);
			printf("aio_return error2: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				first_ce->myaiocb.aio_fildes,
				(long long)first_ce->myaiocb.aio_offset,
				first_ce->myaiocb.aio_buf,
				first_ce->myaiocb.aio_nbytes,
				first_ce->myaiocb.aio_lio_opcode);
		}
		if(retval > 0)
		{
			mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
		}
		first_ce->direct=0;
		takeoff_cache(gc,first_ce);
	}
	return((int)retval);
}

/************************************************************************
 * This routine allocates a cache_ent.  It contains the
 * aiocb block as well as linkage for use in the cache mechanism.
 * The space allocated here will be released after the cache entry
 * has been consumed.  The routine takeoff_cache() will be called
 * after the data has been copied to the user buffer or when the
 * cache is purged.  The routine takeoff_cache() will also release
 * all memory associated with this cache entry.
 ************************************************************************/

#ifdef HAVE_ANSIC_C
struct cache_ent *
alloc_cache(struct cache *gc,long long fd,off64_t offset,long long size,long long op)
#else
struct cache_ent *
alloc_cache(gc,fd,offset,size,op)
struct cache *gc;
long long fd,size,op;
off64_t offset;
#endif
{
	struct cache_ent *ce;
	intptr_t temp;
	ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
	if(ce == (struct cache_ent *)0)
	{
		printf("Malloc failed\n");
		exit(175);
	}
	bzero(ce,sizeof(struct cache_ent));
	ce->myaiocb.aio_fildes=(int)fd;
	ce->myaiocb.aio_offset=(off64_t)offset;
	ce->real_address = malloc((size_t)(size+page_size));
	/* Debug output, disabled like the other trace printfs in this file */
	/*printf("\nAllocate buffer2 %p Size %lld \n",ce->real_address,size+page_size);*/
	temp = (intptr_t)ce->real_address;
	temp = (temp+(page_size-1)) & ~(page_size-1);	/* Round up to a page boundary */
	ce->myaiocb.aio_buf=(volatile void *)temp;
	if(ce->myaiocb.aio_buf == NULL)
	{
		printf("Malloc failed\n");
		exit(176);
	}
	/*bzero(ce->myaiocb.aio_buf,(size_t)size);*/
	ce->myaiocb.aio_reqprio=0;
	ce->myaiocb.aio_nbytes=(size_t)size;
	ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
	ce->myaiocb.aio_lio_opcode=(int)op;
	ce->fd=(int)fd;
	ce->forward=0;
	ce->back=gc->tail;
	if(gc->tail)
		gc->tail->forward = ce;
	gc->tail= ce;
	if(!gc->head)
		gc->head=ce;
	gc->count++;
	return(ce);
}
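
/*
 * The alignment idiom above, shown in isolation (illustrative only):
 * rounding an address up to the next page boundary works because
 * page_size is a power of two.
 *
 *	intptr_t a = (intptr_t)ptr;
 *	a = (a + (page_size - 1)) & ~(page_size - 1);
 *	// e.g. page_size 4096: 0x12345 -> 0x13000, 0x12000 -> 0x12000
 */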

/************************************************************************
 * This routine checks to see if the requested data is in the
 * cache.
 *************************************************************************/
#ifdef HAVE_ANSIC_C
struct cache_ent *
incache(struct cache *gc, long long fd, off64_t offset, long long size)
#else
struct cache_ent *
incache(gc,fd,offset,size)
struct cache *gc;
long long fd,size;
off64_t offset;
#endif
{
	struct cache_ent *move;
	if(gc->head==0)
	{
		return(0);
	}
	move=gc->head;
	while(move)
	{
		if((move->fd == fd) && (move->myaiocb.aio_offset==(off64_t)offset) &&
			((size_t)size==move->myaiocb.aio_nbytes))
		{
			return(move);
		}
		move=move->forward;
	}
	return(0);
}

/************************************************************************
 * This routine removes a specific cache entry from the cache, and
 * releases all memory associated with the cache entry (if not direct).
 *************************************************************************/

void
takeoff_cache(struct cache *gc, struct cache_ent *ce)
{
	struct cache_ent *move;
	long long found;
	move=gc->head;
	if(move==ce)	/* Head of list */
	{
		gc->head=ce->forward;
		if(gc->head)
			gc->head->back=0;
		else
			gc->tail = 0;
		if(!ce->direct)
		{
			if(ce->real_address != NULL)
				free((void *)(ce->real_address));
			ce->real_address = NULL;
			if(ce != NULL)
				free((void *)ce);
			ce = NULL;
		}
		gc->count--;
		return;
	}
	found=0;
	while(move)
	{
		if(move==ce)
		{
			if(move->forward)
			{
				move->forward->back=move->back;
			}
			if(move->back)
			{
				move->back->forward=move->forward;
			}
			found=1;
			break;
		}
		else
		{
			move=move->forward;
		}
	}
	if(gc->tail == ce)	/* Removed the last element; fix up the tail */
		gc->tail = ce->back;
	if(!found)
		printf("Internal Error in takeoff cache\n");
	if(!ce->direct)
	{
		if(ce->real_address != NULL)
			free((void *)(ce->real_address));
		ce->real_address = NULL;
		if(ce != NULL)
			free((void *)ce);
		ce = NULL;
	}
	gc->count--;
}

/************************************************************************
 * This routine is used to purge the entire cache.  This is called when
 * the cache contains data but the incoming read was not able to
 * be satisfied from the cache.  This indicates that the previous
 * async read-ahead was not correct and a new pattern is emerging.
 ************************************************************************/
#ifdef HAVE_ANSIC_C
void
del_cache(struct cache *gc)
#else
void
del_cache(gc)
struct cache *gc;
#endif
{
	struct cache_ent *ce;
	ssize_t ret;
	while(1)
	{
		ce=gc->head;
		if(ce==0)
			return;
		while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED)
			;

		ret = aio_return(&ce->myaiocb);	/* reap the request */
		ce->direct=0;
		takeoff_cache(gc,ce);		/* remove from cache */
	}
}

/************************************************************************
 * Like its sister async_read() this function performs async I/O for
 * all buffers but it differs in that it expects the caller to
 * request a pointer to the data to be returned instead of handing
 * the function a location to put the data.  This will allow the
 * async I/O to be performed and does not require any bcopy to be
 * done to put the data back into the location specified by the caller.
 ************************************************************************/
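/*
 * Usage sketch (illustrative only, not part of the library): model #2,
 * where the library owns the buffer and the caller borrows it until
 * async_release().  "fd", "filesize", BUFSZ and consume() are
 * placeholders supplied by the caller.
 *
 *	struct cache *gc = NULL;
 *	char *where;
 *	off64_t off;
 *
 *	async_init(&gc, fd, 0);
 *	for (off = 0; off < filesize; off += BUFSZ)
 *	{
 *		async_read_no_copy(gc, (long long)fd, &where, off,
 *			(long long)BUFSZ, 1LL, (off64_t)filesize, 4LL);
 *		consume(where, BUFSZ);	// hypothetical consumer of the data
 *		async_release(gc);	// hand the buffer back to the library
 *	}
 *	end_async(gc);
 */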
#ifdef HAVE_ANSIC_C
int
async_read_no_copy(struct cache *gc, long long fd, char **ubuffer, off64_t offset, long long size, long long stride, off64_t max, long long depth)
#else
int
async_read_no_copy(gc, fd, ubuffer, offset, size, stride, max, depth)
struct cache *gc;
long long fd;
char **ubuffer;
off64_t offset;
long long size;
long long stride;
off64_t max;
long long depth;
#endif
{
	off64_t a_offset,r_offset;
	long long a_size;
	struct cache_ent *ce,*first_ce=0;
	long long i;
	ssize_t retval=0;
	ssize_t ret;
	long long del_read=0;
	long long start=0;

	a_offset=offset;
	a_size = size;
	/*
	 * Check to see if it can be completed from the cache
	 */
	if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
	{
		while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(ce);
		}
		if(ret)
			printf("aio_error 3: ret %zd %d\n",ret,errno);
		if(ce->oldbuf != ce->myaiocb.aio_buf ||
			ce->oldfd != ce->myaiocb.aio_fildes ||
			ce->oldsize != ce->myaiocb.aio_nbytes)
			printf("It changed in flight\n");

		retval=aio_return(&ce->myaiocb);
		if(retval > 0)
		{
			*ubuffer= (char *)ce->myaiocb.aio_buf;
		}else
			*ubuffer= NULL;
		if(retval < (ssize_t)ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error4: ret %zd %d\n",retval,errno);
			printf("aio_return error4: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				ce->myaiocb.aio_fildes,
				(long long)ce->myaiocb.aio_offset,
				ce->myaiocb.aio_buf,
				ce->myaiocb.aio_nbytes,
				ce->myaiocb.aio_lio_opcode);
		}
		ce->direct=1;
		takeoff_cache(gc,ce);	/* do not delete buffer */
		putoninuse(gc,ce);
	}else
	{
		/*
		 * Clear the cache and issue the first request async()
		 */
		del_cache(gc);
		del_read++;
		first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ);	/* allocate buffer */
		/*printf("allocated buffer/read %x offset %d\n",first_ce->myaiocb.aio_buf,offset);*/
again:
		first_ce->oldbuf=first_ce->myaiocb.aio_buf;
		first_ce->oldfd=first_ce->myaiocb.aio_fildes;
		first_ce->oldsize=first_ce->myaiocb.aio_nbytes;
		ret=aio_read(&first_ce->myaiocb);
		if(ret!=0)
		{
			if(errno==EAGAIN)
				goto again;
			else
				printf("error returned from aio_read(). Ret %zd errno %d\n",ret,errno);
		}
	}
	if(stride==0)		/* User does not want read-ahead */
		goto out;
	if(a_offset<0)		/* Before beginning of file */
		goto out;
	if(a_offset+size>max)	/* After end of file */
		goto out;
	if(depth >=(max_depth-1))
		depth=max_depth-1;
	if(depth==0)
		goto out;
	if(gc->count > 1)
		start=depth-1;
	for(i=start;i<depth;i++)	/* Issue read-aheads for the depth specified */
	{
		r_offset=a_offset+((i+1)*(stride*a_size));
		if(r_offset<0)
			continue;
		if(r_offset+size > max)
			continue;
		if((ce=incache(gc,fd,r_offset,a_size)))
			continue;
		ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
		ce->oldbuf=ce->myaiocb.aio_buf;
		ce->oldfd=ce->myaiocb.aio_fildes;
		ce->oldsize=ce->myaiocb.aio_nbytes;
		ret=aio_read(&ce->myaiocb);
		if(ret!=0)
		{
			takeoff_cache(gc,ce);
			break;
		}
	}
out:
	if(del_read)	/* Wait for the first read to complete */
	{
		while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(first_ce);
		}
		if(ret)
			printf("aio_error 4: ret %zd %d\n",ret,errno);
		if(first_ce->oldbuf != first_ce->myaiocb.aio_buf ||
			first_ce->oldfd != first_ce->myaiocb.aio_fildes ||
			first_ce->oldsize != first_ce->myaiocb.aio_nbytes)
			printf("It changed in flight2\n");
		retval=aio_return(&first_ce->myaiocb);
		if(retval < (ssize_t)first_ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error5: ret %zd %d\n",retval,errno);
			printf("aio_return error5: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				first_ce->myaiocb.aio_fildes,
				(long long)first_ce->myaiocb.aio_offset,
				first_ce->myaiocb.aio_buf,
				first_ce->myaiocb.aio_nbytes,
				first_ce->myaiocb.aio_lio_opcode);
		}
		if(retval > 0)
		{
			*ubuffer= (char *)first_ce->myaiocb.aio_buf;
		}else
			*ubuffer= NULL;
		first_ce->direct=1;	/* do not delete the buffer */
		takeoff_cache(gc,first_ce);
		putoninuse(gc,first_ce);
	}
	return((int)retval);
}

/************************************************************************
 * The caller is now finished with the data that was provided so
 * the library is now free to return the memory to the pool for later
 * reuse.
 ************************************************************************/
#ifdef HAVE_ANSIC_C
void async_release(struct cache *gc)
#else
void
async_release(gc)
struct cache *gc;
#endif
{
	takeoffinuse(gc);
}


/************************************************************************
 * Put the buffer on the inuse list.  When the user is finished with
 * the buffer it will call back into async_release and the items on the
 * inuse list will be deallocated.
 ************************************************************************/
#ifdef HAVE_ANSIC_C
void
putoninuse(struct cache *gc,struct cache_ent *entry)
#else
void
putoninuse(gc,entry)
struct cache *gc;
struct cache_ent *entry;
#endif
{
	if(gc->inuse_head)
		entry->forward=gc->inuse_head;
	else
		entry->forward=0;
	gc->inuse_head=entry;
}

/************************************************************************
 * This is called when the application is finished with the data that
 * was provided.  The memory may now be returned to the pool.
 ************************************************************************/
#ifdef HAVE_ANSIC_C
void
takeoffinuse(struct cache *gc)
#else
void
takeoffinuse(gc)
struct cache *gc;
#endif
{
	struct cache_ent *ce;
	if(gc->inuse_head==0)
		printf("Takeoffinuse error\n");
	ce=gc->inuse_head;
	gc->inuse_head=gc->inuse_head->forward;

	if(gc->inuse_head !=0)
		printf("Error in take off inuse\n");
	if(ce->real_address != NULL)
		free((void*)(ce->real_address));
	ce->real_address = NULL;
	if(ce != NULL)
		free(ce);
	ce = NULL;
}

/*************************************************************************
 * This routine is a generic async writer assist function.  It takes
 * the same calling parameters as write() but also extends the
 * interface to include:
 *
 * offset ..... offset in the file.
 * depth ..... How many writes may be outstanding (write-behind depth).
 *
 *************************************************************************/
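/*
 * Usage sketch (illustrative only, not part of the library): a
 * write-behind loop.  "fd", "filesize" and BUFSZ are placeholders
 * supplied by the caller; buf is assumed to be filled by the caller.
 *
 *	struct cache *gc = NULL;
 *	char buf[BUFSZ];
 *	off64_t off;
 *
 *	async_init(&gc, fd, 0);
 *	for (off = 0; off < filesize; off += BUFSZ)
 *		async_write(gc, (long long)fd, buf, (long long)BUFSZ,
 *			off, 4LL);	// keep up to 4 writes in flight
 *	end_async(gc);	// drains the queue via async_write_finish()
 */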
#ifdef HAVE_ANSIC_C
size_t
async_write(struct cache *gc,long long fd,char *buffer,long long size,off64_t offset,long long depth)
#else
size_t
async_write(gc,fd,buffer,size,offset,depth)
struct cache *gc;
long long fd,size;
char *buffer;
off64_t offset;
long long depth;
#endif
{
	struct cache_ent *ce;
	int ret;
	ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,0LL,(char *)0,(char *)0);
	ce->direct=0;	/* not direct.  Lib supplies buffer and must free it */
	mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);
	async_put_on_write_queue(gc,ce);
	/*
	printf("asw: fd %d offset %lld, size %zd\n",ce->myaiocb.aio_fildes,
		ce->myaiocb.aio_offset,
		ce->myaiocb.aio_nbytes);
	*/

again:
	ret=aio_write(&ce->myaiocb);
	if(ret==-1)
	{
		if(errno==EAGAIN)
		{
			async_wait_for_write(gc);
			goto again;
		}
		if(errno==0)
		{
			/* Compensate for bug in async library */
			async_wait_for_write(gc);
			goto again;
		}
		else
		{
			printf("Error in aio_write: ret %d errno %d count %lld\n",ret,errno,gc->w_count);
			/*
			printf("aio_write_no_copy: fd %d buffer %x offset %lld size %zd\n",
				ce->myaiocb.aio_fildes,
				ce->myaiocb.aio_buf,
				ce->myaiocb.aio_offset,
				ce->myaiocb.aio_nbytes);
			*/
			exit(177);
		}
	}
	return((size_t)size);
}

/*************************************************************************
 * Allocate a write aiocb and a write buffer of the size specified.  Also
 * add some extra buffer padding so that VX_DIRECT can do its job when
 * needed.
 *************************************************************************/

#ifdef HAVE_ANSIC_C
struct cache_ent *
allocate_write_buffer( struct cache *gc, long long fd, long long offset, long long size,long long op,
	long long w_depth, long long direct, char *buffer, char *free_addr)
#else
struct cache_ent *
allocate_write_buffer(gc,fd,offset,size,op,w_depth,direct,buffer,free_addr)
struct cache *gc;
long long fd,size,op;
off64_t offset;
long long w_depth;
long long direct;
char *buffer,*free_addr;
#endif
{
	struct cache_ent *ce;
	intptr_t temp;
	if(fd==0LL)
	{
		printf("Setting up write buffer insane\n");
		exit(178);
	}
	if(gc->w_count > w_depth)
		async_wait_for_write(gc);
	ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
	if(ce == (struct cache_ent *)0)
	{
		printf("Malloc failed 1\n");
		exit(179);
	}
	bzero(ce,sizeof(struct cache_ent));
	ce->myaiocb.aio_fildes=(int)fd;
	ce->myaiocb.aio_offset=(off_t)offset;
	if(!direct)
	{
		ce->real_address = malloc((size_t)(size+page_size));
		temp = (intptr_t)ce->real_address;
		temp = (temp+(page_size-1)) & ~(page_size-1);	/* Round up to a page boundary */
		ce->myaiocb.aio_buf=(volatile void *)temp;
	}
	else
	{
		ce->myaiocb.aio_buf=(volatile void *)buffer;
		ce->real_address=(char *)free_addr;
	}
	if(ce->myaiocb.aio_buf == 0)
	{
		printf("Malloc failed 2\n");
		exit(180);
	}
	ce->myaiocb.aio_reqprio=0;
	ce->myaiocb.aio_nbytes=(size_t)size;
	ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
	ce->myaiocb.aio_lio_opcode=(int)op;
	ce->fd=(int)fd;
	return(ce);
}

/*************************************************************************
 * Put it on the outbound queue.
 *************************************************************************/

#ifdef HAVE_ANSIC_C
void
async_put_on_write_queue(struct cache *gc,struct cache_ent *ce)
#else
void
async_put_on_write_queue(gc,ce)
struct cache *gc;
struct cache_ent *ce;
#endif
{
	ce->forward=0;
	ce->back=gc->w_tail;
	if(gc->w_tail)
		gc->w_tail->forward = ce;
	gc->w_tail= ce;
	if(!gc->w_head)
		gc->w_head=ce;
	gc->w_count++;
	return;
}

/*************************************************************************
 * Clean up all outstanding writes
 *************************************************************************/
#ifdef HAVE_ANSIC_C
void
async_write_finish(struct cache *gc)
#else
void
async_write_finish(gc)
struct cache *gc;
#endif
{
	while(gc->w_head)
	{
		async_wait_for_write(gc);
	}
}

/*************************************************************************
 * Wait for an I/O to finish
 *************************************************************************/

#ifdef HAVE_ANSIC_C
void
async_wait_for_write(struct cache *gc)
#else
void
async_wait_for_write(gc)
struct cache *gc;
#endif
{
	struct cache_ent *ce;
	int ret;
	ssize_t retval;
	if(gc->w_head==0)
		return;
	ce=gc->w_head;
	if(ce == NULL)
		return;
	gc->w_head=ce->forward;
	gc->w_count--;
	ce->forward=NULL;
	if(ce==gc->w_tail)
		gc->w_tail=0;
	/*printf("Wait for buffer %x offset %lld size %zd to finish\n",
		ce->myaiocb.aio_buf,
		ce->myaiocb.aio_offset,
		ce->myaiocb.aio_nbytes);
	printf("write count %lld \n",gc->w_count);
	*/
	while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
	{
		async_suspend(ce);
	}
	if(ret)
	{
		printf("aio_error 5: ret %d %d\n",ret,errno);
		printf("fd %d offset %lld size %zd\n",
			ce->myaiocb.aio_fildes,
			(long long)ce->myaiocb.aio_offset,
			ce->myaiocb.aio_nbytes);
		exit(181);
	}

	retval=aio_return(&ce->myaiocb);
	if(retval < 0)
	{
		printf("aio_return error: %d\n",errno);
	}

	if(!ce->direct)
	{
		if(ce->real_address != NULL)
			free((void *)(ce->real_address));	/* Causes crash. */
		ce->real_address=NULL;
		if(ce != NULL)
			free((void *)ce);
		ce=NULL;
	}

}

/*************************************************************************
 * This routine is a generic async writer assist function.  It takes
 * the same calling parameters as write() but also extends the
 * interface to include:
 *
 * offset ..... offset in the file.
 * depth ..... How many writes may be outstanding (write-behind depth).
 * free_addr .. address of memory to free after write is completed.
 *
 *************************************************************************/
#ifdef HAVE_ANSIC_C
size_t
async_write_no_copy(struct cache *gc,long long fd,char *buffer,long long size,long long offset,long long depth,char *free_addr)
#else
size_t
async_write_no_copy(gc,fd,buffer,size,offset,depth,free_addr)
struct cache *gc;
long long fd,size;
char *buffer;
long long offset;
long long depth;
char *free_addr;
#endif
{
	struct cache_ent *ce;
	int ret;
	long long direct = 1;
	ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,direct,buffer,free_addr);
	ce->direct=0;	/* the library frees free_addr once the write completes */
	async_put_on_write_queue(gc,ce);
	/*
	printf("awnc: fd %d offset %lld, size %zd\n",ce->myaiocb.aio_fildes,
		ce->myaiocb.aio_offset,
		ce->myaiocb.aio_nbytes);
	*/

again:
	ret=aio_write(&ce->myaiocb);
	if(ret==-1)
	{
		if(errno==EAGAIN)
		{
			async_wait_for_write(gc);
			goto again;
		}
		if(errno==0)
		{
			/* Compensate for bug in async library */
			async_wait_for_write(gc);
			goto again;
		}
		else
		{
			printf("Error in aio_write: ret %d errno %d\n",ret,errno);
			printf("aio_write_no_copy: fd %d buffer %p offset %lld size %zd\n",
				ce->myaiocb.aio_fildes,
				ce->myaiocb.aio_buf,
				(long long)ce->myaiocb.aio_offset,
				ce->myaiocb.aio_nbytes);
			exit(182);
		}
	}
	else
	{
		return((size_t)size);
	}
}
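
/*
 * Illustrative sketch (not part of the library): with the no-copy write
 * the caller hands ownership of the buffer to the library, which frees
 * free_addr once the write has been reaped.  "gc", "fd", "off", BUFSZ
 * and fill() are placeholders supplied by the caller.
 *
 *	char *raw = malloc(BUFSZ + page_size);	// the library frees this later
 *	char *aligned = (char *)(((intptr_t)raw + (page_size - 1)) &
 *				~(page_size - 1));
 *	fill(aligned, BUFSZ);			// hypothetical data producer
 *	async_write_no_copy(gc, fd, aligned, BUFSZ, off, 4LL, raw);
 *	// do NOT free(raw) or reuse "aligned" here; the library owns them now
 */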

static void
mbcopy(const char *source, char *dest, size_t len)
{
	size_t i;
	for(i=0;i<len;i++)
		*dest++=*source++;
}
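
#ifdef LIBASYNC_DEMO
/*
 * Hypothetical standalone demo.  LIBASYNC_DEMO is not a macro any build
 * defines; it exists only to keep this sketch out of the normal library
 * build.  Something like
 *	cc -Dlinux -DHAVE_ANSIC_C -DLIBASYNC_DEMO libasync.c -lrt
 * should produce a toy sequential reader.  page_size and "one" are
 * normally defined by the program that links this library, so the demo
 * supplies its own definitions (4096 is an assumption).
 */
#include <stdio.h>
#include <stdlib.h>
#include <fcntl.h>
#include <sys/stat.h>

long long page_size = 4096;
int one = 1;

int
main(int argc, char **argv)
{
	struct cache *gc = NULL;
	struct stat st;
	static char buf[65536];
	off64_t off;
	int fd;

	if(argc < 2 || (fd = open(argv[1], O_RDONLY)) < 0)
	{
		fprintf(stderr, "usage: %s file\n", argv[0]);
		return 1;
	}
	fstat(fd, &st);
	async_init(&gc, fd, 0);	/* 0: no VX_DIRECT hint */
	/* Whole blocks only; a real program would read any tail with read() */
	for(off = 0; off + (off64_t)sizeof(buf) <= st.st_size; off += sizeof(buf))
		async_read(gc, (long long)fd, buf, off, (long long)sizeof(buf),
			1LL, (off64_t)st.st_size, 4LL);
	end_async(gc);
	close(fd);
	return 0;
}
#endif /* LIBASYNC_DEMO */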