1 /*
2  * Library for Posix async read operations with hints.
3  * Author: Don Capps
4  * Company: Iozone
5  * Date: 4/24/1998
6  *
7  * Two models are supported.  First model is a replacement for read() where the async
8  * operations are performed and the requested data is bcopy()-ed back into the users
9  * buffer. The second model is a new version of read() where the caller does not
10  * supply the address of the buffer but instead is returned an address to the
11  * location of the data. The second model eliminates a bcopy from the path.
12  *
13  * To use model #1:
14  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
15  *	The fd is the file descriptor for the async operations.
16  *	The direct_flag sets VX_DIRECT
17  *
18  * 2. Call async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
19  *    	Where:
20  *	gc ............	is the pointer on the stack
21  *	fd ............	is the file descriptor
22  *	ubuffer .......	is the address of the user buffer.
23  *	offset ........	is the offset in the file to begin reading
24  *	size ..........	is the size of the transfer.
25  *	stride ........	is the distance, in size units, to space the async reads.
26  *	max ...........	is the max size of the file to be read.
27  *	depth .........	is the number of async operations to perform.
28  *
29  * 3. Call end_async(gc) when finished.
30  *	Where:
31  *	gc ............ is the pointer on the stack.
32  *
33  * To use model #2:
34  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
35  *	The fd is the file descriptor for the async operations.
36  *	The direct_flag sets VX_DIRECT
37  * 2. Call async_read(gc, fd, &ubuffer, offset, size, stride, max, depth)
38  *    	Where:
39  *	gc ............	is the pointer on the stack
40  *	fd ............	is the file descriptor
41  *	ubuffer .......	is the address of a pointer that will be filled in
42  *                      by the async library.
43  *	offset ........	is the offset in the file to begin reading
44  *	size ..........	is the size of the transfer.
45  *	stride ........	is the distance, in size units, to space the async reads.
46  *	max ...........	is the max size of the file to be read.
47  *	depth .........	is the number of async operations to perform.
48  *
49  * 3. Call async_release(gc) when finished with the data that was returned.
50  *    This allows the async library to reuse the memory that was filled in
51  *    and returned to the user.
52  *
53  * 4. Call end_async(gc) when finished.
54  *	Where:
55  *	gc ............ is the pointer on the stack.
56  *
57  * To use model #1: (WRITES)
58  * 1. Call async_init(&pointer_on_stack,fd,direct_flag);
59  *	The fd is the file descriptor for the async operations.
60  *
61  * 2. Call async_write(gc, fd, ubuffer, size, offset, depth)
62  *    	Where:
63  *	gc ............	is the pointer on the stack
64  *	fd ............	is the file descriptor
65  *	ubuffer .......	is the address of the user buffer.
66  *	size ..........	is the size of the transfer.
 *	offset ........	is the offset in the file to begin writing
68  *	depth .........	is the number of async operations to perform.
69  *
 * 3. Call end_async(gc) when finished.
71  *	Where:
72  *	gc ............ is the pointer on the stack.
73  *
74  * Notes:
75  *	The intended use is to replace calls to read() with calls to
76  *	async_read() and allow the user to make suggestions on
77  *	what kind of async read-ahead would be nice to have.
 *	The first transfer requested is guaranteed to be complete
 *	before returning to the caller. The async operations will
 *	be started and will also be guaranteed to have completed
81  *	if the next call specifies its first request to be one
82  *	that was previously performed with an async operation.
83  *
84  *	The async_read_no_copy() function allows the async operations
85  *	to return the data to the user and not have to perform
86  *	a bcopy of the data back into the user specified buffer
87  *	location. This model is faster but assumes that the user
88  *	application has been modified to work with this model.
89  *
90  * 	The async_write() is intended to enhance the performance of
91  *	initial writes to a file. This is the slowest case in the write
92  *	path as it must perform meta-data allocations and wait.
93  */
94 
95 #include <sys/types.h>
96 #include <aio.h>
97 
98 #if defined(_LARGEFILE64_SOURCE) && !defined(__LP64__)
99 #	define aio_error	aio_error64
100 #	define aio_return	aio_return64
101 #	define aio_read 	aio_read64
102 #	define aio_cancel	aio_cancel64
103 #	define aio_write	aio_write64
104 #endif
105 
106 #if defined(solaris) || defined(linux) || defined(SCO_Unixware_gcc) || defined(__NetBSD__)
107 #else
108 #include <sys/timers.h>
109 #endif
110 #include <sys/errno.h>
111 #include <unistd.h>
112 #ifndef bsd4_4
113 #include <malloc.h>
114 #endif
115 #ifdef VXFS
116 #include <sys/fs/vx_ioctl.h>
117 #endif
118 
119 #if defined(OSFV5) || defined(linux)
120 #include <string.h>
121 #endif
122 
123 #if defined(linux)
124 #include <unistd.h>
125 #include <stdio.h>
126 #include <stdlib.h>
127 #endif
128 
129 #if (defined(solaris) && defined(__LP64__)) || defined(__s390x__) || defined(__FreeBSD__) || defined(__NetBSD__)
130 /* If we are building for 64-bit Solaris, all functions that return pointers
131  * must be declared before they are used; otherwise the compiler will assume
132  * that they return ints and the top 32 bits of the pointer will be lost,
133  * causing segmentation faults.  The following includes take care of this.
134  * It should be safe to add these for all other OSs too, but we're only
135  * doing it for Solaris now in case another OS turns out to be a special case.
136  */
137 #include <stdio.h>
138 #include <stdlib.h>
139 #include <strings.h> /* For the BSD string functions */
140 #endif
141 
142 static void mbcopy(const char *source, char *dest, size_t len);
143 
144 
145 #if !defined(solaris) && !defined(off64_t) && !defined(_OFF64_T) && !defined(__off64_t_defined) && !defined(SCO_Unixware_gcc)
146 #	if defined(bsd4_4)
147 typedef off_t off64_t;
148 #	else
149 typedef long long off64_t;
150 #	endif
151 #endif
152 #if defined(OSFV5)
153 #include <string.h>
154 #endif
155 
156 
157 extern long long page_size;
158 extern int one;
159 /*
 * Internal cache entries. Each entry on the global
161  * cache, pointed to by async_init(gc) will be of
162  * this structure type.
163  */
164 static const char version[] = "Libasync Version $Revision: 3.34 $";
struct cache_ent {
#if defined(_LARGEFILE64_SOURCE) && defined(__CrayX1__)
	aiocb64_t myaiocb;		/* For use in large file mode */
#elif defined(_LARGEFILE64_SOURCE) && !defined(__LP64__)
	struct aiocb64 myaiocb;		/* For use in large file mode */
#else
	struct aiocb myaiocb;
#endif
	long long fd;				/* File descriptor */
	long long size;				/* Size of the transfer */
	struct cache_ent *forward;		/* link to next element on cache list */
	struct cache_ent *back;			/* link to previous element on the cache list */
	long long direct;			/* flag to indicate if the buffer should be */
						/* de-allocated by library (0 = free on */
						/* takeoff_cache(), 1 = caller still owns it) */
	char *real_address;			/* Real address to free (unaligned base of */
						/* the page-aligned aio_buf) */

	volatile void *oldbuf;			/* Used for firewall to prevent in flight */
						/* accidents: snapshot of aio_buf taken */
						/* just before aio_read() is issued */
	int oldfd;				/* Used for firewall to prevent in flight */
						/* accidents: snapshot of aio_fildes */
	size_t oldsize;				/* Used for firewall to prevent in flight */
						/* accidents: snapshot of aio_nbytes */
};
188 
189 /*
190  * Head of the cache list
191  */
struct cache {
	struct cache_ent *head;		/* Head of cache list (read-ahead entries) */
	struct cache_ent *tail;		/* tail of cache list */
	struct cache_ent *inuse_head;	/* head of in-use list (buffers handed out by */
					/* async_read_no_copy, freed by async_release) */
	long long count;		/* How many elements on the cache list */
	struct cache_ent *w_head;		/* Head of outstanding-write list */
	struct cache_ent *w_tail;		/* tail of outstanding-write list */
	long long w_count;		/* How many elements on the write list */
	};
201 
202 long long max_depth;
203 extern int errno;
204 static struct cache_ent *alloc_cache();
205 static struct cache_ent *incache();
206 
207 #ifdef HAVE_ANSIC_C
208 void async_init(struct cache **,int, int);
209 int async_suspend(struct cache_ent *);
210 void end_async(struct cache *);
211 void takeoff_cache(struct cache *, struct cache_ent *);
212 void del_cache(struct cache *);
213 void putoninuse(struct cache *,struct cache_ent *);
214 void takeoffinuse(struct cache *);
215 struct cache_ent * allocate_write_buffer( struct cache *, long long , long long ,long long, long long, long long, long long, char *, char *);
216 void async_put_on_write_queue(struct cache *, struct cache_ent *);
217 void async_write_finish(struct cache *);
218 void async_wait_for_write(struct cache *);
219 int async_read(struct cache *, long long , char *, off64_t, long long, long long, off64_t, long long);
220 struct cache_ent * alloc_cache(struct cache *gc,long long fd,off64_t offset,long long size,long long op);
221 struct cache_ent * incache(struct cache *, long long, off64_t, long long);
222 int async_read_no_copy(struct cache *, long long, char **, off64_t, long long, long long, off64_t, long long);
223 void async_release(struct cache *gc);
224 size_t async_write(struct cache *,long long, char *, long long, off64_t, long long);
225 size_t async_write_no_copy(struct cache *gc,long long fd,char *buffer,long long size,long long offset,long long depth,char *free_addr);
226 #else
227 void async_init();
228 void end_async();
229 int async_suspend();
230 int async_read();
231 void async_release();
232 struct cache_ent *allocate_write_buffer();
233 size_t async_write();
234 void async_wait_for_write();
235 void async_put_on_write_queue();
236 void async_write_finish();
237 struct cache_ent * alloc_cache();
238 #endif
239 
240 /* On Solaris _LP64 will be defined by <sys/types.h> if we're compiling
241  * as a 64-bit binary.  Make sure that __LP64__ gets defined in this case,
242  * too -- it should be defined on the compiler command line, but let's
243  * not rely on this.
244  */
245 #if defined(_LP64)
246 #if !defined(__LP64__)
247 #define __LP64__
248 #endif
249 #endif
250 
251 
252 /***********************************************/
253 /* Initialization routine to setup the library */
254 /***********************************************/
255 #ifdef HAVE_ANSIC_C
async_init(struct cache ** gc,int fd,int flag)256 void async_init(struct cache **gc,int fd,int flag)
257 #else
258 void
259 async_init(gc,fd,flag)
260 struct cache **gc;
261 int fd;
262 int flag;
263 #endif
264 {
265 #ifdef VXFS
266 	if(flag)
267 		ioctl(fd,VX_SETCACHE,VX_DIRECT);
268 #endif
269 	if(*gc)
270 	{
271 		printf("Warning calling async_init two times ?\n");
272 		return;
273 	}
274 	*gc=(struct cache *)malloc((size_t)sizeof(struct cache));
275 	if(*gc == 0)
276 	{
277 		printf("Malloc failed\n");
278 		exit(174);
279 	}
280 	bzero(*gc,sizeof(struct cache));
281 #if defined(__AIX__) || defined(SCO_Unixware_gcc)
282 	max_depth=500;
283 #else
284 	max_depth=sysconf(_SC_AIO_MAX);
285 #endif
286 }
287 
288 /***********************************************/
289 /* Tear down routine to shutdown the library   */
290 /***********************************************/
291 #ifdef HAVE_ANSIC_C
end_async(struct cache * gc)292 void end_async(struct cache *gc)
293 #else
294 void
295 end_async(gc)
296 struct cache *gc;
297 #endif
298 {
299 	del_cache(gc);
300 	if(gc && (gc->w_head !=NULL))
301 	   async_write_finish(gc);
302 
303 	if(gc != NULL)
304 	   free((void *)gc);
305 	gc = NULL;
306 }
307 
308 /***********************************************/
309 /* Wait for a request to finish                */
310 /***********************************************/
#ifdef HAVE_ANSIC_C
int
async_suspend(struct cache_ent *ce)
#else
int
async_suspend(ce)
struct cache_ent *ce;
#endif
{
	/*
	 * Block until the async operation held in ce->myaiocb completes.
	 * A single-element control-block list is handed to aio_suspend().
	 * The preprocessor ladder mirrors the aiocb/aiocb64 choice made in
	 * struct cache_ent: the 64-bit variants are only used when
	 * _LARGEFILE64_SOURCE is set and we are NOT an LP64 build.
	 * Returns the aio_suspend()/aio_suspend64() result (0 on success,
	 * -1 with errno set on failure).
	 */
#ifdef _LARGEFILE64_SOURCE
#ifdef __LP64__
	const struct aiocb * const cblist[1] = {&ce->myaiocb};
#else
	const struct aiocb64 * const cblist[1] = {&ce->myaiocb};
#endif
#else
	const struct aiocb * const cblist[1] = {&ce->myaiocb};
#endif

#ifdef _LARGEFILE64_SOURCE
#ifdef __LP64__
	return aio_suspend(cblist, 1, NULL);
#else
	return aio_suspend64(cblist, 1, NULL);
#endif
#else
	return aio_suspend(cblist, 1, NULL);
#endif
}
340 
341 /*************************************************************************
 * This routine is a generic async reader assist function. It takes
343  * the same calling parameters as read() but also extends the
344  * interface to include:
345  * stride ..... For the async reads, what is the distance, in size units,
346  * 		to space the reads. Note: Stride of 0 indicates that
347  *		you do not want any read-ahead.
348  * max    ..... What is the maximum file offset for this operation.
349  * depth  ..... How much read-ahead do you want.
350  *
 * The calls to this will guarantee to complete the read() operation
352  * before returning to the caller. The completion may occur in two
353  * ways. First the operation may be completed by calling aio_read()
354  * and then waiting for it to complete. Second  the operation may be
355  * completed by copying the data from a cache of previously completed
356  * async operations.
357  * In the event the read to be satisfied is not in the cache then a
358  * series of async operations will be scheduled and then the first
359  * async read will be completed. In the event that the read() can be
360  * satisfied from the cache then the data is copied back to the
361  * user buffer and a series of async reads will be initiated.  If a
362  * read is issued and the cache contains data and the read can not
363  * be satisfied from the cache, then the cache is discarded, and
364  * a new cache is constructed.
365  * Note: All operations are aio_read(). The series will be issued
366  * as asyncs in the order requested. After all are in flight
 * then the code will wait for the mandatory first read.
368  *************************************************************************/
369 
#ifdef HAVE_ANSIC_C
int async_read(struct cache *gc, long long fd, char *ubuffer, off64_t offset,
	long long size, long long stride, off64_t max, long long depth)
#else
int
async_read(gc, fd, ubuffer, offset, size, stride, max, depth)
struct cache *gc;
long long fd;
char *ubuffer;
off64_t offset;
long long size;
long long stride;
off64_t max;
long long depth;
#endif
{
	off64_t a_offset,r_offset;
	long long a_size;
	struct cache_ent *ce,*first_ce=0;
	long long i;
	ssize_t retval=0;		/* bytes delivered to the caller */
	ssize_t ret;
	long long start = 0;
	long long del_read=0;		/* non-zero: first read issued here, await it at `out` */

	a_offset=offset;
	a_size = size;
	/*
	 * Check to see if it can be completed from the cache
	 */
	if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
	{
		/* Cache hit: wait for the earlier read-ahead to finish */
		while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(ce);
		}
		if(ret)
		{
			printf("aio_error 1: ret %zd %d\n",ret,errno);
		}
		retval=aio_return(&ce->myaiocb);
		if(retval > 0)
		{
			/* Model #1: copy the data into the caller's buffer */
			mbcopy((char *)ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
		}
		/* NOTE(review): retval (signed) vs aio_nbytes (size_t) is a
		 * signed/unsigned comparison; retval < 0 still trips it. */
		if(retval < ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error1: ret %zd %d\n",retval,errno);
			printf("aio_return error1: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				ce->myaiocb.aio_fildes,
				(long long)ce->myaiocb.aio_offset,
				ce->myaiocb.aio_buf,
				ce->myaiocb.aio_nbytes,
				ce->myaiocb.aio_lio_opcode
				);
		}
		ce->direct=0;			/* library frees the buffer */
		takeoff_cache(gc,ce);
	}else
	{
		/*
		 * Clear the cache and issue the first request async()
		 */
		del_cache(gc);
		del_read++;
		first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ);
again:
		ret=aio_read(&first_ce->myaiocb);
		if(ret!=0)
		{
			if(errno==EAGAIN)	/* queue full: retry until accepted */
				goto again;
			else
				printf("error returned from aio_read(). Ret %zd errno %d\n",ret,errno);
		}
	}
	if(stride==0)	 /* User does not want read-ahead */
		goto out;
	if(a_offset<0)	/* Before beginning of file */
		goto out;
	if(a_offset+size>max)	/* After end of file */
		goto out;
	if(depth >=(max_depth-1))	/* clamp to system AIO limit */
		depth=max_depth-1;
	if(depth==0)
		goto out;
	if(gc->count > 1)	/* cache already primed: only top up the deepest slot */
		start=depth-1;
	for(i=start;i<depth;i++)	/* Issue read-aheads for the depth specified */
	{
		r_offset=a_offset+((i+1)*(stride*a_size));	/* next predicted offset */
		if(r_offset<0)
			continue;
		if(r_offset+size > max)
			continue;
		if((ce=incache(gc,fd,r_offset,a_size)))
			continue;	/* already in flight */
		ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
		ret=aio_read(&ce->myaiocb);
		if(ret!=0)
		{
			/* Could not queue this read-ahead; drop it and stop */
			takeoff_cache(gc,ce);
			break;
		}
	}
out:
	if(del_read)	/* Wait for the first read to complete */
	{
		while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
		{
			async_suspend(first_ce);
		}
		if(ret)
			printf("aio_error 2: ret %zd %d\n",ret,errno);
		retval=aio_return(&first_ce->myaiocb);
		if(retval < first_ce->myaiocb.aio_nbytes)
		{
			printf("aio_return error2: ret %zd %d\n",retval,errno);
			printf("aio_return error2: fd %d offset %lld buffer %p size %zd Opcode %d\n",
				first_ce->myaiocb.aio_fildes,
				(long long)first_ce->myaiocb.aio_offset,
				first_ce->myaiocb.aio_buf,
				first_ce->myaiocb.aio_nbytes,
				first_ce->myaiocb.aio_lio_opcode
				);
		}
		if(retval > 0)
		{
			mbcopy((char *)first_ce->myaiocb.aio_buf,(char *)ubuffer,(size_t)retval);
		}
		first_ce->direct=0;
		takeoff_cache(gc,first_ce);
	}
	return((int)retval);
}
505 
506 /************************************************************************
507  * This routine allocates a cache_entry. It contains the
508  * aiocb block as well as linkage for use in the cache mechanism.
509  * The space allocated here will be released after the cache entry
510  * has been consumed. The routine takeoff_cache() will be called
511  * after the data has been copied to user buffer or when the
512  * cache is purged. The routine takeoff_cache() will also release
513  * all memory associated with this cache entry.
514  ************************************************************************/
515 
516 #ifdef HAVE_ANSIC_C
alloc_cache(struct cache * gc,long long fd,off64_t offset,long long size,long long op)517 struct cache_ent * alloc_cache(struct cache *gc,long long fd,off64_t offset,long long size,long long op)
518 #else
519 struct cache_ent *
520 alloc_cache(gc,fd,offset,size,op)
521 struct cache *gc;
522 long long fd,size,op;
523 off64_t offset;
524 #endif
525 {
526 	struct cache_ent *ce;
527 	intptr_t temp;
528 	ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
529 	if(ce == (struct cache_ent *)0)
530 	{
531 		printf("Malloc failed\n");
532 		exit(175);
533 	}
534 	bzero(ce,sizeof(struct cache_ent));
535 	ce->myaiocb.aio_fildes=(int)fd;
536 	ce->myaiocb.aio_offset=(off64_t)offset;
537 	ce->real_address = malloc((size_t)(size+page_size));
538 printf("\nAllocate buffer2 %p Size %lld \n",ce->real_address,size+page_size);
539 	temp = (intptr_t)ce->real_address;
540 	temp = (temp+(page_size-1)) & ~(page_size-1);
541 	ce->myaiocb.aio_buf=(volatile void *)temp;
542 	if(ce->myaiocb.aio_buf == NULL)
543 	{
544 		printf("Malloc failed\n");
545 		exit(176);
546 	}
547 	/*bzero(ce->myaiocb.aio_buf,(size_t)size);*/
548 	ce->myaiocb.aio_reqprio=0;
549 	ce->myaiocb.aio_nbytes=(size_t)size;
550 	ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
551 	ce->myaiocb.aio_lio_opcode=(int)op;
552 	ce->fd=(int)fd;
553 	ce->forward=0;
554 	ce->back=gc->tail;
555 	if(gc->tail)
556 		gc->tail->forward = ce;
557 	gc->tail= ce;
558 	if(!gc->head)
559 		gc->head=ce;
560 	gc->count++;
561 	return(ce);
562 }
563 
564 /************************************************************************
565  * This routine checks to see if the requested data is in the
566  * cache.
567 *************************************************************************/
568 #ifdef HAVE_ANSIC_C
569 struct cache_ent *
incache(struct cache * gc,long long fd,off64_t offset,long long size)570 incache(struct cache *gc, long long fd, off64_t offset, long long size)
571 #else
572 struct cache_ent *
573 incache(gc,fd,offset,size)
574 struct cache *gc;
575 long long fd,size;
576 off64_t offset;
577 #endif
578 {
579 	struct cache_ent *move;
580 	if(gc->head==0)
581 	{
582 		return(0);
583 	}
584 	move=gc->head;
585 	while(move)
586 	{
587 		if((move->fd == fd) && (move->myaiocb.aio_offset==(off64_t)offset) &&
588 			((size_t)size==move->myaiocb.aio_nbytes))
589 			{
590 				return(move);
591 			}
592 		move=move->forward;
593 	}
594 	return(0);
595 }
596 
597 /************************************************************************
598  * This routine removes a specific cache entry from the cache, and
 * releases all memory associated with the cache entry (if not direct).
600 *************************************************************************/
601 
602 void
takeoff_cache(struct cache * gc,struct cache_ent * ce)603 takeoff_cache(struct cache *gc, struct cache_ent *ce)
604 {
605 	struct cache_ent *move;
606 	long long found;
607 	move=gc->head;
608 	if(move==ce) /* Head of list */
609 	{
610 
611 		gc->head=ce->forward;
612 		if(gc->head)
613 			gc->head->back=0;
614 		else
615 			gc->tail = 0;
616 		if(!ce->direct)
617 		{
618 			if(ce->real_address != NULL)
619 			   free((void *)(ce->real_address));
620 			ce->real_address = NULL;
621 			if(ce != NULL)
622 			   free((void *)ce);
623 			ce = NULL;
624 		}
625 		gc->count--;
626 		return;
627 	}
628 	found=0;
629 	while(move)
630 	{
631 		if(move==ce)
632 		{
633 			if(move->forward)
634 			{
635 				move->forward->back=move->back;
636 			}
637 			if(move->back)
638 			{
639 				move->back->forward=move->forward;
640 			}
641 			found=1;
642 			break;
643 		}
644 		else
645 		{
646 			move=move->forward;
647 		}
648 	}
649 	if(gc->head == ce)
650 		gc->tail = ce;
651 	if(!found)
652 		printf("Internal Error in takeoff cache\n");
653 	move=gc->head;
654 	if(!ce->direct)
655 	{
656 		if(ce->real_address != NULL)
657 		   free((void *)(ce->real_address));
658 		ce->real_address = NULL;
659 		if(ce != NULL)
660 		   free((void *)ce);
661 		ce = NULL;
662 	}
663 	gc->count--;
664 }
665 
666 /************************************************************************
667  * This routine is used to purge the entire cache. This is called when
 * the cache contains data but the incoming read was not able to
669  * be satisfied from the cache. This indicates that the previous
670  * async read-ahead was not correct and a new pattern is emerging.
671  ************************************************************************/
672 #ifdef HAVE_ANSIC_C
673 void
del_cache(struct cache * gc)674 del_cache(struct cache *gc)
675 #else
676 void
677 del_cache(gc)
678 struct cache *gc;
679 #endif
680 {
681 	struct cache_ent *ce;
682 	ssize_t ret;
683 	ce=gc->head;
684 	while(1)
685 	{
686 		ce=gc->head;
687 		if(ce==0)
688 			return;
689 		while((ret = aio_cancel(0,&ce->myaiocb))==AIO_NOTCANCELED)
690 			;
691 
692 		ret = aio_return(&ce->myaiocb);
693 		ce->direct=0;
694 		takeoff_cache(gc,ce);	  /* remove from cache */
695 	}
696 }
697 
698 /************************************************************************
699  * Like its sister async_read() this function performs async I/O for
700  * all buffers but it differs in that it expects the caller to
701  * request a pointer to the data to be returned instead of handing
702  * the function a location to put the data. This will allow the
703  * async I/O to be performed and does not require any bcopy to be
704  * done to put the data back into the location specified by the caller.
705  ************************************************************************/
706 #ifdef HAVE_ANSIC_C
707 int
async_read_no_copy(struct cache * gc,long long fd,char ** ubuffer,off64_t offset,long long size,long long stride,off64_t max,long long depth)708 async_read_no_copy(struct cache *gc, long long fd, char **ubuffer, off64_t offset, long long size, long long stride, off64_t max, long long depth)
709 #else
710 int
711 async_read_no_copy(gc, fd, ubuffer, offset, size, stride, max, depth)
712 struct cache *gc;
713 long long fd;
714 char **ubuffer;
715 off64_t offset;
716 long long size;
717 long long stride;
718 off64_t max;
719 long long depth;
720 #endif
721 {
722 	off64_t a_offset,r_offset;
723 	long long a_size;
724 	struct cache_ent *ce,*first_ce=0;
725 	long long i;
726 	ssize_t retval=0;
727 	ssize_t ret;
728 	long long del_read=0;
729 	long long start=0;
730 
731 	a_offset=offset;
732 	a_size = size;
733 	/*
734 	 * Check to see if it can be completed from the cache
735 	 */
736 	if((ce=(struct cache_ent *)incache(gc,fd,offset,size)))
737 	{
738 		while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
739 		{
740 			async_suspend(ce);
741 		}
742 		if(ret)
743 			printf("aio_error 3: ret %zd %d\n",ret,errno);
744 			printf("It changed in flight\n");
745 
746 		retval=aio_return(&ce->myaiocb);
747 		if(retval > 0)
748 		{
749 			*ubuffer= (char *)ce->myaiocb.aio_buf;
750 		}else
751 			*ubuffer= NULL;
752 		if(retval < ce->myaiocb.aio_nbytes)
753 		{
754 			printf("aio_return error4: ret %zd %d\n",retval,errno);
755 			printf("aio_return error4: fd %d offset %lld buffer %p size %zd Opcode %d\n",
756 				ce->myaiocb.aio_fildes,
757 				(long long)ce->myaiocb.aio_offset,
758 				ce->myaiocb.aio_buf,
759 				ce->myaiocb.aio_nbytes,
760 				ce->myaiocb.aio_lio_opcode
761 				);
762 		}
763 		ce->direct=1;
764 		takeoff_cache(gc,ce); /* do not delete buffer*/
765 		putoninuse(gc,ce);
766 	}else
767 	{
768 		/*
769 		 * Clear the cache and issue the first request async()
770 		 */
771 		del_cache(gc);
772 		del_read++;
773 		first_ce=alloc_cache(gc,fd,offset,size,(long long)LIO_READ); /* allocate buffer */
774 		/*printf("allocated buffer/read %x offset %d\n",first_ce->myaiocb.aio_buf,offset);*/
775 again:
776 		first_ce->oldbuf=first_ce->myaiocb.aio_buf;
777 		first_ce->oldfd=first_ce->myaiocb.aio_fildes;
778 		first_ce->oldsize=first_ce->myaiocb.aio_nbytes;
779 		ret=aio_read(&first_ce->myaiocb);
780 		if(ret!=0)
781 		{
782 			if(errno==EAGAIN)
783 				goto again;
784 			else
785 				printf("error returned from aio_read(). Ret %zd errno %d\n",ret,errno);
786 		}
787 	}
788 	if(stride==0)	 /* User does not want read-ahead */
789 		goto out;
790 	if(a_offset<0)	/* Before beginning of file */
791 		goto out;
792 	if(a_offset+size>max)	/* After end of file */
793 		goto out;
794 	if(depth >=(max_depth-1))
795 		depth=max_depth-1;
796 	if(depth==0)
797 		goto out;
798 	if(gc->count > 1)
799 		start=depth-1;
800 	for(i=start;i<depth;i++)	/* Issue read-aheads for the depth specified */
801 	{
802 		r_offset=a_offset+((i+1)*(stride*a_size));
803 		if(r_offset<0)
804 			continue;
805 		if(r_offset+size > max)
806 			continue;
807 		if((ce=incache(gc,fd,r_offset,a_size)))
808 			continue;
809 		ce=alloc_cache(gc,fd,r_offset,a_size,(long long)LIO_READ);
810 		ce->oldbuf=ce->myaiocb.aio_buf;
811 		ce->oldfd=ce->myaiocb.aio_fildes;
812 		ce->oldsize=ce->myaiocb.aio_nbytes;
813 		ret=aio_read(&ce->myaiocb);
814 		if(ret!=0)
815 		{
816 			takeoff_cache(gc,ce);
817 			break;
818 		}
819 	}
820 out:
821 	if(del_read)	/* Wait for the first read to complete */
822 	{
823 		while((ret=aio_error(&first_ce->myaiocb))== EINPROGRESS)
824 		{
825 			async_suspend(first_ce);
826 		}
827 		if(ret)
828 			printf("aio_error 4: ret %zd %d\n",ret,errno);
829 		if(first_ce->oldbuf != first_ce->myaiocb.aio_buf ||
830 			first_ce->oldfd != first_ce->myaiocb.aio_fildes ||
831 			first_ce->oldsize != first_ce->myaiocb.aio_nbytes)
832 			printf("It changed in flight2\n");
833 		retval=aio_return(&first_ce->myaiocb);
834 		if(retval < first_ce->myaiocb.aio_nbytes)
835 		{
836 			printf("aio_return error5: ret %zd %d\n",retval,errno);
837 			printf("aio_return error5: fd %d offset %lld buffer %p size %zd Opcode %d\n",
838 				first_ce->myaiocb.aio_fildes,
839 				(long long)first_ce->myaiocb.aio_offset,
840 				first_ce->myaiocb.aio_buf,
841 				first_ce->myaiocb.aio_nbytes,
842 				first_ce->myaiocb.aio_lio_opcode
843 				);
844 		}
845 		if(retval > 0)
846 		{
847 			*ubuffer= (char *)first_ce->myaiocb.aio_buf;
848 		}else
849 			*ubuffer= NULL;
850 		first_ce->direct=1;	 /* do not delete the buffer */
851 		takeoff_cache(gc,first_ce);
852 		putoninuse(gc,first_ce);
853 	}
854 	return((int)retval);
855 }
856 
857 /************************************************************************
858  * The caller is now finished with the data that was provided so
859  * the library is now free to return the memory to the pool for later
860  * reuse.
861  ************************************************************************/
#ifdef HAVE_ANSIC_C
void async_release(struct cache *gc)
#else
void
async_release(gc)
struct cache *gc;
#endif
{
	/* The caller is finished with the buffer returned by
	 * async_read_no_copy(); pop it off the in-use list and free it. */
	takeoffinuse(gc);
}
872 
873 
874 /************************************************************************
875  * Put the buffer on the inuse list. When the user is finished with
876  * the buffer it will call back into async_release and the items on the
877  * inuse list will be deallocated.
878  ************************************************************************/
879 #ifdef HAVE_ANSIC_C
880 void
putoninuse(struct cache * gc,struct cache_ent * entry)881 putoninuse(struct cache *gc,struct cache_ent *entry)
882 #else
883 void
884 putoninuse(gc,entry)
885 struct cache *gc;
886 struct cache_ent *entry;
887 #endif
888 {
889 	if(gc->inuse_head)
890 		entry->forward=gc->inuse_head;
891 	else
892 		entry->forward=0;
893 	gc->inuse_head=entry;
894 }
895 
896 /************************************************************************
897  * This is called when the application is finished with the data that
898  * was provided. The memory may now be returned to the pool.
899  ************************************************************************/
900 #ifdef HAVE_ANSIC_C
901 void
takeoffinuse(struct cache * gc)902 takeoffinuse(struct cache *gc)
903 #else
904 void
905 takeoffinuse(gc)
906 struct cache *gc;
907 #endif
908 {
909 	struct cache_ent *ce;
910 	if(gc->inuse_head==0)
911 		printf("Takeoffinuse error\n");
912 	ce=gc->inuse_head;
913 	gc->inuse_head=gc->inuse_head->forward;
914 
915 	if(gc->inuse_head !=0)
916 		printf("Error in take off inuse\n");
917 	if(ce->real_address != NULL)
918 	   free((void*)(ce->real_address));
919 	ce->real_address = NULL;
920 	if(ce != NULL)
921 	   free(ce);
922 	ce = NULL;
923 }
924 
925 /*************************************************************************
 * This routine is a generic async writer assist function. It takes
927  * the same calling parameters as write() but also extends the
928  * interface to include:
929  *
930  * offset ..... offset in the file.
 * depth  ..... How many async writes to keep in flight (write-behind depth).
932  *
933  *************************************************************************/
934 #ifdef HAVE_ANSIC_C
935 size_t
async_write(struct cache * gc,long long fd,char * buffer,long long size,off64_t offset,long long depth)936 async_write(struct cache *gc,long long fd,char *buffer,long long size,off64_t offset,long long depth)
937 #else
938 size_t
939 async_write(gc,fd,buffer,size,offset,depth)
940 struct cache *gc;
941 long long fd,size;
942 char *buffer;
943 off64_t offset;
944 long long depth;
945 #endif
946 {
947 	struct cache_ent *ce;
948 	size_t ret;
949 	ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,0LL,(char *)0,(char *)0);
950 	ce->direct=0;	 /* not direct. Lib supplies buffer and must free it */
951 	mbcopy(buffer,(char *)(ce->myaiocb.aio_buf),(size_t)size);
952 	async_put_on_write_queue(gc,ce);
953 	/*
954 	printf("asw: fd %d offset %lld, size %zd\n",ce->myaiocb.aio_fildes,
955 		ce->myaiocb.aio_offset,
956 		ce->myaiocb.aio_nbytes);
957 	*/
958 
959 again:
960 	ret=aio_write(&ce->myaiocb);
961 	if(ret==-1)
962 	{
963 		if(errno==EAGAIN)
964 		{
965 			async_wait_for_write(gc);
966 			goto again;
967 		}
968 		if(errno==0)
969 		{
970 			/* Compensate for bug in async library */
971 			async_wait_for_write(gc);
972 			goto again;
973 		}
974 		else
975 		{
976 			printf("Error in aio_write: ret %zd errno %d count %lld\n",ret,errno,gc->w_count);
977 			/*
978 			printf("aio_write_no_copy: fd %d buffer %x offset %lld size %zd\n",
979 				ce->myaiocb.aio_fildes,
980 				ce->myaiocb.aio_buf,
981 				ce->myaiocb.aio_offset,
982 				ce->myaiocb.aio_nbytes);
983 			*/
984 			exit(177);
985 		}
986 	}
987 	return((ssize_t)size);
988 }
989 
990 /*************************************************************************
991  * Allocate a write aiocb and write buffer of the size specified. Also
992  * put some extra buffer padding so that VX_DIRECT can do its job when
993  * needed.
994  *************************************************************************/
995 
996 #ifdef HAVE_ANSIC_C
997 struct cache_ent *
allocate_write_buffer(struct cache * gc,long long fd,long long offset,long long size,long long op,long long w_depth,long long direct,char * buffer,char * free_addr)998 allocate_write_buffer( struct cache *gc, long long fd, long long offset, long long size,long long op,
999 	long long w_depth, long long direct, char *buffer, char *free_addr)
1000 #else
1001 struct cache_ent *
1002 allocate_write_buffer(gc,fd,offset,size,op,w_depth,direct,buffer,free_addr)
1003 struct cache *gc;
1004 long long fd,size,op;
1005 off64_t offset;
1006 long long w_depth;
1007 long long direct;
1008 char *buffer,*free_addr;
1009 #endif
1010 {
1011 	struct cache_ent *ce;
1012 	intptr_t temp;
1013 	if(fd==0LL)
1014 	{
1015 		printf("Setting up write buffer insane\n");
1016 		exit(178);
1017 	}
1018 	if(gc->w_count > w_depth)
1019 		async_wait_for_write(gc);
1020 	ce=(struct cache_ent *)malloc((size_t)sizeof(struct cache_ent));
1021 	if(ce == (struct cache_ent *)0)
1022 	{
1023 		printf("Malloc failed 1\n");
1024 		exit(179);
1025 	}
1026 	bzero(ce,sizeof(struct cache_ent));
1027 	ce->myaiocb.aio_fildes=(int)fd;
1028 	ce->myaiocb.aio_offset=(off_t)offset;
1029 	if(!direct)
1030 	{
1031 		ce->real_address = malloc((size_t)(size+page_size));
1032 		temp = (intptr_t)ce->real_address;
1033 		temp = (temp+(page_size-1)) & ~(page_size-1);
1034 		ce->myaiocb.aio_buf=(volatile void *)temp;
1035 	}
1036 	else
1037 	{
1038 		ce->myaiocb.aio_buf=(volatile void *)buffer;
1039 		ce->real_address=(char *)free_addr;
1040 	}
1041 	if(ce->myaiocb.aio_buf == 0)
1042 	{
1043 		printf("Malloc failed 2\n");
1044 		exit(180);
1045 	}
1046 	ce->myaiocb.aio_reqprio=0;
1047 	ce->myaiocb.aio_nbytes=(size_t)size;
1048 	ce->myaiocb.aio_sigevent.sigev_notify=SIGEV_NONE;
1049 	ce->myaiocb.aio_lio_opcode=(int)op;
1050 	ce->fd=(int)fd;
1051 	return(ce);
1052 }
1053 
1054 /*************************************************************************
1055  * Put it on the outbound queue.
1056  *************************************************************************/
1057 
1058 #ifdef HAVE_ANSIC_C
1059 void
async_put_on_write_queue(struct cache * gc,struct cache_ent * ce)1060 async_put_on_write_queue(struct cache *gc,struct cache_ent *ce)
1061 #else
1062 void
1063 async_put_on_write_queue(gc,ce)
1064 struct cache *gc;
1065 struct cache_ent *ce;
1066 #endif
1067 {
1068 	ce->forward=0;
1069 	ce->back=gc->w_tail;
1070 	if(gc->w_tail)
1071 		gc->w_tail->forward = ce;
1072 	gc->w_tail= ce;
1073 	if(!gc->w_head)
1074 		gc->w_head=ce;
1075 	gc->w_count++;
1076 	return;
1077 }
1078 
1079 /*************************************************************************
1080  * Cleanup all outstanding writes
1081  *************************************************************************/
1082 #ifdef HAVE_AHSIC_C
1083 void
async_write_finish(struct cache * gc)1084 async_write_finish(struct cache *gc)
1085 #else
1086 void
1087 async_write_finish(gc)
1088 struct cache *gc;
1089 #endif
1090 {
1091 	while(gc->w_head)
1092 	{
1093 		async_wait_for_write(gc);
1094 	}
1095 }
1096 
1097 /*************************************************************************
1098  * Wait for an I/O to finish
1099  *************************************************************************/
1100 
1101 #ifdef HAVE_ANSIC_C
1102 void
async_wait_for_write(struct cache * gc)1103 async_wait_for_write(struct cache *gc)
1104 #else
1105 void
1106 async_wait_for_write(gc)
1107 struct cache *gc;
1108 #endif
1109 {
1110 	struct cache_ent *ce;
1111 	size_t ret;
1112 	int retval;
1113 	if(gc->w_head==0)
1114 		return;
1115 	ce=gc->w_head;
1116         if(ce == NULL)
1117 		return;
1118 	gc->w_head=ce->forward;
1119 	gc->w_count--;
1120 	ce->forward=NULL;
1121 	if(ce==gc->w_tail)
1122 		gc->w_tail=0;
1123 	/*printf("Wait for buffer %x  offset %lld  size %zd to finish\n",
1124 		ce->myaiocb.aio_buf,
1125 		ce->myaiocb.aio_offset,
1126 		ce->myaiocb.aio_nbytes);
1127 	printf("write count %lld \n",gc->w_count);
1128 	*/
1129 	while((ret=aio_error(&ce->myaiocb))== EINPROGRESS)
1130 	{
1131 		async_suspend(ce);
1132 	}
1133 	if(ret)
1134 	{
1135 		printf("aio_error 5: ret %zd %d\n",ret,errno);
1136 		printf("fd %d offset %lld size %zd\n",
1137 			ce->myaiocb.aio_fildes,
1138 			(long long)ce->myaiocb.aio_offset,
1139 			ce->myaiocb.aio_nbytes);
1140 		exit(181);
1141 	}
1142 
1143 	retval=aio_return(&ce->myaiocb);
1144 	if(retval < 0)
1145 	{
1146 		printf("aio_return error: %d\n",errno);
1147 	}
1148 
1149 	if(!ce->direct)
1150 	{
1151 		if(ce->real_address != NULL)
1152 		   free((void *)(ce->real_address)); /* Causes crash. */
1153 		ce->real_address=NULL;
1154 		if(ce != NULL)
1155 		   free((void *)ce);
1156 		ce=NULL;
1157 	}
1158 
1159 }
1160 
1161 /*************************************************************************
1162  * This routine is a generic async writer assist funtion. It takes
1163  * the same calling parameters as write() but also extends the
1164  * interface to include:
1165  *
1166  * offset ..... offset in the file.
1167  * depth  ..... How much read-ahead do you want.
1168  * free_addr .. address of memory to free after write is completed.
1169  *
1170  *************************************************************************/
1171 #ifdef HAVE_ANSIC_C
1172 size_t
async_write_no_copy(struct cache * gc,long long fd,char * buffer,long long size,long long offset,long long depth,char * free_addr)1173 async_write_no_copy(struct cache *gc,long long fd,char *buffer,long long size,long long offset,long long depth,char *free_addr)
1174 #else
1175 size_t
1176 async_write_no_copy(gc,fd,buffer,size,offset,depth,free_addr)
1177 struct cache *gc;
1178 long long fd,size;
1179 char *buffer;
1180 long long offset;
1181 long long depth;
1182 char *free_addr;
1183 #endif
1184 {
1185 	struct cache_ent *ce;
1186 	size_t ret;
1187 	long long direct = 1;
1188 	ce=allocate_write_buffer(gc,fd,offset,size,(long long)LIO_WRITE,depth,direct,buffer,free_addr);
1189 	ce->direct=0;	/* have library de-allocate the buffer */
1190 	async_put_on_write_queue(gc,ce);
1191 	/*
1192 	printf("awnc: fd %d offset %lld, size %zd\n",ce->myaiocb.aio_fildes,
1193 		ce->myaiocb.aio_offset,
1194 		ce->myaiocb.aio_nbytes);
1195 	*/
1196 
1197 again:
1198 	ret=aio_write(&ce->myaiocb);
1199 	if(ret==-1)
1200 	{
1201 		if(errno==EAGAIN)
1202 		{
1203 			async_wait_for_write(gc);
1204 			goto again;
1205 		}
1206 		if(errno==0)
1207 		{
1208 			/* Compensate for bug in async library */
1209 			async_wait_for_write(gc);
1210 			goto again;
1211 		}
1212 		else
1213 		{
1214 			printf("Error in aio_write: ret %zd errno %d\n",ret,errno);
1215 			printf("aio_write_no_copy: fd %d buffer %p offset %lld size %zd\n",
1216 				ce->myaiocb.aio_fildes,
1217 				ce->myaiocb.aio_buf,
1218 				(long long)ce->myaiocb.aio_offset,
1219 				ce->myaiocb.aio_nbytes);
1220 			exit(182);
1221 		}
1222 	}
1223 	else
1224 	{
1225 		return((ssize_t)size);
1226 	}
1227 }
1228 
/*************************************************************************
 * Copy len bytes from source to dest, one byte at a time.
 * Regions must not overlap. Uses a size_t index: the original used an
 * int index against a size_t bound (signed/unsigned mismatch, and
 * broken for len > INT_MAX).
 *************************************************************************/
void
mbcopy(const char *source, char *dest, size_t len)
{
	size_t i;

	for (i = 0; i < len; i++)
		*dest++ = *source++;
}
1238 
1239