1 // zipstream Library License: 2 // -------------------------- 3 // 4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux. 5 // 6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. 7 // 8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 9 // 10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 11 // 12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 13 // 14 // 3. This notice may not be removed or altered from any source distribution 15 // 16 // 17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream) 18 // Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format) 19 20 #ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 21 #define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 22 23 #ifndef SEQAN_BGZF_NUM_THREADS 24 #define SEQAN_BGZF_NUM_THREADS 16 25 #endif 26 27 namespace seqan { 28 29 const unsigned BGZF_MAX_BLOCK_SIZE = 64 * 1024; 30 const unsigned BGZF_BLOCK_HEADER_LENGTH = 18; 31 const unsigned BGZF_BLOCK_FOOTER_LENGTH = 8; 32 const unsigned ZLIB_BLOCK_OVERHEAD = 5; // 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html) 33 34 // Reduce the maximal input size, such that the compressed data 35 // always fits in one block even for level Z_NO_COMPRESSION. 36 const unsigned BGZF_BLOCK_SIZE = BGZF_MAX_BLOCK_SIZE - BGZF_BLOCK_HEADER_LENGTH - BGZF_BLOCK_FOOTER_LENGTH - ZLIB_BLOCK_OVERHEAD; 37 38 // =========================================================================== 39 // Classes 40 // =========================================================================== 41 42 // -------------------------------------------------------------------------- 43 // Class basic_bgzf_streambuf 44 // -------------------------------------------------------------------------- 45 46 template< 47 typename Elem, 48 typename Tr = std::char_traits<Elem>, 49 typename ElemA = std::allocator<Elem>, 50 typename ByteT = char, 51 typename ByteAT = std::allocator<ByteT> 52 > 53 class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr> 54 { 55 public: 56 typedef std::basic_ostream<Elem, Tr>& ostream_reference; 57 typedef ElemA char_allocator_type; 58 typedef ByteT byte_type; 59 typedef ByteAT byte_allocator_type; 60 typedef byte_type* byte_buffer_type; 61 typedef typename Tr::char_type char_type; 62 typedef typename Tr::int_type int_type; 63 64 typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue; 65 66 struct OutputBuffer 67 { 68 char buffer[BGZF_MAX_BLOCK_SIZE]; 69 size_t size; 70 }; 71 72 struct BufferWriter 73 { 74 ostream_reference ostream; 75 BufferWriterBufferWriter76 BufferWriter(ostream_reference ostream) : 77 ostream(ostream) 78 {} 79 operatorBufferWriter80 bool operator() (OutputBuffer const & outputBuffer) 81 { 82 ostream.write(outputBuffer.buffer, outputBuffer.size); 83 return ostream.good(); 84 } 85 }; 86 87 struct CompressionJob 88 { 89 typedef std::vector<char_type, char_allocator_type> TBuffer; 90 91 TBuffer buffer; 92 size_t size; 93 OutputBuffer *outputBuffer; 94 CompressionJobCompressionJob95 CompressionJob() : 96 buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0), 97 size(0), 98 outputBuffer(NULL) 99 {} 100 }; 101 102 // string of recycable jobs 103 size_t numThreads; 104 size_t numJobs; 105 String<CompressionJob> jobs; 106 TJobQueue jobQueue; 107 TJobQueue idleQueue; 108 Serializer< 109 OutputBuffer, 110 BufferWriter> serializer; 111 112 size_t currentJobId; 113 bool currentJobAvail; 114 115 116 struct CompressionThread 117 { 118 basic_bgzf_streambuf *streamBuf; 119 CompressionContext<BgzfFile> compressionCtx; 120 size_t threadNum; 121 operatorCompressionThread122 void operator()() 123 { 124 ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue); 125 ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue); 126 127 // wait for a new job to become available 128 bool success = true; 129 while (success) 130 { 131 size_t jobId = -1; 132 if (!popFront(jobId, streamBuf->jobQueue)) 133 return; 134 135 CompressionJob &job = streamBuf->jobs[jobId]; 136 137 // compress block with zlib 138 job.outputBuffer->size = _compressBlock( 139 job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer), 140 &job.buffer[0], job.size, compressionCtx); 141 142 success = releaseValue(streamBuf->serializer, job.outputBuffer); 143 appendValue(streamBuf->idleQueue, jobId); 144 } 145 } 146 }; 147 148 // array of worker threads 149 using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)})); 150 std::vector<TFuture> threads; 151 152 basic_bgzf_streambuf(ostream_reference ostream_, 153 size_t numThreads = SEQAN_BGZF_NUM_THREADS, 154 size_t jobsPerThread = 8) : numThreads(numThreads)155 numThreads(numThreads), 156 numJobs(numThreads * jobsPerThread), 157 jobQueue(numJobs), 158 idleQueue(numJobs), 159 serializer(ostream_, numThreads * jobsPerThread) 160 { 161 resize(jobs, numJobs, Exact()); 162 currentJobId = 0; 163 164 lockWriting(jobQueue); 165 lockReading(idleQueue); 166 setReaderWriterCount(jobQueue, numThreads, 1); 167 setReaderWriterCount(idleQueue, 1, numThreads); 168 169 for (unsigned i = 0; i < numJobs; ++i) 170 { 171 bool success = appendValue(idleQueue, i); 172 ignoreUnusedVariableWarning(success); 173 SEQAN_ASSERT(success); 174 } 175 176 for (size_t i = 0; i < numThreads; ++i) 177 { 178 threads.push_back(std::async(std::launch::async, CompressionThread{this, CompressionContext<BgzfFile>{}, i})); 179 } 180 181 currentJobAvail = popFront(currentJobId, idleQueue); 182 SEQAN_ASSERT(currentJobAvail); 183 184 CompressionJob &job = jobs[currentJobId]; 185 job.outputBuffer = aquireValue(serializer); 186 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 187 } 188 ~basic_bgzf_streambuf()189 ~basic_bgzf_streambuf() 190 { 191 // the buffer is now (after addFooter()) and flush will append the empty EOF marker 192 flush(true); 193 194 unlockWriting(jobQueue); 195 unlockReading(idleQueue); 196 } 197 compressBuffer(size_t size)198 bool compressBuffer(size_t size) 199 { 200 // submit current job 201 if (currentJobAvail) 202 { 203 jobs[currentJobId].size = size; 204 appendValue(jobQueue, currentJobId); 205 } 206 207 // recycle existing idle job 208 if (!(currentJobAvail = popFront(currentJobId, idleQueue))) 209 return false; 210 211 jobs[currentJobId].outputBuffer = aquireValue(serializer); 212 213 return serializer; 214 } 215 overflow(int_type c)216 int_type overflow(int_type c) 217 { 218 int w = static_cast<int>(this->pptr() - this->pbase()); 219 if (c != EOF) 220 { 221 *this->pptr() = c; 222 ++w; 223 } 224 if (compressBuffer(w)) 225 { 226 CompressionJob &job = jobs[currentJobId]; 227 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 228 return c; 229 } 230 else 231 { 232 return EOF; 233 } 234 } 235 236 std::streamsize flush(bool flushEmptyBuffer = false) 237 { 238 int w = static_cast<int>(this->pptr() - this->pbase()); 239 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w)) 240 { 241 CompressionJob &job = jobs[currentJobId]; 242 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 243 } 244 else 245 { 246 w = 0; 247 } 248 249 // wait for running compressor threads 250 waitForMinSize(idleQueue, numJobs - 1); 251 252 serializer.worker.ostream.flush(); 253 return w; 254 } 255 sync()256 int sync() 257 { 258 if (this->pptr() != this->pbase()) 259 { 260 int c = overflow(EOF); 261 if (c == EOF) 262 return -1; 263 } 264 return 0; 265 } 266 addFooter()267 void addFooter() 268 { 269 // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor 270 if (this->pptr() != this->pbase()) 271 overflow(EOF); 272 } 273 274 // returns a reference to the output stream get_ostream()275 ostream_reference get_ostream() const { return serializer.worker.ostream; }; 276 }; 277 278 // -------------------------------------------------------------------------- 279 // Class basic_unbgzf_streambuf 280 // -------------------------------------------------------------------------- 281 282 template< 283 typename Elem, 284 typename Tr = std::char_traits<Elem>, 285 typename ElemA = std::allocator<Elem>, 286 typename ByteT = char, 287 typename ByteAT = std::allocator<ByteT> 288 > 289 class basic_unbgzf_streambuf : 290 public std::basic_streambuf<Elem, Tr> 291 { 292 public: 293 typedef std::basic_istream<Elem, Tr>& istream_reference; 294 typedef ElemA char_allocator_type; 295 typedef ByteT byte_type; 296 typedef ByteAT byte_allocator_type; 297 typedef byte_type* byte_buffer_type; 298 typedef typename Tr::char_type char_type; 299 typedef typename Tr::int_type int_type; 300 typedef typename Tr::off_type off_type; 301 typedef typename Tr::pos_type pos_type; 302 303 typedef std::vector<char_type, char_allocator_type> TBuffer; 304 typedef ConcurrentQueue<int, Suspendable<Limit> > TJobQueue; 305 306 static const size_t MAX_PUTBACK = 4; 307 308 struct Serializer 309 { 310 istream_reference istream; 311 std::mutex lock; 312 IOError *error; 313 off_type fileOfs; 314 SerializerSerializer315 Serializer(istream_reference istream) : 316 istream(istream), 317 error(NULL), 318 fileOfs(0u) 319 {} 320 ~SerializerSerializer321 ~Serializer() 322 { 323 delete error; 324 } 325 }; 326 327 Serializer serializer; 328 329 struct DecompressionJob 330 { 331 typedef std::vector<byte_type, byte_allocator_type> TInputBuffer; 332 333 TInputBuffer inputBuffer; 334 TBuffer buffer; 335 off_type fileOfs; 336 int size; 337 unsigned compressedSize; 338 339 std::mutex cs; 340 std::condition_variable readyEvent; 341 bool ready; 342 bool bgzfEofMarker; 343 DecompressionJobDecompressionJob344 DecompressionJob() : 345 inputBuffer(BGZF_MAX_BLOCK_SIZE, 0), 346 buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0), 347 fileOfs(), 348 size(0), 349 cs(), 350 readyEvent(), 351 ready(true), 352 bgzfEofMarker(false) 353 {} 354 355 // TODO(rrahn): Do we need a copy constructor for the decompression job. DecompressionJobDecompressionJob356 DecompressionJob(DecompressionJob const &other) : 357 inputBuffer(other.inputBuffer), 358 buffer(other.buffer), 359 fileOfs(other.fileOfs), 360 size(other.size), 361 cs(), 362 readyEvent(), 363 ready(other.ready), 364 bgzfEofMarker(other.bgzfEofMarker) 365 {} 366 }; 367 368 // string of recycable jobs 369 size_t numThreads; 370 size_t numJobs; 371 String<DecompressionJob> jobs; 372 TJobQueue runningQueue; 373 TJobQueue todoQueue; 374 int currentJobId; 375 376 struct DecompressionThread 377 { 378 basic_unbgzf_streambuf *streamBuf; 379 CompressionContext<BgzfFile> compressionCtx; 380 operatorDecompressionThread381 void operator()() 382 { 383 ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue); 384 ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue); 385 386 // wait for a new job to become available 387 while (true) 388 { 389 int jobId = -1; 390 if (!popFront(jobId, streamBuf->todoQueue)) 391 return; 392 393 DecompressionJob &job = streamBuf->jobs[jobId]; 394 size_t tailLen = 0; 395 396 // typically the idle queue contains only ready jobs 397 // however, if seek() fast forwards running jobs into the todoQueue 398 // the caller defers the task of waiting to the decompression threads 399 if (!job.ready) 400 { 401 std::unique_lock<std::mutex> lock(job.cs); 402 job.readyEvent.wait(lock, [&job]{return job.ready;}); 403 SEQAN_ASSERT_EQ(job.ready, true); 404 } 405 406 { 407 std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock); 408 409 job.bgzfEofMarker = false; 410 if (streamBuf->serializer.error != NULL) 411 return; 412 413 // remember start offset (for tellg later) 414 job.fileOfs = streamBuf->serializer.fileOfs; 415 job.size = -1; 416 job.compressedSize = 0; 417 418 // only load if not at EOF 419 if (job.fileOfs != -1) 420 { 421 // read header 422 streamBuf->serializer.istream.read( 423 (char*)&job.inputBuffer[0], 424 BGZF_BLOCK_HEADER_LENGTH); 425 426 if (!streamBuf->serializer.istream.good()) 427 { 428 streamBuf->serializer.fileOfs = -1; 429 if (streamBuf->serializer.istream.eof()) 430 goto eofSkip; 431 streamBuf->serializer.error = new IOError("Stream read error."); 432 return; 433 } 434 435 // check header 436 if (!_bgzfCheckHeader(&job.inputBuffer[0])) 437 { 438 streamBuf->serializer.fileOfs = -1; 439 streamBuf->serializer.error = new IOError("Invalid BGZF block header."); 440 return; 441 } 442 443 // extract length of compressed data 444 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH; 445 446 // read compressed data and tail 447 streamBuf->serializer.istream.read( 448 (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH, 449 tailLen); 450 451 // Check if end-of-file marker is set 452 if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]), 453 reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]), 454 28) == 0) 455 { 456 job.bgzfEofMarker = true; 457 } 458 459 if (!streamBuf->serializer.istream.good()) 460 { 461 streamBuf->serializer.fileOfs = -1; 462 if (streamBuf->serializer.istream.eof()) 463 goto eofSkip; 464 streamBuf->serializer.error = new IOError("Stream read error."); 465 return; 466 } 467 468 job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen; 469 streamBuf->serializer.fileOfs += job.compressedSize; 470 job.ready = false; 471 472 eofSkip: 473 streamBuf->serializer.istream.clear( 474 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit); 475 } 476 477 if (!appendValue(streamBuf->runningQueue, jobId)) 478 { 479 // signal that job is ready 480 { 481 std::unique_lock<std::mutex> lock(job.cs); 482 job.ready = true; 483 } 484 job.readyEvent.notify_all(); 485 return; // Terminate this thread. 486 } 487 } 488 489 if (!job.ready) 490 { 491 // decompress block 492 job.size = _decompressBlock( 493 &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer), 494 &job.inputBuffer[0], job.compressedSize, compressionCtx); 495 496 // signal that job is ready 497 { 498 std::unique_lock<std::mutex> lock(job.cs); 499 job.ready = true; 500 } 501 job.readyEvent.notify_all(); 502 } 503 } 504 } 505 }; 506 507 // array of worker threads 508 using TFuture = decltype(std::async(DecompressionThread{nullptr, CompressionContext<BgzfFile>{}})); 509 std::vector<TFuture> threads; 510 TBuffer putbackBuffer; 511 512 basic_unbgzf_streambuf(istream_reference istream_, 513 size_t numThreads = SEQAN_BGZF_NUM_THREADS, 514 size_t jobsPerThread = 8) : serializer(istream_)515 serializer(istream_), 516 numThreads(numThreads), 517 numJobs(numThreads * jobsPerThread), 518 runningQueue(numJobs), 519 todoQueue(numJobs), 520 putbackBuffer(MAX_PUTBACK) 521 { 522 resize(jobs, numJobs, Exact()); 523 currentJobId = -1; 524 525 lockReading(runningQueue); 526 lockWriting(todoQueue); 527 setReaderWriterCount(runningQueue, 1, numThreads); 528 setReaderWriterCount(todoQueue, numThreads, 1); 529 530 for (unsigned i = 0; i < numJobs; ++i) 531 { 532 bool success = appendValue(todoQueue, i); 533 ignoreUnusedVariableWarning(success); 534 SEQAN_ASSERT(success); 535 } 536 537 for (unsigned i = 0; i < numThreads; ++i) 538 { 539 threads.push_back(std::async(std::launch::async, DecompressionThread{this, CompressionContext<BgzfFile>{}})); 540 } 541 } 542 ~basic_unbgzf_streambuf()543 ~basic_unbgzf_streambuf() 544 { 545 unlockWriting(todoQueue); 546 unlockReading(runningQueue); 547 } 548 underflow()549 int_type underflow() 550 { 551 // no need to use the next buffer? 552 if (this->gptr() && this->gptr() < this->egptr()) 553 return Tr::to_int_type(*this->gptr()); 554 555 size_t putback = this->gptr() - this->eback(); 556 if (putback > MAX_PUTBACK) 557 putback = MAX_PUTBACK; 558 559 // save at most MAX_PUTBACK characters from previous page to putback buffer 560 if (putback != 0) 561 std::copy( 562 this->gptr() - putback, 563 this->gptr(), 564 &putbackBuffer[0]); 565 566 if (currentJobId >= 0) 567 appendValue(todoQueue, currentJobId); 568 569 while (true) 570 { 571 if (!popFront(currentJobId, runningQueue)) 572 { 573 currentJobId = -1; 574 SEQAN_ASSERT(serializer.error != NULL); 575 if (serializer.error != NULL) 576 throw *serializer.error; 577 return EOF; 578 } 579 580 DecompressionJob &job = jobs[currentJobId]; 581 582 // restore putback buffer 583 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 584 if (putback != 0) 585 std::copy( 586 &putbackBuffer[0], 587 &putbackBuffer[0] + putback, 588 &job.buffer[0] + (MAX_PUTBACK - putback)); 589 590 // wait for the end of decompression 591 { 592 std::unique_lock<std::mutex> lock(job.cs); 593 job.readyEvent.wait(lock, [&job]{return job.ready;}); 594 } 595 596 size_t size = (job.size != -1)? job.size : 0; 597 598 // reset buffer pointers 599 this->setg( 600 &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area 601 &job.buffer[0] + MAX_PUTBACK, // read position 602 &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer 603 604 // The end of the bgzf file is reached, either if there was an error, or if the 605 // end-of-file marker was reached, while the uncompressed block had zero size. 606 if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker)) 607 return EOF; 608 else if (job.size > 0) 609 return Tr::to_int_type(*this->gptr()); // return next character 610 611 throw IOError("BGZF: Invalid end condition in decompression. " 612 "Most likely due to an empty bgzf block without end-of-file marker."); 613 } 614 } 615 seekoff(off_type ofs,std::ios_base::seekdir dir,std::ios_base::openmode openMode)616 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode) 617 { 618 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in) 619 { 620 if (dir == std::ios_base::cur && ofs >= 0) 621 { 622 // forward delta seek 623 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs) 624 { 625 ofs -= this->egptr() - this->gptr(); 626 if (this->underflow() == EOF) 627 break; 628 } 629 630 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr()) 631 { 632 DecompressionJob &job = jobs[currentJobId]; 633 634 // reset buffer pointers 635 this->setg( 636 this->eback(), // beginning of putback area 637 this->gptr() + ofs, // read position 638 this->egptr()); // end of buffer 639 640 if (this->gptr() != this->egptr()) 641 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK]))); 642 else 643 return pos_type((job.fileOfs + job.compressedSize) << 16); 644 } 645 646 } 647 else if (dir == std::ios_base::beg) 648 { 649 // random seek 650 std::streampos destFileOfs = ofs >> 16; 651 652 // are we in the same block? 653 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs) 654 { 655 DecompressionJob &job = jobs[currentJobId]; 656 657 // reset buffer pointers 658 this->setg( 659 this->eback(), // beginning of putback area 660 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position 661 this->egptr()); // end of buffer 662 return ofs; 663 } 664 665 // ok, different block 666 { 667 std::lock_guard<std::mutex> scopedLock(serializer.lock); 668 669 // remove all running jobs and put them in the idle queue unless we 670 // find our seek target 671 672 if (currentJobId >= 0) 673 appendValue(todoQueue, currentJobId); 674 675 // Note that if we are here the current job does not represent the sought block. 676 // Hence if the running queue is empty we need to explicitly unset the jobId, 677 // otherwise we would not update the serializers istream pointer to the correct position. 678 if (empty(runningQueue)) 679 currentJobId = -1; 680 681 // empty is thread-safe in serializer.lock 682 while (!empty(runningQueue)) 683 { 684 popFront(currentJobId, runningQueue); 685 686 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs) 687 break; 688 689 // push back useless job 690 appendValue(todoQueue, currentJobId); 691 currentJobId = -1; 692 } 693 694 if (currentJobId == -1) 695 { 696 SEQAN_ASSERT(empty(runningQueue)); 697 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit); 698 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs) 699 serializer.fileOfs = destFileOfs; 700 else 701 currentJobId = -2; // temporarily signals a seek error 702 } 703 } 704 705 // if our block wasn't in the running queue yet, it should now 706 // be the first that falls out after modifying serializer.fileOfs 707 if (currentJobId == -1) 708 popFront(currentJobId, runningQueue); 709 else if (currentJobId == -2) 710 currentJobId = -1; 711 712 if (currentJobId >= 0) 713 { 714 // wait for the end of decompression 715 DecompressionJob &job = jobs[currentJobId]; 716 717 { 718 std::unique_lock<std::mutex> lock(job.cs); 719 job.readyEvent.wait(lock, [&job]{return job.ready;}); 720 } 721 722 SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs); 723 724 // reset buffer pointers 725 this->setg( 726 &job.buffer[0] + MAX_PUTBACK, // no putback area 727 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position 728 &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer 729 return ofs; 730 } 731 } 732 } 733 return pos_type(off_type(-1)); 734 } 735 seekpos(pos_type pos,std::ios_base::openmode openMode)736 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode) 737 { 738 return seekoff(off_type(pos), std::ios_base::beg, openMode); 739 } 740 741 // returns the compressed input istream get_istream()742 istream_reference get_istream() { return serializer.istream; }; 743 }; 744 745 // -------------------------------------------------------------------------- 746 // Class basic_bgzf_ostreambase 747 // -------------------------------------------------------------------------- 748 749 template< 750 typename Elem, 751 typename Tr = std::char_traits<Elem>, 752 typename ElemA = std::allocator<Elem>, 753 typename ByteT = char, 754 typename ByteAT = std::allocator<ByteT> 755 > 756 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr> 757 { 758 public: 759 typedef std::basic_ostream<Elem, Tr>& ostream_reference; 760 typedef basic_bgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type; 761 basic_bgzf_ostreambase(ostream_reference ostream_)762 basic_bgzf_ostreambase(ostream_reference ostream_) 763 : m_buf(ostream_) 764 { 765 this->init(&m_buf ); 766 }; 767 768 // returns the underlying zip ostream object rdbuf()769 bgzf_streambuf_type* rdbuf() { return &m_buf; }; 770 // returns the bgzf error state get_zerr()771 int get_zerr() const { return m_buf.get_err(); }; 772 // returns the uncompressed data crc get_crc()773 long get_crc() const { return m_buf.get_crc(); }; 774 // returns the compressed data size get_out_size()775 long get_out_size() const { return m_buf.get_out_size(); }; 776 // returns the uncompressed data size get_in_size()777 long get_in_size() const { return m_buf.get_in_size(); }; 778 779 private: 780 bgzf_streambuf_type m_buf; 781 }; 782 783 // -------------------------------------------------------------------------- 784 // Class basic_bgzf_istreambase 785 // -------------------------------------------------------------------------- 786 787 template< 788 typename Elem, 789 typename Tr = std::char_traits<Elem>, 790 typename ElemA = std::allocator<Elem>, 791 typename ByteT = char, 792 typename ByteAT = std::allocator<ByteT> 793 > 794 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr> 795 { 796 public: 797 typedef std::basic_istream<Elem, Tr>& istream_reference; 798 typedef basic_unbgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> unbgzf_streambuf_type; 799 basic_bgzf_istreambase(istream_reference ostream_)800 basic_bgzf_istreambase(istream_reference ostream_) 801 : m_buf(ostream_) 802 { 803 this->init(&m_buf ); 804 }; 805 806 // returns the underlying unzip istream object rdbuf()807 unbgzf_streambuf_type* rdbuf() { return &m_buf; }; 808 809 // returns the bgzf error state get_zerr()810 int get_zerr() const { return m_buf.get_zerr(); }; 811 // returns the uncompressed data crc get_crc()812 long get_crc() const { return m_buf.get_crc(); }; 813 // returns the uncompressed data size get_out_size()814 long get_out_size() const { return m_buf.get_out_size(); }; 815 // returns the compressed data size get_in_size()816 long get_in_size() const { return m_buf.get_in_size(); }; 817 818 private: 819 unbgzf_streambuf_type m_buf; 820 }; 821 822 // -------------------------------------------------------------------------- 823 // Class basic_bgzf_ostream 824 // -------------------------------------------------------------------------- 825 826 template< 827 typename Elem, 828 typename Tr = std::char_traits<Elem>, 829 typename ElemA = std::allocator<Elem>, 830 typename ByteT = char, 831 typename ByteAT = std::allocator<ByteT> 832 > 833 class basic_bgzf_ostream : 834 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>, 835 public std::basic_ostream<Elem,Tr> 836 { 837 public: 838 typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type; 839 typedef std::basic_ostream<Elem,Tr> ostream_type; 840 typedef ostream_type& ostream_reference; 841 basic_bgzf_ostream(ostream_reference ostream_)842 basic_bgzf_ostream(ostream_reference ostream_) : 843 bgzf_ostreambase_type(ostream_), 844 ostream_type(bgzf_ostreambase_type::rdbuf()) 845 {} 846 847 // flush inner buffer and zipper buffer zflush()848 basic_bgzf_ostream<Elem,Tr>& zflush() 849 { 850 this->flush(); this->rdbuf()->flush(); return *this; 851 }; 852 ~basic_bgzf_ostream()853 ~basic_bgzf_ostream() 854 { 855 this->rdbuf()->addFooter(); 856 } 857 858 private: 859 static void put_long(ostream_reference out_, unsigned long x_); 860 #ifdef _WIN32 _Add_vtordisp1()861 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 _Add_vtordisp2()862 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 863 #endif 864 }; 865 866 // -------------------------------------------------------------------------- 867 // Class basic_bgzf_istream 868 // -------------------------------------------------------------------------- 869 870 template< 871 typename Elem, 872 typename Tr = std::char_traits<Elem>, 873 typename ElemA = std::allocator<Elem>, 874 typename ByteT = char, 875 typename ByteAT = std::allocator<ByteT> 876 > 877 class basic_bgzf_istream : 878 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>, 879 public std::basic_istream<Elem,Tr> 880 { 881 public: 882 typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type; 883 typedef std::basic_istream<Elem,Tr> istream_type; 884 typedef istream_type & istream_reference; 885 typedef char byte_type; 886 basic_bgzf_istream(istream_reference istream_)887 basic_bgzf_istream(istream_reference istream_) : 888 bgzf_istreambase_type(istream_), 889 istream_type(bgzf_istreambase_type::rdbuf()), 890 m_is_gzip(false), 891 m_gbgzf_data_size(0) 892 {}; 893 894 // returns true if it is a gzip file is_gzip()895 bool is_gzip() const { return m_is_gzip; }; 896 // return data size check check_data_size()897 bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size; }; 898 899 // return the data size in the file get_gbgzf_data_size()900 long get_gbgzf_data_size() const { return m_gbgzf_data_size; }; 901 902 protected: 903 static void read_long(istream_reference in_, unsigned long& x_); 904 905 int check_header(); 906 bool m_is_gzip; 907 unsigned long m_gbgzf_data_size; 908 909 #ifdef _WIN32 910 private: _Add_vtordisp1()911 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 _Add_vtordisp2()912 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 913 #endif 914 }; 915 916 // =========================================================================== 917 // Typedefs 918 // =========================================================================== 919 920 // A typedef for basic_bgzf_ostream<char> 921 typedef basic_bgzf_ostream<char> bgzf_ostream; 922 // A typedef for basic_bgzf_ostream<wchar_t> 923 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream; 924 // A typedef for basic_bgzf_istream<char> 925 typedef basic_bgzf_istream<char> bgzf_istream; 926 // A typedef for basic_bgzf_istream<wchart> 927 typedef basic_bgzf_istream<wchar_t> bgzf_wistream; 928 929 } // namespace seqan 930 931 #endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 932