1 // ==========================================================================
2 //                 SeqAn - The Library for Sequence Analysis
3 // ==========================================================================
4 // Copyright (c) 2006-2010, Knut Reinert, FU Berlin
5 // All rights reserved.
6 //
7 // Redistribution and use in source and binary forms, with or without
8 // modification, are permitted provided that the following conditions are met:
9 //
10 //     * Redistributions of source code must retain the above copyright
11 //       notice, this list of conditions and the following disclaimer.
12 //     * Redistributions in binary form must reproduce the above copyright
13 //       notice, this list of conditions and the following disclaimer in the
14 //       documentation and/or other materials provided with the distribution.
15 //     * Neither the name of Knut Reinert or the FU Berlin nor the names of
16 //       its contributors may be used to endorse or promote products derived
17 //       from this software without specific prior written permission.
18 //
19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
22 // ARE DISCLAIMED. IN NO EVENT SHALL KNUT REINERT OR THE FU BERLIN BE LIABLE
23 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY
28 // OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH
29 // DAMAGE.
30 //
31 // ==========================================================================
32 // Author: David Weese <david.weese@fu-berlin.de>
33 // ==========================================================================
34 
35 //SEQAN_NO_GENERATED_FORWARDS: no forwards are generated for this file
36 
37 #ifndef SEQAN_HEADER_FILE_ASYNC_H
38 #define SEQAN_HEADER_FILE_ASYNC_H
39 
40 namespace SEQAN_NAMESPACE_MAIN
41 {
42 
43 
44 	template <typename TSpec /* = void */>
45 	struct Async;
46 
47 
48 #ifdef PLATFORM_WINDOWS
49 
50 
51     static DWORD _transferedBytes;  // for reporting
52 
53 	template <typename TSpec>
54 	class File<Async<TSpec> >
55     {
56     public:
57 
58         typedef LONGLONG    FilePtr;
59         typedef ULONGLONG   SizeType;
60         typedef DWORD       SizeType_;
61         typedef HANDLE      Handle;
62 
63 		Handle              handle, handleAsync;
64         bool                noBuffering;
65 
File()66         File():
67             handle(INVALID_HANDLE_VALUE) {}
68 
File(void *)69         File(void *): // to be compatible with the FILE*(NULL) constructor
70             handle(INVALID_HANDLE_VALUE) {}
71 
72         bool open(char const *fileName, int openMode = DefaultOpenMode<File>::VALUE) {
73 			SEQAN_PROADD(SEQAN_PROOPENFILES, 1);
74             noBuffering = (getExtraFlags(openMode | OPEN_ASYNC) & (FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED)) != 0;
75             handleAsync = CreateFileA(fileName,
76                                 getFileAccess(openMode | OPEN_ASYNC),
77                                 FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
78                                 NULL,
79                                 getCreationFlags(openMode | OPEN_ASYNC),
80                                 getExtraFlags(openMode | OPEN_ASYNC),
81                                 NULL);
82 
83             if (handleAsync == INVALID_HANDLE_VALUE) {
84 				if (!(openMode & OPEN_QUIET))
85 					::std::cerr << "Open failed on file " << fileName << ". (ErrNo=" << GetLastError() << ")" << ::std::endl;
86                 return false;
87             }
88             #ifdef SEQAN_VERBOSE
89 				if (!(openMode & OPEN_QUIET))
90 	                ::std::cerr << "file opened asynchronously " << fileName << " handle " << ::std::hex << handleAsync << ::std::dec << ::std::endl;
91             #endif
92 
93             if (noBuffering) {
94                 handle = CreateFileA(fileName,                // in this case io must be sector aligned
95                                 getFileAccess(openMode),    // so we open a second file, for unaligned access
96                                 FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE,
97                                 NULL,
98                                 OPEN_EXISTING,
99                                 getExtraFlags(openMode & ~OPEN_ASYNC),
100                                 NULL);
101                 if (handle == INVALID_HANDLE_VALUE) {
102 					if (!(openMode & OPEN_QUIET))
103 	                	::std::cerr << "Open failed on secondary file " << fileName << ". (ErrNo=" << GetLastError() << ")" << ::std::endl;
104                     return false;
105                 }
106 	            #ifdef SEQAN_VERBOSE
107 					if (!(openMode & OPEN_QUIET))
108 	                	::std::cerr << "async file opened  " << fileName << " handle " << ::std::hex << handle << ::std::dec << ::std::endl;
109                 #endif
110             } else
111                 handle = handleAsync;
112 
113             return true;
114         }
115 
116         bool openTemp(int openMode = DefaultOpenTempMode<File>::VALUE) {
117             char szTempName[MAX_PATH];
118 #ifdef SEQAN_DEFAULT_TMPDIR
119             static const char szTempPath[MAX_PATH] = SEQAN_DEFAULT_TMPDIR;
120 #else
121             char szTempPath[MAX_PATH];
122             if (!GetTempPathA(MAX_PATH, szTempPath)) {
123 				if (!(openMode & OPEN_QUIET))
124 					::std::cerr << "Couldn't get a temporary path name. (ErrNo=" << GetLastError() << ")" << ::std::endl;
125                 return false;
126             }
127 #endif
128             if (!GetTempFileNameA(szTempPath, "GNDX", 0, szTempName)) {
129 				if (!(openMode & OPEN_QUIET))
130 					::std::cerr << "Couldn't get a temporary file name. (ErrNo=" << GetLastError() << ")" << ::std::endl;
131                 return false;
132             }
133             return open(szTempName, openMode | OPEN_TEMPORARY);
134         }
135 
close()136         inline bool close() {
137             BOOL result = TRUE;
138             #ifdef SEQAN_VERBOSE
139                 ::std::cerr << "files closed handles " << ::std::hex << handleAsync << " and " << handle << ::std::dec << ::std::endl;
140             #endif
141             if (handle != handleAsync)
142                 result &= CloseHandle(handleAsync);
143             result &= CloseHandle(handle);
144             handleAsync = INVALID_HANDLE_VALUE;
145             handle = INVALID_HANDLE_VALUE;
146 			SEQAN_PROSUB(SEQAN_PROOPENFILES, 1);
147             return result != FALSE;
148         }
149 
read(void * memPtr,SizeType_ count)150         inline bool read(void *memPtr, SizeType_ count) const {
151             SEQAN_PROADD(SEQAN_PROIO, (count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
152             SEQAN_PROTIMESTART(tw);
153 		    bool result = ReadFile(handle, memPtr, count, &_transferedBytes, NULL) != 0;
154             SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
155             return result;
156         }
157 
write(void const * memPtr,SizeType_ count)158         inline bool write(void const *memPtr, SizeType_ count) const {
159             SEQAN_PROADD(SEQAN_PROIO, (count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
160             SEQAN_PROTIMESTART(tw);
161 		    bool result = WriteFile(handle, memPtr, count, &_transferedBytes, NULL) != 0;
162             SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
163             return result;
164         }
165 
166 		inline FilePtr seek(FilePtr _pos, DWORD origin = FILE_BEGIN) {
167 //          LARGE_INTEGER li = _pos;
168 //			return SetFilePointer(handleAsync, li.LowPart, &li.HighPart, MoveMethod);
169             LARGE_INTEGER new_pos, pos;
170             pos.QuadPart = _pos;
171             SetFilePointerEx(handle, pos, &new_pos, origin);
172 //            position = new_pos.QuadPart;
173             return new_pos.QuadPart;
174 		}
175 
tell()176 		inline FilePtr tell() {
177 			return seek(0, FILE_CURRENT);
178         }
179 
size()180 		inline FilePtr size() const {
181             LARGE_INTEGER result;
182             DWORD dwError, high;
183             result.LowPart = GetFileSize(handle, &high);
184             result.HighPart = high;
185             if (result.LowPart == INVALID_FILE_SIZE && (dwError = GetLastError()) != NO_ERROR) {
186 				::std::cerr << "Couldn't get file size. (ErrNo=" << dwError << ")" << ::std::endl;
187                 return 0;
188             }
189             return result.QuadPart;
190         }
191 
setEof()192         inline bool setEof() const {
193             return SetEndOfFile(handle) != FALSE;
194         }
195 
error()196 		inline static DWORD error() {
197 			return GetLastError();
198 		}
199 
200         operator bool () const {
201             return (handle != INVALID_HANDLE_VALUE) && (handleAsync != INVALID_HANDLE_VALUE);
202         }
203 
204     protected:
205 
getFileAccess(int openMode)206         DWORD getFileAccess(int openMode) {
207             switch (openMode & OPEN_MASK) {
208                 case OPEN_RDONLY:
209                     return GENERIC_READ;
210                 case OPEN_WRONLY:
211                     return GENERIC_WRITE;
212                 case OPEN_RDWR:
213                     return GENERIC_READ | GENERIC_WRITE;
214 				default:
215 					return 0;
216             }
217         }
218 
getCreationFlags(int openMode)219         DWORD getCreationFlags(int openMode) {
220             if (openMode & OPEN_CREATE)
221                 if (openMode & OPEN_APPEND)
222                     return OPEN_ALWAYS;
223                 else
224                     return CREATE_ALWAYS;
225             else
226                 return OPEN_EXISTING;
227         }
228 
getExtraFlags(int openMode)229         DWORD getExtraFlags(int openMode) {
230             DWORD extra = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS;// | FILE_FLAG_WRITE_THROUGH;
231             if (openMode & OPEN_ASYNC) {
232                 extra |= FILE_FLAG_OVERLAPPED;
233                 #ifdef SEQAN_DIRECTIO
234                     extra |= FILE_FLAG_NO_BUFFERING;
235                 #endif
236             }
237             if (openMode & OPEN_TEMPORARY)  extra |= FILE_FLAG_DELETE_ON_CLOSE;
238             return extra;
239         }
240 
241     };
242 
243 
244     //////////////////////////////////////////////////////////////////////////////
245     // (SeqAn adaption)
246     //////////////////////////////////////////////////////////////////////////////
247 
248     struct aiocb_win32 {
249         OVERLAPPED  overlapped;
250         Event       xmitDone;
251     };
252 
253 	template <typename TSpec>
254     struct AsyncRequest<File<Async<TSpec> > >
255     {
256         typedef aiocb_win32 Type;
257     };
258 /*
259 	template <typename TSpec>
260     struct aEvent<File<Async<TSpec> > >
261     {
262         typedef Event Type;
263     };
264 
265 
266 	template <typename TSpec>
267     struct aQueue<File<Async<TSpec> > >
268     {
269         typedef IOQueue Type;
270     };
271 
272 	template <typename TSpec>
273     struct aHint<File<Async<TSpec> > >
274     {
275         typedef typename aQueue<File<Async<TSpec> > >::Type::aHint Type;
276     };
277 
278 	template <typename TSpec>
279     struct aCallback<File<Async<TSpec> > >
280     {
281         typedef typename aQueue<File<Async<TSpec> > >::Type::aCallback Type;
282     };*/
283 
284 
285 	template <typename TSpec>
286     inline typename Size<File<Async<TSpec> > >::Type size(File<Async<TSpec> > &me) {
287         return me.size();
288     }
289 
290 	template <typename TSpec>
291     inline bool setEof(File<Async<TSpec> > &me) {
292         return me.setEof();
293     }
294 
295 	template <typename TSpec>
296     inline unsigned sectorSize(File<Async<TSpec> > const &) {
297         DWORD SpC, nofC, tnoC, aligning;
298         if (GetDiskFreeSpace(NULL, &SpC, &aligning, &nofC, &tnoC) == 0)  {
299             ::std::cerr << "Error " << GetLastError() << " while querying cluster size" << ::std::endl;
300             return 4096;
301         }
302         return aligning;
303     }
304 
305 
306     template < typename TSpec, typename TValue, typename TSize, typename TPos >
307     inline bool asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
308         aiocb_win32 &request)
309     {
310         SEQAN_PROTIMESTART(tw);
311         LARGE_INTEGER ofs;
312         ofs.QuadPart = fileOfs;
313         ofs.QuadPart *= sizeof(TValue);
314         request.overlapped.Offset = ofs.LowPart;
315         request.overlapped.OffsetHigh = ofs.HighPart;
316         if (!request.xmitDone) open(request.xmitDone);
317         request.overlapped.hEvent = request.xmitDone.hEvent;
318         if (ReadFile(
319             me.handleAsync,
320             memPtr,
321             count * sizeof(TValue),
322             &ofs.LowPart,
323             &request.overlapped) || (me.error() == ERROR_IO_PENDING))
324         {
325             SEQAN_PROADD(SEQAN_PROIO, (sizeof(TValue) * count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
326             SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw));
327             return true;
328         }
329         if (me.error() == ERROR_NO_SYSTEM_RESOURCES) {  // read synchronoulsy instead
330             #ifdef SEQAN_DEBUG_OR_TEST_
331             	::std::cerr << "Warning: Falling back to sync. read. :( " << ::std::endl;
332             #endif
333 			signal(request.xmitDone);
334             return readAt(me, memPtr, count, fileOfs);
335         }
336         return false;
337     }
338 
339     template < typename TSpec, typename TValue, typename TSize, typename TPos >
340     inline bool asyncWriteAt(File<Async<TSpec> > & me, TValue const *memPtr, TSize const count, TPos const fileOfs,
341         aiocb_win32 &request)
342     {
343         SEQAN_PROTIMESTART(tw);
344         LARGE_INTEGER ofs;
345         ofs.QuadPart = fileOfs;
346         ofs.QuadPart *= sizeof(TValue);
347         request.overlapped.Offset = ofs.LowPart;
348         request.overlapped.OffsetHigh = ofs.HighPart;
349         if (!request.xmitDone) open(request.xmitDone);
350         request.overlapped.hEvent = request.xmitDone.hEvent;
351         if (WriteFile(
352             me.handleAsync,
353             memPtr,
354             count * sizeof(TValue),
355             &ofs.LowPart,
356             &request.overlapped) || (me.error() == ERROR_IO_PENDING))
357         {
358             SEQAN_PROADD(SEQAN_PROIO, (sizeof(TValue) * count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
359             SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw));
360             return true;
361         }
362         if (me.error() == ERROR_NO_SYSTEM_RESOURCES) {  // write synchronoulsy instead
363             #ifdef SEQAN_DEBUG_OR_TEST_
364             	::std::cerr << "Warning: Falling back to sync. write. :( " << ::std::endl;
365             #endif
366 			signal(request.xmitDone);
367             return writeAt(me, memPtr, count, fileOfs);
368         }
369         return false;
370     }
371 
372     //////////////////////////////////////////////////////////////////////
373     // queue specific functions
374 
375     inline bool waitFor(aiocb_win32 &request) {
376         SEQAN_PROTIMESTART(tw);
377 		if (!waitFor(request.xmitDone, 60000))
378             ::std::cerr << "waitFor timeout" << ::std::endl;
379         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
380         return true;
381 	}
382 
383     template < typename TTime >
384     inline bool waitFor(aiocb_win32 &request, TTime timeout_millis) {
385         SEQAN_PROTIMESTART(tw);
386 		bool result = waitFor(request.xmitDone, timeout_millis);
387         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
388         return result;
389 	}
390 
391 	template < typename TSize >
392 	inline TSize waitForAny(aiocb_win32 const * const contexts[], TSize count, DWORD timeout_millis = Event::Infinite) {
393         Event::Handle *handles = new Event::Handle[count];
394         for(TSize i = 0; i < count; ++i)
395             handles[i] = contexts[i]->xmitDone.hEvent;
396 
397         SEQAN_PROTIMESTART(tw);
398         DWORD result = WaitForMultipleObjects(count, handles, false, timeout_millis);
399         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
400 		delete[] handles;
401         if (/*result >= WAIT_OBJECT_0 && */result < WAIT_OBJECT_0 + count)
402     		return result - WAIT_OBJECT_0;
403         return count;
404 	}
405 
406 	template <typename TSpec>
407     inline bool cancel(File<Async<TSpec> > & me, aiocb_win32 const &request) {
408         return CancelIo(me.handleAsync);
409     }
410 
411 	template <typename TSpec>
412     inline bool flush(File<Async<TSpec> > & me) {
413 		if (me.handle != me.handleAsync)	// in case of equality no direct access was done -> no flush needed
414         	return FlushFileBuffers(me.handle) != 0;
415         else
416             return true;
417     }
418 
419     template < typename TSpec, typename AsyncRequest >
420     inline void release(File<Async<TSpec> > & me, AsyncRequest & request) { }
421 
422 
423 /*
424     //////////////////////////////////////////////////////////////////////
425     // callback based read/write
426 
427     template < typename TSpec, typename TValue, typename TSize,
428                typename aCallback, typename aHint >
429     inline typename AsyncRequest<File<Async<TSpec> > >::Type
430     asyncRead(File<Async<TSpec> > & me, TValue *memPtr, TSize const count,
431         aCallback* cb, aHint* hint)
432     {
433         DWORD bsize = (DWORD)(count * sizeof(TValue));
434         typename AsyncRequest<File<Async<TSpec> > >::Type request =
435             me.queue->asyncReadAt(
436                 me.handleAsync,
437                 me.position,
438                 memPtr,
439                 bsize,
440                 cb,
441                 hint);
442         me.position += bsize;
443         return request;
444     }
445 
446     template < typename TSpec, typename TValue, typename TSize,
447                typename aCallback, typename aHint >
448     inline typename AsyncRequest<File<Async<TSpec> > >::Type
449     asyncWrite(File<Async<TSpec> > & me, TValue const *memPtr, TSize const count,
450         aCallback* cb, aHint* hint)
451     {
452         DWORD bsize = (DWORD)(count * sizeof(TValue));
453         typename AsyncRequest<File<Async<TSpec> > >::Type request =
454             me.queue->asyncWriteAt(
455                 memPtr,
456                 me.handleAsync,
457                 me.position,
458                 bsize,
459                 cb,
460                 hint);
461         me.position += bsize;
462         return request;
463     }
464 
465     template < typename TSpec, typename TValue, typename TSize, typename TPos,
466                typename aCallback, typename aHint >
467     inline typename AsyncRequest<File<Async<TSpec> > >::Type
468     asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
469         aCallback* cb, aHint* hint)
470     {
471         DWORD bsize = (DWORD)(count * sizeof(TValue));
472         return me.queue->asyncReadAt(
473             me.handleAsync,
474             fileOfs * sizeof(TValue),
475             memPtr,
476             bsize,
477             cb,
478             hint);
479     }
480 
481     template < typename TSpec, typename TValue, typename TSize, typename TPos,
482                typename aCallback, typename aHint >
483     inline typename AsyncRequest<File<Async<TSpec> > >::Type
484     asyncWriteAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
485         aCallback* cb, aHint* hint)
486     {
487         DWORD bsize = (DWORD)(count * sizeof(TValue));
488         return me.queue->asyncWriteAt(
489             memPtr,
490             me.handleAsync,
491             fileOfs * sizeof(TValue),
492             bsize,
493             cb,
494             hint);
495     }
496 
497 
498     //////////////////////////////////////////////////////////////////////
499     // event based read/write
500 
501     template < typename TSpec, typename TValue, typename TSize,
502                typename aEvent >
503     inline typename AsyncRequest<File<Async<TSpec> > >::Type
504     asyncRead(File<Async<TSpec> > & me, TValue *memPtr, TSize const count,
505         aEvent &event)
506     {
507         DWORD bsize = (DWORD)(count * sizeof(TValue));
508         typename AsyncRequest<File<Async<TSpec> > >::Type request =
509             me.queue->asyncReadAt(
510                 me.handleAsync,
511                 me.position,
512                 memPtr,
513                 bsize,
514                 event);
515         me.position += bsize;
516         return request;
517     }
518 
519     template < typename TSpec, typename TValue, typename TSize,
520                typename aEvent >
521     inline typename AsyncRequest<File<Async<TSpec> > >::Type
522     asyncWrite(File<Async<TSpec> > & me, TValue *memPtr, TSize const count,
523         aEvent &event)
524     {
525         DWORD bsize = (DWORD)(count * sizeof(TValue));
526         typename AsyncRequest<File<Async<TSpec> > >::Type request =
527             me.queue->asyncWriteAt(
528                 memPtr,
529                 me.handleAsync,
530                 me.position,
531                 bsize,
532                 event);
533         me.position += bsize;
534         return request;
535     }
536 
537     template < typename TSpec, typename TValue, typename TSize, typename TPos,
538                typename aEvent >
539     inline typename AsyncRequest<File<Async<TSpec> > >::Type
540     asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
541         aEvent &event)
542     {
543         DWORD bsize = (DWORD)(count * sizeof(TValue));
544         return me.queue->asyncReadAt(
545             me.handleAsync,
546             fileOfs * sizeof(TValue),
547             memPtr,
548             bsize,
549             event);
550     }
551 
552     template < typename TSpec, TValue, typename TSize, typename TPos,
553                typename aEvent >
554     inline typename AsyncRequest<File<Async<TSpec> > >::Type
555     asyncWriteAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
556         aEvent &event)
557     {
558         DWORD bsize = (DWORD)(count * sizeof(TValue));
559         return me.queue->asyncWriteAt(
560             memPtr,
561             me.handleAsync,
562             fileOfs * sizeof(TValue),
563             bsize,
564             event);
565     }
566 
567 
568     //////////////////////////////////////////////////////////////////////
569     // queue specific functions
570 
571 	template <typename TSpec>
572     inline void flush(File<Async<TSpec> > & me) {
573         me.queue->flush();
574     }
575 
576     template < typename TSpec, typename AsyncRequest >
577     inline void release(File<Async<TSpec> > & me, AsyncRequest & request) {
578         me.queue->release(request);
579     }
580 */
581 
582 	//////////////////////////////////////////////////////////////////////////////
583 	// page aligned allocate for direct file io
584 
585     struct TagAllocateAligned_;	//< allocate page aligned memory for direct i/o access
586     typedef Tag<TagAllocateAligned_> const TagAllocateAligned;
587 
588 	template <typename T, typename TValue, typename TSize>
589 	inline void
590 	allocate(T const &,
591 			 TValue * & data,
592 			 TSize count,
593 			 TagAllocateAligned const)
594 	{
595 		data = (TValue *) VirtualAlloc(NULL, count * sizeof(TValue), MEM_COMMIT, PAGE_READWRITE);
596         if (data)
597             SEQAN_PROADD(SEQAN_PROMEMORY, count * sizeof(TValue));
598         else
599 			::std::cerr << "AlignAllocator: Could not allocate memory of size " << ::std::hex << count * sizeof(TValue) << ::std::dec << ". (ErrNo=" << GetLastError() << ")" << ::std::endl;
600 	}
601 
602 	//////////////////////////////////////////////////////////////////////////////
603 	// page aligned deallocate for direct file io
604 
605 	template <typename T, typename TValue, typename TSize>
606 	inline void
607 	deallocate( T const &,
608 				TValue * data,
609 				TSize count,
610 				TagAllocateAligned const)
611 	{
612 		if (data) {
613 			VirtualFree(data, 0, MEM_RELEASE);
614 			if (count)	// .. to use count if SEQAN_PROFILE is not defined
615 				SEQAN_PROSUB(SEQAN_PROMEMORY, count * sizeof(TValue));
616 		}
617 	}
618 
619 #else
620 
621 
622 	template <typename TSpec>
623     class File<Async<TSpec> > : public File<Sync<TSpec> >
624     {
625     public:
626 
627         typedef File<Sync<TSpec> >  Base;
628 
629         typedef off_t			FilePtr;
630 		typedef off_t           SizeType;   // type of file size
631         typedef size_t          SizeType_;  // type of transfer size (for read or write)
632 		typedef int				Handle;
633 
634         Handle handleAsync;
635 		using Base::handle;
636 
637 		File(void * = NULL): 	// to be compatible with the FILE*(NULL) constructor
638 			handleAsync(-1) {}
639 
640         virtual ~File() {}
641 
642         bool open(char const *fileName, int openMode = DefaultOpenMode<File>::VALUE) {
643             handle = ::open(fileName, Base::_getOFlag(openMode & ~OPEN_ASYNC), S_IREAD | S_IWRITE);
644 			if (handle == -1)
645 			{
646 				handleAsync = handle;
647 				if (!(openMode & OPEN_QUIET))
648 					::std::cerr << "Open failed on file " << fileName << ". (" << ::strerror(errno) << ")" << ::std::endl;
649 				return false;
650 			}
651 
652 			if (Base::_getOFlag(openMode | OPEN_ASYNC) & O_DIRECT)
653 			{
654 				handleAsync = ::open(fileName, Base::_getOFlag(openMode | (OPEN_ASYNC & ~OPEN_CREATE)), S_IREAD | S_IWRITE);
655 				if (handleAsync == -1 || errno == EINVAL) {	// fall back to cached access
656 					#ifdef SEQAN_DEBUG_OR_TEST_
657 						if (!(openMode & OPEN_QUIET))
658 							::std::cerr << "Warning: Direct access openening failed. (" << ::strerror(errno) << ")" << ::std::endl;
659 					#endif
660 					handleAsync = handle;
661 				}
662 				#ifdef SEQAN_DEBUG_OR_TEST_
663 				    else
664 						if (!(openMode & OPEN_QUIET))
665 							::std::cerr << "Direct access successfully initiated" << ::std::endl;
666 				#endif
667 			} else
668 				handleAsync = handle;
669 
670 			if (sizeof(FilePtr) < 8 && !(openMode & OPEN_QUIET))
671 				// To remove this warning, you have to options:
672 				// 1. include the following line before including anything in your application
673 				//    #define _FILE_OFFSET_BITS 64
674 				// 2. include <seqan/platform.h> or <seqan/sequence.h> before any other include
675 				::std::cerr << "WARNING: FilePtr is not 64bit wide" << ::std::endl;
676 
677 
678 			SEQAN_PROADD(SEQAN_PROOPENFILES, 1);
679             return true;
680         }
681 
682 		bool close() {
683 			bool result = true;
684 			if (handleAsync != handle && handleAsync != -1)
685 	            result &= (::close(handleAsync) == 0);
686             result &= (::close(handle) == 0);
687             handleAsync = -1;
688             handle = -1;
689 			SEQAN_PROSUB(SEQAN_PROOPENFILES, 1);
690             return result;
691         }
692     };
693 
694 
695     //////////////////////////////////////////////////////////////////////////////
696     // (SeqAn adaption)
697     //////////////////////////////////////////////////////////////////////////////
698 /*
699 	template <typename TSpec>
700     struct aQueue<File<Async<TSpec> > >
701     {
702         typedef void* Type;
703     };
704 */
705 
706 	template <typename TSpec>
707     struct AsyncRequest<File<Async<TSpec> > >
708     {
709 		typedef aiocb Type;
710     };
711 /*
712 	template <typename TSpec>
713     struct aEvent<File<Async<TSpec> > >
714     {
715         typedef aiocb Type;
716     };
717 */
718 
719     //////////////////////////////////////////////////////////////////////
720     // event based read/write
721 
722 //    enum { AsyncIOSignal_ = SIGIO };
723 
724 	inline void printRequest(aiocb &request, const char *_hint = NULL) {
725 		::std::cerr << ::std::hex;
726 		if (_hint)
727 			::std::cerr << _hint << ::std::endl;
728 		::std::cerr << "fildes:  " << request.aio_fildes << ::std::endl;
729 		::std::cerr << "buffer:  " << (unsigned long)request.aio_buf << ::std::endl;
730 		::std::cerr << "offset:  " << request.aio_offset<< ::std::endl;
731 		::std::cerr << "nbytes:  " << request.aio_nbytes << ::std::endl;
732 		::std::cerr << "event:   " << request.aio_sigevent.sigev_notify << ::std::endl;
733 		::std::cerr << "Raddr:   " << &request << ::std::endl;
734 		::std::cerr << ::std::dec;
735 	}
736 
737     template < typename TSpec, typename TValue, typename TSize, typename TPos >
738     bool asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs,
739         aiocb &request)
740     {
741         SEQAN_PROTIMESTART(tw);
742         memset(&request, 0, sizeof(aiocb));
743         request.aio_fildes = me.handleAsync;
744         request.aio_buf = memPtr;
745         request.aio_offset = fileOfs;
746         request.aio_offset *= sizeof(TValue);
747         request.aio_nbytes = count * sizeof(TValue);
748         request.aio_sigevent.sigev_notify = SIGEV_NONE;
749 /*      request.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
750         request.aio_sigevent.sigev_signo = AsyncIOSignal_;
751         request.aio_sigevent.sigev_value.sival_ptr = &request;
752 		#ifdef SEQAN_VVERBOSE
753 			printRequest(request, "aio_read():");
754 		#endif
755 */      if (request.aio_nbytes == 0) return true;
756 		SEQAN_PROADD(SEQAN_PROIO, (request.aio_nbytes + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
757 		int result = aio_read(&request);
758         SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw));
759         if (result)
760 		{
761 			request.aio_nbytes = 0;
762 			if (errno == EAGAIN) {  // read synchronoulsy instead
763 #ifdef SEQAN_DEBUG_OR_TEST_
764             	::std::cerr << "Warning: Falling back to sync. read. :( " << ::std::endl;
765 #endif
766 				return readAt(me, memPtr, count, fileOfs);
767 			}
768 #ifdef SEQAN_DEBUG
769 			else
770 				::std::cerr << "asyncReadAt returned " << result << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl;
771 #endif
772         }
773 		return result == 0;
774     }
775 
776     template < typename TSpec, typename TValue, typename TSize, typename TPos >
777     bool asyncWriteAt(File<Async<TSpec> > & me, const TValue *memPtr, TSize const count, TPos const fileOfs,
778         aiocb &request)
779     {
780         SEQAN_PROTIMESTART(tw);
781         memset(&request, 0, sizeof(aiocb));
782         request.aio_fildes = me.handleAsync;
783         request.aio_buf = const_cast<TValue*>(memPtr);
784         request.aio_offset = fileOfs;
785         request.aio_offset *= sizeof(TValue);
786         request.aio_nbytes = count * sizeof(TValue);
787         request.aio_sigevent.sigev_notify = SIGEV_NONE;
788 /*      request.aio_sigevent.sigev_notify = SIGEV_SIGNAL;
789         request.aio_sigevent.sigev_signo = AsyncIOSignal_;
790         request.aio_sigevent.sigev_value.sival_ptr = &request;
791 		#ifdef SEQAN_VVERBOSE
792 			printRequest(request, "aio_write():");
793 		#endif
794 */      if (request.aio_nbytes == 0) return true;
795 		SEQAN_PROADD(SEQAN_PROIO, (request.aio_nbytes + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE);
796 		int result = aio_write(&request);
797         SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw));
798         if (result)
799 		{
800 			request.aio_nbytes = 0;
801 			if (errno == EAGAIN) {  // write synchronoulsy instead
802 #ifdef SEQAN_DEBUG_OR_TEST_
803             	::std::cerr << "Warning: Falling back to sync. write. :( " << ::std::endl;
804 #endif
805 				return writeAt(me, memPtr, count, fileOfs);
806 			}
807 #ifdef SEQAN_DEBUG
808 			else
809 				::std::cerr << "asyncWriteAt returned " << result << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl;
810 #endif
811         }
812         return result == 0;
813     }
814 
815 	template <typename TSpec>
816     inline bool flush(File<Async<TSpec> > & me) {
817 		#if _POSIX_SYNCHRONIZED_IO > 0
818 			return me.handle == me.handleAsync || fdatasync(me.handle) == 0;
819 		#else
820 			return me.handle == me.handleAsync || fsync(me.handle) == 0;
821 		#endif
822     }
823 
824     //////////////////////////////////////////////////////////////////////
825     // queue specific functions
826 
827 	inline bool waitFor(aiocb &request) {
828 /*		#ifdef SEQAN_VVERBOSE
829 			printRequest(request, "aio_suspend():");
830 		#endif
831 */
832 		if (request.aio_nbytes == 0) return true;
833 		aiocb * cblist = &request;
834         SEQAN_PROTIMESTART(tw);
835 		int result = aio_suspend(&cblist, 1, NULL);
836         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
837         #ifdef SEQAN_DEBUG
838 			if (result) {
839 	 			int eno = aio_error(&request);
840 				if (eno != EINPROGRESS)
841 					::std::cerr << "waitFor: aio_error returned " << ::strerror(eno) << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl;
842 			}
843 		#endif
844 		return result == 0;
845 	}
846 
847 	inline bool waitFor(aiocb &request, long timeout_millis) {
848 /*		#ifdef SEQAN_VVERBOSE
849 			printRequest(request, "aio_suspend_timeout():");
850 		#endif
851 */
852 		if (request.aio_nbytes == 0) return true;
853 
854 		int result;
855 		if (timeout_millis == 0)
856 			result = aio_error(&request);
857 		else {
858 			aiocb * cblist = &request;
859 			timespec ts;
860 			ts.tv_sec = timeout_millis / 1000;
861 			ts.tv_nsec = (timeout_millis % 1000) * 1000;
862 			SEQAN_PROTIMESTART(tw);
863 			result = aio_suspend(&cblist, 1, &ts);
864 			SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
865 		}
866 
867         #ifdef SEQAN_DEBUG
868 			if (result) {
869 	 			int eno = aio_error(&request);
870 				if (eno != EINPROGRESS)
871 					::std::cerr << "waitFor(timeOut=" << timeout_millis << "): aio_error returned " << ::strerror(eno) << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl;
872 			}
873 		#endif
874         return result == 0;
875 	}
876 
877 	template < typename TSize >
878 	inline TSize waitForAny(aiocb const * const contexts[], TSize count) {
879         SEQAN_PROTIMESTART(tw);
880 		bool result = aio_suspend(contexts, count, NULL);
881         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
882         return result == 0;
883 	}
884 
885 	template < typename TSize >
886 	inline TSize waitForAny(aiocb const * const contexts[], TSize count, long timeout_millis) {
887         timespec ts;
888         ts.tv_sec = timeout_millis / 1000;
889         ts.tv_nsec = (timeout_millis % 1000) * 1000;
890         SEQAN_PROTIMESTART(tw);
891 		bool result = aio_suspend(contexts, count, &ts);
892         SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw));
893         return result == 0;
894 	}
895 
896 	template <typename TSpec>
897     inline bool cancel(File<Async<TSpec> > & me, aiocb &request) {
898 /*		#ifdef SEQAN_VVERBOSE
899 			printRequest(request, "aio_cancel():");
900 		#endif
901 */      return aio_cancel(me.handleAsync, &request) == 0;
902     }
903 
904     inline int error(aiocb const & request) {
905         return aio_error(&request);
906     }
907 
908     inline int _returnValue(aiocb & request) {
909         return aio_return(&request);
910     }
911 
	// No-op: both arguments are ignored and nothing is released for a request
	// in this implementation.
	template <typename TSpec>
    inline void release(File<Async<TSpec> > & /*me*/, aiocb const & /*request*/) {}
914 
915 /*
916     typedef void (*sighandler_t)(int);
917     static unsigned AsyncIOHandlerRefCount_ = 0;
918     static struct sigaction AsyncIOOldSig_;
919 
920     inline void AsyncIOHandler_(int sigNo, siginfo_t *info, void *hint) {
921         SEQAN_ASSERT(sigNo == AsyncIOSignal_);
922         // TODO: signal respective event
923         // currently we don't need async IO handlers because
924         // we only wait for single events
925     }
926 
927     static sighandler_t _addAsyncIOHandler() {
928         struct sigaction newSig, oldSig;
929         newSig.sa_sigaction = AsyncIOHandler_;
930         sigemptyset(&newSig.sa_mask);
931         newSig.sa_flags = SA_RESTART + SA_SIGINFO;
932         if (sigaction(AsyncIOSignal_, &newSig, &oldSig) < 0)
933             return SIG_ERR;
934         return oldSig.sa_handler;
935     }
936 */
937 
938 	//////////////////////////////////////////////////////////////////////////////
939 	// page aligned allocate for direct file io
940 
    // Tag type that selects the allocate()/deallocate() overloads working on
    // page aligned memory.
    struct TagAllocateAligned_;	//< allocate page aligned memory for direct i/o access
    typedef Tag<TagAllocateAligned_> const TagAllocateAligned;
943 
944 	template <typename T, typename TValue, typename TSize>
945 	inline void
946 	allocate(T const & /*me*/,
947 			 TValue * & data,
948 			 TSize count,
949 			 TagAllocateAligned const)
950 	{
951 		data = (TValue *) ::valloc(count * sizeof(TValue));
952 #ifdef SEQAN_PROFILE
953         if (data)
954 			SEQAN_PROADD(SEQAN_PROMEMORY, count * sizeof(TValue));
955 		else
956 			::std::cerr << "AlignAllocator: Could not allocate memory of size " << ::std::hex <<
957 				count * sizeof(TValue) << " with page alignment. (ErrNo=" << ::std::dec <<
958 				errno << ")" << ::std::endl;
959 #endif
960 	}
961 
962 	//////////////////////////////////////////////////////////////////////////////
963 	// page aligned deallocate for direct file io
964 
965 	template <typename T, typename TValue, typename TSize>
966 	inline void
967 	deallocate( T const & /*me*/,
968 				TValue * data,
969 				TSize
970 #ifdef SEQAN_PROFILE
971 					count
972 #endif
973 					,
974 				TagAllocateAligned const)
975 	{
976 #ifdef SEQAN_PROFILE
977         if (data && count)	// .. to use count if SEQAN_PROFILE is not defined
978 			SEQAN_PROSUB(SEQAN_PROMEMORY, count * sizeof(TValue));
979 #endif
980 		::free(data);
981 	}
982 
    // Free-function wrapper that forwards to the member function
    // me.resize(new_length).
    // NOTE(review): the unit of new_length (bytes vs. values) is defined by the
    // member function, which is not visible here - confirm before relying on it.
    template < typename TSpec, typename TSize >
    inline void resize(File<Async<TSpec> > &me, TSize new_length) {
		me.resize(new_length);
    }
987 
988 
989 #endif
990 
991     //////////////////////////////////////////////////////////////////////////////
992     // global functions
993 
	// Size metafunction for asynchronous files: Type is the file class's
	// nested SizeType.
	template <typename TSpec>
    struct Size< File<Async<TSpec> > >
    {
        typedef typename File<Async<TSpec> >::SizeType Type;
    };
999 
	// Position metafunction for asynchronous files: Type is the file class's
	// nested FilePtr.
	template <typename TSpec>
    struct Position< File<Async<TSpec> > >
    {
        typedef typename File<Async<TSpec> >::FilePtr Type;
    };
1005 
	// Difference metafunction for asynchronous files: Type is the file class's
	// nested FilePtr (the same type that Position uses).
	template <typename TSpec>
    struct Difference< File<Async<TSpec> > >
    {
        typedef typename File<Async<TSpec> >::FilePtr Type;
    };
1011 
1012 
1013 
1014     template < typename TSpec, typename TValue, typename TSize>
1015 	inline void
1016 	allocate( File<Async<TSpec> > const & me,
1017 			  TValue * & data,
1018 			  TSize count)
1019 	{
1020 		allocate(me, data, count, TagAllocateAligned());
1021 	}
1022 
1023     template <typename TSpec, typename TValue, typename TSize>
1024 	inline void
1025 	deallocate( File<Async<TSpec> > const & me,
1026 				TValue * data,
1027 				TSize count)
1028 	{
1029 		deallocate(me, data, count, TagAllocateAligned());
1030 	}
1031 
1032 
1033 }
1034 
1035 #endif
1036