1 // ========================================================================== 2 // SeqAn - The Library for Sequence Analysis 3 // ========================================================================== 4 // Copyright (c) 2006-2010, Knut Reinert, FU Berlin 5 // All rights reserved. 6 // 7 // Redistribution and use in source and binary forms, with or without 8 // modification, are permitted provided that the following conditions are met: 9 // 10 // * Redistributions of source code must retain the above copyright 11 // notice, this list of conditions and the following disclaimer. 12 // * Redistributions in binary form must reproduce the above copyright 13 // notice, this list of conditions and the following disclaimer in the 14 // documentation and/or other materials provided with the distribution. 15 // * Neither the name of Knut Reinert or the FU Berlin nor the names of 16 // its contributors may be used to endorse or promote products derived 17 // from this software without specific prior written permission. 18 // 19 // THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 20 // AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 21 // IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 22 // ARE DISCLAIMED. IN NO EVENT SHALL KNUT REINERT OR THE FU BERLIN BE LIABLE 23 // FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 24 // DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR 25 // SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER 26 // CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT 27 // LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY 28 // OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH 29 // DAMAGE. 30 // 31 // ========================================================================== 32 // Author: David Weese <david.weese@fu-berlin.de> 33 // ========================================================================== 34 35 //SEQAN_NO_GENERATED_FORWARDS: no forwards are generated for this file 36 37 #ifndef SEQAN_HEADER_FILE_ASYNC_H 38 #define SEQAN_HEADER_FILE_ASYNC_H 39 40 namespace SEQAN_NAMESPACE_MAIN 41 { 42 43 44 template <typename TSpec /* = void */> 45 struct Async; 46 47 48 #ifdef PLATFORM_WINDOWS 49 50 51 static DWORD _transferedBytes; // for reporting 52 53 template <typename TSpec> 54 class File<Async<TSpec> > 55 { 56 public: 57 58 typedef LONGLONG FilePtr; 59 typedef ULONGLONG SizeType; 60 typedef DWORD SizeType_; 61 typedef HANDLE Handle; 62 63 Handle handle, handleAsync; 64 bool noBuffering; 65 File()66 File(): 67 handle(INVALID_HANDLE_VALUE) {} 68 File(void *)69 File(void *): // to be compatible with the FILE*(NULL) constructor 70 handle(INVALID_HANDLE_VALUE) {} 71 72 bool open(char const *fileName, int openMode = DefaultOpenMode<File>::VALUE) { 73 SEQAN_PROADD(SEQAN_PROOPENFILES, 1); 74 noBuffering = (getExtraFlags(openMode | OPEN_ASYNC) & (FILE_FLAG_NO_BUFFERING | FILE_FLAG_OVERLAPPED)) != 0; 75 handleAsync = CreateFileA(fileName, 76 getFileAccess(openMode | OPEN_ASYNC), 77 FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, 78 NULL, 79 getCreationFlags(openMode | OPEN_ASYNC), 80 getExtraFlags(openMode | OPEN_ASYNC), 81 NULL); 82 83 if (handleAsync == INVALID_HANDLE_VALUE) { 84 if (!(openMode & OPEN_QUIET)) 85 ::std::cerr << "Open failed on file " << fileName << ". (ErrNo=" << GetLastError() << ")" << ::std::endl; 86 return false; 87 } 88 #ifdef SEQAN_VERBOSE 89 if (!(openMode & OPEN_QUIET)) 90 ::std::cerr << "file opened asynchronously " << fileName << " handle " << ::std::hex << handleAsync << ::std::dec << ::std::endl; 91 #endif 92 93 if (noBuffering) { 94 handle = CreateFileA(fileName, // in this case io must be sector aligned 95 getFileAccess(openMode), // so we open a second file, for unaligned access 96 FILE_SHARE_DELETE | FILE_SHARE_READ | FILE_SHARE_WRITE, 97 NULL, 98 OPEN_EXISTING, 99 getExtraFlags(openMode & ~OPEN_ASYNC), 100 NULL); 101 if (handle == INVALID_HANDLE_VALUE) { 102 if (!(openMode & OPEN_QUIET)) 103 ::std::cerr << "Open failed on secondary file " << fileName << ". (ErrNo=" << GetLastError() << ")" << ::std::endl; 104 return false; 105 } 106 #ifdef SEQAN_VERBOSE 107 if (!(openMode & OPEN_QUIET)) 108 ::std::cerr << "async file opened " << fileName << " handle " << ::std::hex << handle << ::std::dec << ::std::endl; 109 #endif 110 } else 111 handle = handleAsync; 112 113 return true; 114 } 115 116 bool openTemp(int openMode = DefaultOpenTempMode<File>::VALUE) { 117 char szTempName[MAX_PATH]; 118 #ifdef SEQAN_DEFAULT_TMPDIR 119 static const char szTempPath[MAX_PATH] = SEQAN_DEFAULT_TMPDIR; 120 #else 121 char szTempPath[MAX_PATH]; 122 if (!GetTempPathA(MAX_PATH, szTempPath)) { 123 if (!(openMode & OPEN_QUIET)) 124 ::std::cerr << "Couldn't get a temporary path name. (ErrNo=" << GetLastError() << ")" << ::std::endl; 125 return false; 126 } 127 #endif 128 if (!GetTempFileNameA(szTempPath, "GNDX", 0, szTempName)) { 129 if (!(openMode & OPEN_QUIET)) 130 ::std::cerr << "Couldn't get a temporary file name. (ErrNo=" << GetLastError() << ")" << ::std::endl; 131 return false; 132 } 133 return open(szTempName, openMode | OPEN_TEMPORARY); 134 } 135 close()136 inline bool close() { 137 BOOL result = TRUE; 138 #ifdef SEQAN_VERBOSE 139 ::std::cerr << "files closed handles " << ::std::hex << handleAsync << " and " << handle << ::std::dec << ::std::endl; 140 #endif 141 if (handle != handleAsync) 142 result &= CloseHandle(handleAsync); 143 result &= CloseHandle(handle); 144 handleAsync = INVALID_HANDLE_VALUE; 145 handle = INVALID_HANDLE_VALUE; 146 SEQAN_PROSUB(SEQAN_PROOPENFILES, 1); 147 return result != FALSE; 148 } 149 read(void * memPtr,SizeType_ count)150 inline bool read(void *memPtr, SizeType_ count) const { 151 SEQAN_PROADD(SEQAN_PROIO, (count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 152 SEQAN_PROTIMESTART(tw); 153 bool result = ReadFile(handle, memPtr, count, &_transferedBytes, NULL) != 0; 154 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 155 return result; 156 } 157 write(void const * memPtr,SizeType_ count)158 inline bool write(void const *memPtr, SizeType_ count) const { 159 SEQAN_PROADD(SEQAN_PROIO, (count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 160 SEQAN_PROTIMESTART(tw); 161 bool result = WriteFile(handle, memPtr, count, &_transferedBytes, NULL) != 0; 162 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 163 return result; 164 } 165 166 inline FilePtr seek(FilePtr _pos, DWORD origin = FILE_BEGIN) { 167 // LARGE_INTEGER li = _pos; 168 // return SetFilePointer(handleAsync, li.LowPart, &li.HighPart, MoveMethod); 169 LARGE_INTEGER new_pos, pos; 170 pos.QuadPart = _pos; 171 SetFilePointerEx(handle, pos, &new_pos, origin); 172 // position = new_pos.QuadPart; 173 return new_pos.QuadPart; 174 } 175 tell()176 inline FilePtr tell() { 177 return seek(0, FILE_CURRENT); 178 } 179 size()180 inline FilePtr size() const { 181 LARGE_INTEGER result; 182 DWORD dwError, high; 183 result.LowPart = GetFileSize(handle, &high); 184 result.HighPart = high; 185 if (result.LowPart == INVALID_FILE_SIZE && (dwError = GetLastError()) != NO_ERROR) { 186 ::std::cerr << "Couldn't get file size. (ErrNo=" << dwError << ")" << ::std::endl; 187 return 0; 188 } 189 return result.QuadPart; 190 } 191 setEof()192 inline bool setEof() const { 193 return SetEndOfFile(handle) != FALSE; 194 } 195 error()196 inline static DWORD error() { 197 return GetLastError(); 198 } 199 200 operator bool () const { 201 return (handle != INVALID_HANDLE_VALUE) && (handleAsync != INVALID_HANDLE_VALUE); 202 } 203 204 protected: 205 getFileAccess(int openMode)206 DWORD getFileAccess(int openMode) { 207 switch (openMode & OPEN_MASK) { 208 case OPEN_RDONLY: 209 return GENERIC_READ; 210 case OPEN_WRONLY: 211 return GENERIC_WRITE; 212 case OPEN_RDWR: 213 return GENERIC_READ | GENERIC_WRITE; 214 default: 215 return 0; 216 } 217 } 218 getCreationFlags(int openMode)219 DWORD getCreationFlags(int openMode) { 220 if (openMode & OPEN_CREATE) 221 if (openMode & OPEN_APPEND) 222 return OPEN_ALWAYS; 223 else 224 return CREATE_ALWAYS; 225 else 226 return OPEN_EXISTING; 227 } 228 getExtraFlags(int openMode)229 DWORD getExtraFlags(int openMode) { 230 DWORD extra = FILE_ATTRIBUTE_NORMAL | FILE_FLAG_RANDOM_ACCESS;// | FILE_FLAG_WRITE_THROUGH; 231 if (openMode & OPEN_ASYNC) { 232 extra |= FILE_FLAG_OVERLAPPED; 233 #ifdef SEQAN_DIRECTIO 234 extra |= FILE_FLAG_NO_BUFFERING; 235 #endif 236 } 237 if (openMode & OPEN_TEMPORARY) extra |= FILE_FLAG_DELETE_ON_CLOSE; 238 return extra; 239 } 240 241 }; 242 243 244 ////////////////////////////////////////////////////////////////////////////// 245 // (SeqAn adaption) 246 ////////////////////////////////////////////////////////////////////////////// 247 248 struct aiocb_win32 { 249 OVERLAPPED overlapped; 250 Event xmitDone; 251 }; 252 253 template <typename TSpec> 254 struct AsyncRequest<File<Async<TSpec> > > 255 { 256 typedef aiocb_win32 Type; 257 }; 258 /* 259 template <typename TSpec> 260 struct aEvent<File<Async<TSpec> > > 261 { 262 typedef Event Type; 263 }; 264 265 266 template <typename TSpec> 267 struct aQueue<File<Async<TSpec> > > 268 { 269 typedef IOQueue Type; 270 }; 271 272 template <typename TSpec> 273 struct aHint<File<Async<TSpec> > > 274 { 275 typedef typename aQueue<File<Async<TSpec> > >::Type::aHint Type; 276 }; 277 278 template <typename TSpec> 279 struct aCallback<File<Async<TSpec> > > 280 { 281 typedef typename aQueue<File<Async<TSpec> > >::Type::aCallback Type; 282 };*/ 283 284 285 template <typename TSpec> 286 inline typename Size<File<Async<TSpec> > >::Type size(File<Async<TSpec> > &me) { 287 return me.size(); 288 } 289 290 template <typename TSpec> 291 inline bool setEof(File<Async<TSpec> > &me) { 292 return me.setEof(); 293 } 294 295 template <typename TSpec> 296 inline unsigned sectorSize(File<Async<TSpec> > const &) { 297 DWORD SpC, nofC, tnoC, aligning; 298 if (GetDiskFreeSpace(NULL, &SpC, &aligning, &nofC, &tnoC) == 0) { 299 ::std::cerr << "Error " << GetLastError() << " while querying cluster size" << ::std::endl; 300 return 4096; 301 } 302 return aligning; 303 } 304 305 306 template < typename TSpec, typename TValue, typename TSize, typename TPos > 307 inline bool asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 308 aiocb_win32 &request) 309 { 310 SEQAN_PROTIMESTART(tw); 311 LARGE_INTEGER ofs; 312 ofs.QuadPart = fileOfs; 313 ofs.QuadPart *= sizeof(TValue); 314 request.overlapped.Offset = ofs.LowPart; 315 request.overlapped.OffsetHigh = ofs.HighPart; 316 if (!request.xmitDone) open(request.xmitDone); 317 request.overlapped.hEvent = request.xmitDone.hEvent; 318 if (ReadFile( 319 me.handleAsync, 320 memPtr, 321 count * sizeof(TValue), 322 &ofs.LowPart, 323 &request.overlapped) || (me.error() == ERROR_IO_PENDING)) 324 { 325 SEQAN_PROADD(SEQAN_PROIO, (sizeof(TValue) * count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 326 SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw)); 327 return true; 328 } 329 if (me.error() == ERROR_NO_SYSTEM_RESOURCES) { // read synchronoulsy instead 330 #ifdef SEQAN_DEBUG_OR_TEST_ 331 ::std::cerr << "Warning: Falling back to sync. read. :( " << ::std::endl; 332 #endif 333 signal(request.xmitDone); 334 return readAt(me, memPtr, count, fileOfs); 335 } 336 return false; 337 } 338 339 template < typename TSpec, typename TValue, typename TSize, typename TPos > 340 inline bool asyncWriteAt(File<Async<TSpec> > & me, TValue const *memPtr, TSize const count, TPos const fileOfs, 341 aiocb_win32 &request) 342 { 343 SEQAN_PROTIMESTART(tw); 344 LARGE_INTEGER ofs; 345 ofs.QuadPart = fileOfs; 346 ofs.QuadPart *= sizeof(TValue); 347 request.overlapped.Offset = ofs.LowPart; 348 request.overlapped.OffsetHigh = ofs.HighPart; 349 if (!request.xmitDone) open(request.xmitDone); 350 request.overlapped.hEvent = request.xmitDone.hEvent; 351 if (WriteFile( 352 me.handleAsync, 353 memPtr, 354 count * sizeof(TValue), 355 &ofs.LowPart, 356 &request.overlapped) || (me.error() == ERROR_IO_PENDING)) 357 { 358 SEQAN_PROADD(SEQAN_PROIO, (sizeof(TValue) * count + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 359 SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw)); 360 return true; 361 } 362 if (me.error() == ERROR_NO_SYSTEM_RESOURCES) { // write synchronoulsy instead 363 #ifdef SEQAN_DEBUG_OR_TEST_ 364 ::std::cerr << "Warning: Falling back to sync. write. :( " << ::std::endl; 365 #endif 366 signal(request.xmitDone); 367 return writeAt(me, memPtr, count, fileOfs); 368 } 369 return false; 370 } 371 372 ////////////////////////////////////////////////////////////////////// 373 // queue specific functions 374 375 inline bool waitFor(aiocb_win32 &request) { 376 SEQAN_PROTIMESTART(tw); 377 if (!waitFor(request.xmitDone, 60000)) 378 ::std::cerr << "waitFor timeout" << ::std::endl; 379 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 380 return true; 381 } 382 383 template < typename TTime > 384 inline bool waitFor(aiocb_win32 &request, TTime timeout_millis) { 385 SEQAN_PROTIMESTART(tw); 386 bool result = waitFor(request.xmitDone, timeout_millis); 387 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 388 return result; 389 } 390 391 template < typename TSize > 392 inline TSize waitForAny(aiocb_win32 const * const contexts[], TSize count, DWORD timeout_millis = Event::Infinite) { 393 Event::Handle *handles = new Event::Handle[count]; 394 for(TSize i = 0; i < count; ++i) 395 handles[i] = contexts[i]->xmitDone.hEvent; 396 397 SEQAN_PROTIMESTART(tw); 398 DWORD result = WaitForMultipleObjects(count, handles, false, timeout_millis); 399 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 400 delete[] handles; 401 if (/*result >= WAIT_OBJECT_0 && */result < WAIT_OBJECT_0 + count) 402 return result - WAIT_OBJECT_0; 403 return count; 404 } 405 406 template <typename TSpec> 407 inline bool cancel(File<Async<TSpec> > & me, aiocb_win32 const &request) { 408 return CancelIo(me.handleAsync); 409 } 410 411 template <typename TSpec> 412 inline bool flush(File<Async<TSpec> > & me) { 413 if (me.handle != me.handleAsync) // in case of equality no direct access was done -> no flush needed 414 return FlushFileBuffers(me.handle) != 0; 415 else 416 return true; 417 } 418 419 template < typename TSpec, typename AsyncRequest > 420 inline void release(File<Async<TSpec> > & me, AsyncRequest & request) { } 421 422 423 /* 424 ////////////////////////////////////////////////////////////////////// 425 // callback based read/write 426 427 template < typename TSpec, typename TValue, typename TSize, 428 typename aCallback, typename aHint > 429 inline typename AsyncRequest<File<Async<TSpec> > >::Type 430 asyncRead(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, 431 aCallback* cb, aHint* hint) 432 { 433 DWORD bsize = (DWORD)(count * sizeof(TValue)); 434 typename AsyncRequest<File<Async<TSpec> > >::Type request = 435 me.queue->asyncReadAt( 436 me.handleAsync, 437 me.position, 438 memPtr, 439 bsize, 440 cb, 441 hint); 442 me.position += bsize; 443 return request; 444 } 445 446 template < typename TSpec, typename TValue, typename TSize, 447 typename aCallback, typename aHint > 448 inline typename AsyncRequest<File<Async<TSpec> > >::Type 449 asyncWrite(File<Async<TSpec> > & me, TValue const *memPtr, TSize const count, 450 aCallback* cb, aHint* hint) 451 { 452 DWORD bsize = (DWORD)(count * sizeof(TValue)); 453 typename AsyncRequest<File<Async<TSpec> > >::Type request = 454 me.queue->asyncWriteAt( 455 memPtr, 456 me.handleAsync, 457 me.position, 458 bsize, 459 cb, 460 hint); 461 me.position += bsize; 462 return request; 463 } 464 465 template < typename TSpec, typename TValue, typename TSize, typename TPos, 466 typename aCallback, typename aHint > 467 inline typename AsyncRequest<File<Async<TSpec> > >::Type 468 asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 469 aCallback* cb, aHint* hint) 470 { 471 DWORD bsize = (DWORD)(count * sizeof(TValue)); 472 return me.queue->asyncReadAt( 473 me.handleAsync, 474 fileOfs * sizeof(TValue), 475 memPtr, 476 bsize, 477 cb, 478 hint); 479 } 480 481 template < typename TSpec, typename TValue, typename TSize, typename TPos, 482 typename aCallback, typename aHint > 483 inline typename AsyncRequest<File<Async<TSpec> > >::Type 484 asyncWriteAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 485 aCallback* cb, aHint* hint) 486 { 487 DWORD bsize = (DWORD)(count * sizeof(TValue)); 488 return me.queue->asyncWriteAt( 489 memPtr, 490 me.handleAsync, 491 fileOfs * sizeof(TValue), 492 bsize, 493 cb, 494 hint); 495 } 496 497 498 ////////////////////////////////////////////////////////////////////// 499 // event based read/write 500 501 template < typename TSpec, typename TValue, typename TSize, 502 typename aEvent > 503 inline typename AsyncRequest<File<Async<TSpec> > >::Type 504 asyncRead(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, 505 aEvent &event) 506 { 507 DWORD bsize = (DWORD)(count * sizeof(TValue)); 508 typename AsyncRequest<File<Async<TSpec> > >::Type request = 509 me.queue->asyncReadAt( 510 me.handleAsync, 511 me.position, 512 memPtr, 513 bsize, 514 event); 515 me.position += bsize; 516 return request; 517 } 518 519 template < typename TSpec, typename TValue, typename TSize, 520 typename aEvent > 521 inline typename AsyncRequest<File<Async<TSpec> > >::Type 522 asyncWrite(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, 523 aEvent &event) 524 { 525 DWORD bsize = (DWORD)(count * sizeof(TValue)); 526 typename AsyncRequest<File<Async<TSpec> > >::Type request = 527 me.queue->asyncWriteAt( 528 memPtr, 529 me.handleAsync, 530 me.position, 531 bsize, 532 event); 533 me.position += bsize; 534 return request; 535 } 536 537 template < typename TSpec, typename TValue, typename TSize, typename TPos, 538 typename aEvent > 539 inline typename AsyncRequest<File<Async<TSpec> > >::Type 540 asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 541 aEvent &event) 542 { 543 DWORD bsize = (DWORD)(count * sizeof(TValue)); 544 return me.queue->asyncReadAt( 545 me.handleAsync, 546 fileOfs * sizeof(TValue), 547 memPtr, 548 bsize, 549 event); 550 } 551 552 template < typename TSpec, TValue, typename TSize, typename TPos, 553 typename aEvent > 554 inline typename AsyncRequest<File<Async<TSpec> > >::Type 555 asyncWriteAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 556 aEvent &event) 557 { 558 DWORD bsize = (DWORD)(count * sizeof(TValue)); 559 return me.queue->asyncWriteAt( 560 memPtr, 561 me.handleAsync, 562 fileOfs * sizeof(TValue), 563 bsize, 564 event); 565 } 566 567 568 ////////////////////////////////////////////////////////////////////// 569 // queue specific functions 570 571 template <typename TSpec> 572 inline void flush(File<Async<TSpec> > & me) { 573 me.queue->flush(); 574 } 575 576 template < typename TSpec, typename AsyncRequest > 577 inline void release(File<Async<TSpec> > & me, AsyncRequest & request) { 578 me.queue->release(request); 579 } 580 */ 581 582 ////////////////////////////////////////////////////////////////////////////// 583 // page aligned allocate for direct file io 584 585 struct TagAllocateAligned_; //< allocate page aligned memory for direct i/o access 586 typedef Tag<TagAllocateAligned_> const TagAllocateAligned; 587 588 template <typename T, typename TValue, typename TSize> 589 inline void 590 allocate(T const &, 591 TValue * & data, 592 TSize count, 593 TagAllocateAligned const) 594 { 595 data = (TValue *) VirtualAlloc(NULL, count * sizeof(TValue), MEM_COMMIT, PAGE_READWRITE); 596 if (data) 597 SEQAN_PROADD(SEQAN_PROMEMORY, count * sizeof(TValue)); 598 else 599 ::std::cerr << "AlignAllocator: Could not allocate memory of size " << ::std::hex << count * sizeof(TValue) << ::std::dec << ". (ErrNo=" << GetLastError() << ")" << ::std::endl; 600 } 601 602 ////////////////////////////////////////////////////////////////////////////// 603 // page aligned deallocate for direct file io 604 605 template <typename T, typename TValue, typename TSize> 606 inline void 607 deallocate( T const &, 608 TValue * data, 609 TSize count, 610 TagAllocateAligned const) 611 { 612 if (data) { 613 VirtualFree(data, 0, MEM_RELEASE); 614 if (count) // .. to use count if SEQAN_PROFILE is not defined 615 SEQAN_PROSUB(SEQAN_PROMEMORY, count * sizeof(TValue)); 616 } 617 } 618 619 #else 620 621 622 template <typename TSpec> 623 class File<Async<TSpec> > : public File<Sync<TSpec> > 624 { 625 public: 626 627 typedef File<Sync<TSpec> > Base; 628 629 typedef off_t FilePtr; 630 typedef off_t SizeType; // type of file size 631 typedef size_t SizeType_; // type of transfer size (for read or write) 632 typedef int Handle; 633 634 Handle handleAsync; 635 using Base::handle; 636 637 File(void * = NULL): // to be compatible with the FILE*(NULL) constructor 638 handleAsync(-1) {} 639 640 virtual ~File() {} 641 642 bool open(char const *fileName, int openMode = DefaultOpenMode<File>::VALUE) { 643 handle = ::open(fileName, Base::_getOFlag(openMode & ~OPEN_ASYNC), S_IREAD | S_IWRITE); 644 if (handle == -1) 645 { 646 handleAsync = handle; 647 if (!(openMode & OPEN_QUIET)) 648 ::std::cerr << "Open failed on file " << fileName << ". (" << ::strerror(errno) << ")" << ::std::endl; 649 return false; 650 } 651 652 if (Base::_getOFlag(openMode | OPEN_ASYNC) & O_DIRECT) 653 { 654 handleAsync = ::open(fileName, Base::_getOFlag(openMode | (OPEN_ASYNC & ~OPEN_CREATE)), S_IREAD | S_IWRITE); 655 if (handleAsync == -1 || errno == EINVAL) { // fall back to cached access 656 #ifdef SEQAN_DEBUG_OR_TEST_ 657 if (!(openMode & OPEN_QUIET)) 658 ::std::cerr << "Warning: Direct access openening failed. (" << ::strerror(errno) << ")" << ::std::endl; 659 #endif 660 handleAsync = handle; 661 } 662 #ifdef SEQAN_DEBUG_OR_TEST_ 663 else 664 if (!(openMode & OPEN_QUIET)) 665 ::std::cerr << "Direct access successfully initiated" << ::std::endl; 666 #endif 667 } else 668 handleAsync = handle; 669 670 if (sizeof(FilePtr) < 8 && !(openMode & OPEN_QUIET)) 671 // To remove this warning, you have to options: 672 // 1. include the following line before including anything in your application 673 // #define _FILE_OFFSET_BITS 64 674 // 2. include <seqan/platform.h> or <seqan/sequence.h> before any other include 675 ::std::cerr << "WARNING: FilePtr is not 64bit wide" << ::std::endl; 676 677 678 SEQAN_PROADD(SEQAN_PROOPENFILES, 1); 679 return true; 680 } 681 682 bool close() { 683 bool result = true; 684 if (handleAsync != handle && handleAsync != -1) 685 result &= (::close(handleAsync) == 0); 686 result &= (::close(handle) == 0); 687 handleAsync = -1; 688 handle = -1; 689 SEQAN_PROSUB(SEQAN_PROOPENFILES, 1); 690 return result; 691 } 692 }; 693 694 695 ////////////////////////////////////////////////////////////////////////////// 696 // (SeqAn adaption) 697 ////////////////////////////////////////////////////////////////////////////// 698 /* 699 template <typename TSpec> 700 struct aQueue<File<Async<TSpec> > > 701 { 702 typedef void* Type; 703 }; 704 */ 705 706 template <typename TSpec> 707 struct AsyncRequest<File<Async<TSpec> > > 708 { 709 typedef aiocb Type; 710 }; 711 /* 712 template <typename TSpec> 713 struct aEvent<File<Async<TSpec> > > 714 { 715 typedef aiocb Type; 716 }; 717 */ 718 719 ////////////////////////////////////////////////////////////////////// 720 // event based read/write 721 722 // enum { AsyncIOSignal_ = SIGIO }; 723 724 inline void printRequest(aiocb &request, const char *_hint = NULL) { 725 ::std::cerr << ::std::hex; 726 if (_hint) 727 ::std::cerr << _hint << ::std::endl; 728 ::std::cerr << "fildes: " << request.aio_fildes << ::std::endl; 729 ::std::cerr << "buffer: " << (unsigned long)request.aio_buf << ::std::endl; 730 ::std::cerr << "offset: " << request.aio_offset<< ::std::endl; 731 ::std::cerr << "nbytes: " << request.aio_nbytes << ::std::endl; 732 ::std::cerr << "event: " << request.aio_sigevent.sigev_notify << ::std::endl; 733 ::std::cerr << "Raddr: " << &request << ::std::endl; 734 ::std::cerr << ::std::dec; 735 } 736 737 template < typename TSpec, typename TValue, typename TSize, typename TPos > 738 bool asyncReadAt(File<Async<TSpec> > & me, TValue *memPtr, TSize const count, TPos const fileOfs, 739 aiocb &request) 740 { 741 SEQAN_PROTIMESTART(tw); 742 memset(&request, 0, sizeof(aiocb)); 743 request.aio_fildes = me.handleAsync; 744 request.aio_buf = memPtr; 745 request.aio_offset = fileOfs; 746 request.aio_offset *= sizeof(TValue); 747 request.aio_nbytes = count * sizeof(TValue); 748 request.aio_sigevent.sigev_notify = SIGEV_NONE; 749 /* request.aio_sigevent.sigev_notify = SIGEV_SIGNAL; 750 request.aio_sigevent.sigev_signo = AsyncIOSignal_; 751 request.aio_sigevent.sigev_value.sival_ptr = &request; 752 #ifdef SEQAN_VVERBOSE 753 printRequest(request, "aio_read():"); 754 #endif 755 */ if (request.aio_nbytes == 0) return true; 756 SEQAN_PROADD(SEQAN_PROIO, (request.aio_nbytes + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 757 int result = aio_read(&request); 758 SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw)); 759 if (result) 760 { 761 request.aio_nbytes = 0; 762 if (errno == EAGAIN) { // read synchronoulsy instead 763 #ifdef SEQAN_DEBUG_OR_TEST_ 764 ::std::cerr << "Warning: Falling back to sync. read. :( " << ::std::endl; 765 #endif 766 return readAt(me, memPtr, count, fileOfs); 767 } 768 #ifdef SEQAN_DEBUG 769 else 770 ::std::cerr << "asyncReadAt returned " << result << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl; 771 #endif 772 } 773 return result == 0; 774 } 775 776 template < typename TSpec, typename TValue, typename TSize, typename TPos > 777 bool asyncWriteAt(File<Async<TSpec> > & me, const TValue *memPtr, TSize const count, TPos const fileOfs, 778 aiocb &request) 779 { 780 SEQAN_PROTIMESTART(tw); 781 memset(&request, 0, sizeof(aiocb)); 782 request.aio_fildes = me.handleAsync; 783 request.aio_buf = const_cast<TValue*>(memPtr); 784 request.aio_offset = fileOfs; 785 request.aio_offset *= sizeof(TValue); 786 request.aio_nbytes = count * sizeof(TValue); 787 request.aio_sigevent.sigev_notify = SIGEV_NONE; 788 /* request.aio_sigevent.sigev_notify = SIGEV_SIGNAL; 789 request.aio_sigevent.sigev_signo = AsyncIOSignal_; 790 request.aio_sigevent.sigev_value.sival_ptr = &request; 791 #ifdef SEQAN_VVERBOSE 792 printRequest(request, "aio_write():"); 793 #endif 794 */ if (request.aio_nbytes == 0) return true; 795 SEQAN_PROADD(SEQAN_PROIO, (request.aio_nbytes + SEQAN_PROPAGESIZE - 1) / SEQAN_PROPAGESIZE); 796 int result = aio_write(&request); 797 SEQAN_PROADD(SEQAN_PROIWAIT, SEQAN_PROTIMEDIFF(tw)); 798 if (result) 799 { 800 request.aio_nbytes = 0; 801 if (errno == EAGAIN) { // write synchronoulsy instead 802 #ifdef SEQAN_DEBUG_OR_TEST_ 803 ::std::cerr << "Warning: Falling back to sync. write. :( " << ::std::endl; 804 #endif 805 return writeAt(me, memPtr, count, fileOfs); 806 } 807 #ifdef SEQAN_DEBUG 808 else 809 ::std::cerr << "asyncWriteAt returned " << result << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl; 810 #endif 811 } 812 return result == 0; 813 } 814 815 template <typename TSpec> 816 inline bool flush(File<Async<TSpec> > & me) { 817 #if _POSIX_SYNCHRONIZED_IO > 0 818 return me.handle == me.handleAsync || fdatasync(me.handle) == 0; 819 #else 820 return me.handle == me.handleAsync || fsync(me.handle) == 0; 821 #endif 822 } 823 824 ////////////////////////////////////////////////////////////////////// 825 // queue specific functions 826 827 inline bool waitFor(aiocb &request) { 828 /* #ifdef SEQAN_VVERBOSE 829 printRequest(request, "aio_suspend():"); 830 #endif 831 */ 832 if (request.aio_nbytes == 0) return true; 833 aiocb * cblist = &request; 834 SEQAN_PROTIMESTART(tw); 835 int result = aio_suspend(&cblist, 1, NULL); 836 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 837 #ifdef SEQAN_DEBUG 838 if (result) { 839 int eno = aio_error(&request); 840 if (eno != EINPROGRESS) 841 ::std::cerr << "waitFor: aio_error returned " << ::strerror(eno) << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl; 842 } 843 #endif 844 return result == 0; 845 } 846 847 inline bool waitFor(aiocb &request, long timeout_millis) { 848 /* #ifdef SEQAN_VVERBOSE 849 printRequest(request, "aio_suspend_timeout():"); 850 #endif 851 */ 852 if (request.aio_nbytes == 0) return true; 853 854 int result; 855 if (timeout_millis == 0) 856 result = aio_error(&request); 857 else { 858 aiocb * cblist = &request; 859 timespec ts; 860 ts.tv_sec = timeout_millis / 1000; 861 ts.tv_nsec = (timeout_millis % 1000) * 1000; 862 SEQAN_PROTIMESTART(tw); 863 result = aio_suspend(&cblist, 1, &ts); 864 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 865 } 866 867 #ifdef SEQAN_DEBUG 868 if (result) { 869 int eno = aio_error(&request); 870 if (eno != EINPROGRESS) 871 ::std::cerr << "waitFor(timeOut=" << timeout_millis << "): aio_error returned " << ::strerror(eno) << " and errno=" << errno << " " << ::strerror(errno) << ::std::endl; 872 } 873 #endif 874 return result == 0; 875 } 876 877 template < typename TSize > 878 inline TSize waitForAny(aiocb const * const contexts[], TSize count) { 879 SEQAN_PROTIMESTART(tw); 880 bool result = aio_suspend(contexts, count, NULL); 881 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 882 return result == 0; 883 } 884 885 template < typename TSize > 886 inline TSize waitForAny(aiocb const * const contexts[], TSize count, long timeout_millis) { 887 timespec ts; 888 ts.tv_sec = timeout_millis / 1000; 889 ts.tv_nsec = (timeout_millis % 1000) * 1000; 890 SEQAN_PROTIMESTART(tw); 891 bool result = aio_suspend(contexts, count, &ts); 892 SEQAN_PROADD(SEQAN_PROCWAIT, SEQAN_PROTIMEDIFF(tw)); 893 return result == 0; 894 } 895 896 template <typename TSpec> 897 inline bool cancel(File<Async<TSpec> > & me, aiocb &request) { 898 /* #ifdef SEQAN_VVERBOSE 899 printRequest(request, "aio_cancel():"); 900 #endif 901 */ return aio_cancel(me.handleAsync, &request) == 0; 902 } 903 904 inline int error(aiocb const & request) { 905 return aio_error(&request); 906 } 907 908 inline int _returnValue(aiocb & request) { 909 return aio_return(&request); 910 } 911 912 template <typename TSpec> 913 inline void release(File<Async<TSpec> > & /*me*/, aiocb const & /*request*/) {} 914 915 /* 916 typedef void (*sighandler_t)(int); 917 static unsigned AsyncIOHandlerRefCount_ = 0; 918 static struct sigaction AsyncIOOldSig_; 919 920 inline void AsyncIOHandler_(int sigNo, siginfo_t *info, void *hint) { 921 SEQAN_ASSERT(sigNo == AsyncIOSignal_); 922 // TODO: signal respective event 923 // currently we don't need async IO handlers because 924 // we only wait for single events 925 } 926 927 static sighandler_t _addAsyncIOHandler() { 928 struct sigaction newSig, oldSig; 929 newSig.sa_sigaction = AsyncIOHandler_; 930 sigemptyset(&newSig.sa_mask); 931 newSig.sa_flags = SA_RESTART + SA_SIGINFO; 932 if (sigaction(AsyncIOSignal_, &newSig, &oldSig) < 0) 933 return SIG_ERR; 934 return oldSig.sa_handler; 935 } 936 */ 937 938 ////////////////////////////////////////////////////////////////////////////// 939 // page aligned allocate for direct file io 940 941 struct TagAllocateAligned_; //< allocate page aligned memory for direct i/o access 942 typedef Tag<TagAllocateAligned_> const TagAllocateAligned; 943 944 template <typename T, typename TValue, typename TSize> 945 inline void 946 allocate(T const & /*me*/, 947 TValue * & data, 948 TSize count, 949 TagAllocateAligned const) 950 { 951 data = (TValue *) ::valloc(count * sizeof(TValue)); 952 #ifdef SEQAN_PROFILE 953 if (data) 954 SEQAN_PROADD(SEQAN_PROMEMORY, count * sizeof(TValue)); 955 else 956 ::std::cerr << "AlignAllocator: Could not allocate memory of size " << ::std::hex << 957 count * sizeof(TValue) << " with page alignment. (ErrNo=" << ::std::dec << 958 errno << ")" << ::std::endl; 959 #endif 960 } 961 962 ////////////////////////////////////////////////////////////////////////////// 963 // page aligned deallocate for direct file io 964 965 template <typename T, typename TValue, typename TSize> 966 inline void 967 deallocate( T const & /*me*/, 968 TValue * data, 969 TSize 970 #ifdef SEQAN_PROFILE 971 count 972 #endif 973 , 974 TagAllocateAligned const) 975 { 976 #ifdef SEQAN_PROFILE 977 if (data && count) // .. to use count if SEQAN_PROFILE is not defined 978 SEQAN_PROSUB(SEQAN_PROMEMORY, count * sizeof(TValue)); 979 #endif 980 ::free(data); 981 } 982 983 template < typename TSpec, typename TSize > 984 inline void resize(File<Async<TSpec> > &me, TSize new_length) { 985 me.resize(new_length); 986 } 987 988 989 #endif 990 991 ////////////////////////////////////////////////////////////////////////////// 992 // global functions 993 994 template <typename TSpec> 995 struct Size< File<Async<TSpec> > > 996 { 997 typedef typename File<Async<TSpec> >::SizeType Type; 998 }; 999 1000 template <typename TSpec> 1001 struct Position< File<Async<TSpec> > > 1002 { 1003 typedef typename File<Async<TSpec> >::FilePtr Type; 1004 }; 1005 1006 template <typename TSpec> 1007 struct Difference< File<Async<TSpec> > > 1008 { 1009 typedef typename File<Async<TSpec> >::FilePtr Type; 1010 }; 1011 1012 1013 1014 template < typename TSpec, typename TValue, typename TSize> 1015 inline void 1016 allocate( File<Async<TSpec> > const & me, 1017 TValue * & data, 1018 TSize count) 1019 { 1020 allocate(me, data, count, TagAllocateAligned()); 1021 } 1022 1023 template <typename TSpec, typename TValue, typename TSize> 1024 inline void 1025 deallocate( File<Async<TSpec> > const & me, 1026 TValue * data, 1027 TSize count) 1028 { 1029 deallocate(me, data, count, TagAllocateAligned()); 1030 } 1031 1032 1033 } 1034 1035 #endif 1036