1 /* 2 zipstream Library License: 3 -------------------------- 4 5 The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux. 6 7 This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software. 8 9 Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions: 10 11 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required. 12 13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software. 14 15 3. This notice may not be removed or altered from any source distribution 16 17 Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003 (original zlib stream) 18 Author: David Weese, dave.weese@gmail.com, 2014 (extension to parallel block-wise compression in bgzf format) 19 */ 20 21 #ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 22 #define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 23 24 25 #include <vector> 26 #include <iostream> 27 #include <algorithm> 28 #include <zlib.h> 29 #include "iostream_zutil.h" 30 31 namespace seqan { 32 33 /// default gzip buffer size, 34 /// change this to suite your needs 35 const size_t default_buffer_size = 4096; 36 37 const unsigned BGZF_MAX_BLOCK_SIZE = 64 * 1024; 38 const unsigned BGZF_BLOCK_HEADER_LENGTH = 18; 39 const unsigned BGZF_BLOCK_FOOTER_LENGTH = 8; 40 const unsigned ZLIB_BLOCK_OVERHEAD = 5; // 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html) 41 42 // Reduce the maximal input size, such that the compressed data 43 // always fits in one block even for level Z_NO_COMPRESSION. 44 const unsigned BGZF_BLOCK_SIZE = BGZF_MAX_BLOCK_SIZE - BGZF_BLOCK_HEADER_LENGTH - BGZF_BLOCK_FOOTER_LENGTH - ZLIB_BLOCK_OVERHEAD; 45 46 /// Compression strategy, see bgzf doc. 47 enum EStrategy 48 { 49 StrategyFiltered = 1, 50 StrategyHuffmanOnly = 2, 51 DefaultStrategy = 0 52 }; 53 54 template< 55 typename Elem, 56 typename Tr = std::char_traits<Elem>, 57 typename ElemA = std::allocator<Elem>, 58 typename ByteT = char, 59 typename ByteAT = std::allocator<ByteT> 60 > 61 class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr> 62 { 63 public: 64 typedef std::basic_ostream<Elem, Tr>& ostream_reference; 65 typedef ElemA char_allocator_type; 66 typedef ByteT byte_type; 67 typedef ByteAT byte_allocator_type; 68 typedef byte_type* byte_buffer_type; 69 typedef typename Tr::char_type char_type; 70 typedef typename Tr::int_type int_type; 71 72 typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue; 73 74 struct OutputBuffer 75 { 76 char buffer[BGZF_MAX_BLOCK_SIZE]; 77 size_t size; 78 }; 79 80 struct BufferWriter 81 { 82 ostream_reference ostream; 83 BufferWriterBufferWriter84 BufferWriter(ostream_reference ostream) : 85 ostream(ostream) 86 {} 87 operatorBufferWriter88 bool operator() (OutputBuffer const & outputBuffer) 89 { 90 ostream.write(outputBuffer.buffer, outputBuffer.size); 91 return ostream.good(); 92 } 93 }; 94 95 struct CompressionJob 96 { 97 typedef std::vector<char_type, char_allocator_type> TBuffer; 98 99 TBuffer buffer; 100 size_t size; 101 OutputBuffer *outputBuffer; 102 CompressionJobCompressionJob103 CompressionJob() : 104 buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0), 105 size(0), 106 outputBuffer(NULL) 107 {} 108 }; 109 110 // string of recycable jobs 111 size_t numThreads; 112 size_t numJobs; 113 String<CompressionJob> jobs; 114 TJobQueue jobQueue; 115 TJobQueue idleQueue; 116 Serializer< 117 OutputBuffer, 118 BufferWriter> serializer; 119 120 size_t currentJobId; 121 bool currentJobAvail; 122 123 124 struct CompressionThread 125 { 126 basic_bgzf_streambuf *streamBuf; 127 CompressionContext<BgzfFile> compressionCtx; 128 size_t threadNum; 129 operatorCompressionThread130 void operator()() 131 { 132 ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue); 133 ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue); 134 135 // wait for a new job to become available 136 bool success = true; 137 while (success) 138 { 139 size_t jobId = -1; 140 if (!popFront(jobId, streamBuf->jobQueue)) 141 return; 142 143 CompressionJob &job = streamBuf->jobs[jobId]; 144 145 // compress block with zlib 146 job.outputBuffer->size = _compressBlock( 147 job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer), 148 &job.buffer[0], job.size, compressionCtx); 149 150 success = releaseValue(streamBuf->serializer, job.outputBuffer); 151 appendValue(streamBuf->idleQueue, jobId); 152 } 153 } 154 }; 155 156 // array of worker threads 157 Thread<CompressionThread> *threads; 158 159 basic_bgzf_streambuf(ostream_reference ostream_, 160 size_t numThreads = 16, 161 size_t jobsPerThread = 8) : numThreads(numThreads)162 numThreads(numThreads), 163 numJobs(numThreads * jobsPerThread), 164 jobQueue(numJobs), 165 idleQueue(numJobs), 166 serializer(ostream_, numThreads * jobsPerThread) 167 { 168 resize(jobs, numJobs, Exact()); 169 currentJobId = 0; 170 171 lockWriting(jobQueue); 172 lockReading(idleQueue); 173 setReaderWriterCount(jobQueue, numThreads, 1); 174 setReaderWriterCount(idleQueue, 1, numThreads); 175 176 for (unsigned i = 0; i < numJobs; ++i) 177 { 178 bool success = appendValue(idleQueue, i); 179 ignoreUnusedVariableWarning(success); 180 SEQAN_ASSERT(success); 181 } 182 183 threads = new Thread<CompressionThread>[numThreads]; 184 for (unsigned i = 0; i < numThreads; ++i) 185 { 186 threads[i].worker.streamBuf = this; 187 threads[i].worker.threadNum = i; 188 run(threads[i]); 189 } 190 191 currentJobAvail = popFront(currentJobId, idleQueue); 192 SEQAN_ASSERT(currentJobAvail); 193 194 CompressionJob &job = jobs[currentJobId]; 195 job.outputBuffer = aquireValue(serializer); 196 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 197 } 198 ~basic_bgzf_streambuf()199 ~basic_bgzf_streambuf() 200 { 201 // the buffer is now (after addFooter()) and flush will append the empty EOF marker 202 flush(true); 203 204 unlockWriting(jobQueue); 205 unlockReading(idleQueue); 206 207 for (unsigned i = 0; i < numThreads; ++i) 208 waitFor(threads[i]); 209 delete[] threads; 210 } 211 compressBuffer(size_t size)212 bool compressBuffer(size_t size) 213 { 214 // submit current job 215 if (currentJobAvail) 216 { 217 jobs[currentJobId].size = size; 218 appendValue(jobQueue, currentJobId); 219 } 220 221 // recycle existing idle job 222 if (!(currentJobAvail = popFront(currentJobId, idleQueue))) 223 return false; 224 225 jobs[currentJobId].outputBuffer = aquireValue(serializer); 226 227 return serializer; 228 } 229 overflow(int_type c)230 int_type overflow(int_type c) 231 { 232 int w = static_cast<int>(this->pptr() - this->pbase()); 233 if (c != EOF) 234 { 235 *this->pptr() = c; 236 ++w; 237 } 238 if (compressBuffer(w)) 239 { 240 CompressionJob &job = jobs[currentJobId]; 241 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 242 return c; 243 } 244 else 245 { 246 return EOF; 247 } 248 } 249 250 std::streamsize flush(bool flushEmptyBuffer = false) 251 { 252 int w = static_cast<int>(this->pptr() - this->pbase()); 253 if ((w != 0 || flushEmptyBuffer) && compressBuffer(w)) 254 { 255 CompressionJob &job = jobs[currentJobId]; 256 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 257 } 258 else 259 { 260 w = 0; 261 } 262 263 // wait for running compressor threads 264 waitForMinSize(idleQueue, numJobs - 1); 265 266 serializer.worker.ostream.flush(); 267 return w; 268 } 269 sync()270 int sync() 271 { 272 if (this->pptr() != this->pbase()) 273 { 274 int c = overflow(EOF); 275 if (c == EOF) 276 return -1; 277 } 278 return 0; 279 } 280 addFooter()281 void addFooter() 282 { 283 // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor 284 if (this->pptr() != this->pbase()) 285 overflow(EOF); 286 } 287 288 /// returns a reference to the output stream get_ostream()289 ostream_reference get_ostream() const { return serializer.worker.ostream; }; 290 }; 291 292 template< 293 typename Elem, 294 typename Tr = std::char_traits<Elem>, 295 typename ElemA = std::allocator<Elem>, 296 typename ByteT = char, 297 typename ByteAT = std::allocator<ByteT> 298 > 299 class basic_unbgzf_streambuf : 300 public std::basic_streambuf<Elem, Tr> 301 { 302 public: 303 typedef std::basic_istream<Elem, Tr>& istream_reference; 304 typedef ElemA char_allocator_type; 305 typedef ByteT byte_type; 306 typedef ByteAT byte_allocator_type; 307 typedef byte_type* byte_buffer_type; 308 typedef typename Tr::char_type char_type; 309 typedef typename Tr::int_type int_type; 310 typedef typename Tr::off_type off_type; 311 typedef typename Tr::pos_type pos_type; 312 313 typedef std::vector<char_type, char_allocator_type> TBuffer; 314 typedef ConcurrentQueue<int, Suspendable<Limit> > TJobQueue; 315 316 static const size_t MAX_PUTBACK = 4; 317 318 struct Serializer 319 { 320 istream_reference istream; 321 Mutex lock; 322 IOError *error; 323 off_type fileOfs; 324 SerializerSerializer325 Serializer(istream_reference istream) : 326 istream(istream), 327 lock(false), 328 error(NULL), 329 fileOfs(0u) 330 {} 331 ~SerializerSerializer332 ~Serializer() 333 { 334 delete error; 335 } 336 }; 337 338 Serializer serializer; 339 340 struct DecompressionJob 341 { 342 typedef std::vector<byte_type, byte_allocator_type> TInputBuffer; 343 344 TInputBuffer inputBuffer; 345 TBuffer buffer; 346 off_type fileOfs; 347 int size; 348 unsigned compressedSize; 349 350 CriticalSection cs; 351 Condition readyEvent; 352 bool ready; 353 DecompressionJobDecompressionJob354 DecompressionJob() : 355 inputBuffer(BGZF_MAX_BLOCK_SIZE, 0), 356 buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0), 357 fileOfs(), 358 size(0), 359 readyEvent(cs), 360 ready(true) 361 {} 362 DecompressionJobDecompressionJob363 DecompressionJob(DecompressionJob const &other) : 364 inputBuffer(other.inputBuffer), 365 buffer(other.buffer), 366 fileOfs(other.fileOfs), 367 size(other.size), 368 readyEvent(cs), 369 ready(other.ready) 370 {} 371 }; 372 373 // string of recycable jobs 374 size_t numThreads; 375 size_t numJobs; 376 String<DecompressionJob> jobs; 377 TJobQueue runningQueue; 378 TJobQueue todoQueue; 379 int currentJobId; 380 381 struct DecompressionThread 382 { 383 basic_unbgzf_streambuf *streamBuf; 384 CompressionContext<BgzfFile> compressionCtx; 385 operatorDecompressionThread386 void operator()() 387 { 388 ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue); 389 ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue); 390 391 // wait for a new job to become available 392 while (true) 393 { 394 int jobId = -1; 395 if (!popFront(jobId, streamBuf->todoQueue)) 396 return; 397 398 DecompressionJob &job = streamBuf->jobs[jobId]; 399 size_t tailLen = 0; 400 401 // typically the idle queue contains only ready jobs 402 // however, if seek() fast forwards running jobs into the todoQueue 403 // the caller defers the task of waiting to the decompression threads 404 if (!job.ready) 405 { 406 ScopedLock<CriticalSection> lock(job.cs); 407 if (!job.ready) 408 { 409 waitFor(job.readyEvent); 410 job.ready = true; 411 } 412 } 413 414 { 415 ScopedLock<Mutex> scopedLock(streamBuf->serializer.lock); 416 417 if (streamBuf->serializer.error != NULL) 418 return; 419 420 // remember start offset (for tellg later) 421 job.fileOfs = streamBuf->serializer.fileOfs; 422 job.size = -1; 423 job.compressedSize = 0; 424 425 // only load if not at EOF 426 if (job.fileOfs != -1) 427 { 428 // read header 429 streamBuf->serializer.istream.read( 430 (char*)&job.inputBuffer[0], 431 BGZF_BLOCK_HEADER_LENGTH); 432 433 if (!streamBuf->serializer.istream.good()) 434 { 435 streamBuf->serializer.fileOfs = -1; 436 if (streamBuf->serializer.istream.eof()) 437 goto eofSkip; 438 streamBuf->serializer.error = new IOError("Stream read error."); 439 return; 440 } 441 442 // check header 443 if (!_bgzfCheckHeader(&job.inputBuffer[0])) 444 { 445 streamBuf->serializer.fileOfs = -1; 446 streamBuf->serializer.error = new IOError("Invalid BGZF block header."); 447 return; 448 } 449 450 // extract length of compressed data 451 tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH; 452 453 // read compressed data and tail 454 streamBuf->serializer.istream.read( 455 (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH, 456 tailLen); 457 458 if (!streamBuf->serializer.istream.good()) 459 { 460 streamBuf->serializer.fileOfs = -1; 461 if (streamBuf->serializer.istream.eof()) 462 goto eofSkip; 463 streamBuf->serializer.error = new IOError("Stream read error."); 464 return; 465 } 466 467 job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen; 468 streamBuf->serializer.fileOfs += job.compressedSize; 469 job.ready = false; 470 471 eofSkip: 472 streamBuf->serializer.istream.clear( 473 streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit); 474 } 475 476 if (!appendValue(streamBuf->runningQueue, jobId)) 477 { 478 // signal that job is ready 479 { 480 ScopedLock<CriticalSection> lock(job.cs); 481 job.ready = true; 482 signal(job.readyEvent); 483 } 484 return; 485 } 486 } 487 488 if (!job.ready) 489 { 490 // decompress block 491 job.size = _decompressBlock( 492 &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer), 493 &job.inputBuffer[0], job.compressedSize, compressionCtx); 494 495 // signal that job is ready 496 { 497 ScopedLock<CriticalSection> lock(job.cs); 498 job.ready = true; 499 signal(job.readyEvent); 500 } 501 } 502 } 503 } 504 }; 505 506 // array of worker threads 507 Thread<DecompressionThread> *threads; 508 TBuffer putbackBuffer; 509 510 basic_unbgzf_streambuf(istream_reference istream_, 511 size_t numThreads = 16, 512 size_t jobsPerThread = 8) : serializer(istream_)513 serializer(istream_), 514 numThreads(numThreads), 515 numJobs(numThreads * jobsPerThread), 516 runningQueue(numJobs), 517 todoQueue(numJobs), 518 putbackBuffer(MAX_PUTBACK) 519 { 520 resize(jobs, numJobs, Exact()); 521 currentJobId = -1; 522 523 lockReading(runningQueue); 524 lockWriting(todoQueue); 525 setReaderWriterCount(runningQueue, 1, numThreads); 526 setReaderWriterCount(todoQueue, numThreads, 1); 527 528 for (unsigned i = 0; i < numJobs; ++i) 529 { 530 bool success = appendValue(todoQueue, i); 531 ignoreUnusedVariableWarning(success); 532 SEQAN_ASSERT(success); 533 } 534 535 threads = new Thread<DecompressionThread>[numThreads]; 536 for (unsigned i = 0; i < numThreads; ++i) 537 { 538 threads[i].worker.streamBuf = this; 539 run(threads[i]); 540 } 541 } 542 ~basic_unbgzf_streambuf()543 ~basic_unbgzf_streambuf() 544 { 545 unlockWriting(todoQueue); 546 unlockReading(runningQueue); 547 548 for (unsigned i = 0; i < numThreads; ++i) 549 waitFor(threads[i]); 550 delete[] threads; 551 } 552 underflow()553 int_type underflow() 554 { 555 // no need to use the next buffer? 556 if (this->gptr() && this->gptr() < this->egptr()) 557 return Tr::to_int_type(*this->gptr()); 558 559 size_t putback = this->gptr() - this->eback(); 560 if (putback > MAX_PUTBACK) 561 putback = MAX_PUTBACK; 562 563 // save at most MAX_PUTBACK characters from previous page to putback buffer 564 if (putback != 0) 565 std::copy( 566 this->gptr() - putback, 567 this->gptr(), 568 &putbackBuffer[0]); 569 570 if (currentJobId >= 0) 571 appendValue(todoQueue, currentJobId); 572 573 while (true) 574 { 575 if (!popFront(currentJobId, runningQueue)) 576 { 577 currentJobId = -1; 578 SEQAN_ASSERT(serializer.error != NULL); 579 if (serializer.error != NULL) 580 throw *serializer.error; 581 return EOF; 582 } 583 584 DecompressionJob &job = jobs[currentJobId]; 585 586 // restore putback buffer 587 this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1)); 588 if (putback != 0) 589 std::copy( 590 &putbackBuffer[0], 591 &putbackBuffer[0] + putback, 592 &job.buffer[0] + (MAX_PUTBACK - putback)); 593 594 // wait for the end of decompression 595 { 596 ScopedLock<CriticalSection> lock(job.cs); 597 if (!job.ready) 598 waitFor(job.readyEvent); 599 } 600 601 size_t size = (job.size != -1)? job.size : 0; 602 603 // reset buffer pointers 604 this->setg( 605 &job.buffer[0] + (MAX_PUTBACK - putback), // beginning of putback area 606 &job.buffer[0] + MAX_PUTBACK, // read position 607 &job.buffer[0] + (MAX_PUTBACK + size)); // end of buffer 608 609 if (job.size == -1) 610 return EOF; 611 else if (job.size > 0) 612 return Tr::to_int_type(*this->gptr()); // return next character 613 } 614 } 615 seekoff(off_type ofs,std::ios_base::seekdir dir,std::ios_base::openmode openMode)616 pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode) 617 { 618 if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in) 619 { 620 if (dir == std::ios_base::cur && ofs >= 0) 621 { 622 // forward delta seek 623 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs) 624 { 625 ofs -= this->egptr() - this->gptr(); 626 if (this->underflow() == EOF) 627 break; 628 } 629 630 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr()) 631 { 632 DecompressionJob &job = jobs[currentJobId]; 633 634 // reset buffer pointers 635 this->setg( 636 this->eback(), // beginning of putback area 637 this->gptr() + ofs, // read position 638 this->egptr()); // end of buffer 639 640 if (this->gptr() != this->egptr()) 641 return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK]))); 642 else 643 return pos_type((job.fileOfs + job.compressedSize) << 16); 644 } 645 646 } 647 else if (dir == std::ios_base::beg) 648 { 649 // random seek 650 std::streampos destFileOfs = ofs >> 16; 651 652 // are we in the same block? 653 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs) 654 { 655 DecompressionJob &job = jobs[currentJobId]; 656 657 // reset buffer pointers 658 this->setg( 659 this->eback(), // beginning of putback area 660 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position 661 this->egptr()); // end of buffer 662 return ofs; 663 } 664 665 // ok, different block 666 { 667 ScopedLock<Mutex> scopedLock(serializer.lock); 668 669 // remove all running jobs and put them in the idle queue unless we 670 // find our seek target 671 672 if (currentJobId >= 0) 673 appendValue(todoQueue, currentJobId); 674 675 // Note that if we are here the current job does not represent the sought block. 676 // Hence if the running queue is empty we need to explicitly unset the jobId, 677 // otherwise we would not update the serializers istream pointer to the correct position. 678 if (empty(runningQueue)) 679 currentJobId = -1; 680 681 // empty is thread-safe in serializer.lock 682 while (!empty(runningQueue)) 683 { 684 popFront(currentJobId, runningQueue); 685 686 if (jobs[currentJobId].fileOfs == (off_type)destFileOfs) 687 break; 688 689 // push back useless job 690 appendValue(todoQueue, currentJobId); 691 currentJobId = -1; 692 } 693 694 if (currentJobId == -1) 695 { 696 SEQAN_ASSERT(empty(runningQueue)); 697 serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit); 698 if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs) 699 serializer.fileOfs = destFileOfs; 700 else 701 currentJobId = -2; // temporarily signals a seek error 702 } 703 } 704 705 // if our block wasn't in the running queue yet, it should now 706 // be the first that falls out after modifying serializer.fileOfs 707 if (currentJobId == -1) 708 popFront(currentJobId, runningQueue); 709 else if (currentJobId == -2) 710 currentJobId = -1; 711 712 if (currentJobId >= 0) 713 { 714 // wait for the end of decompression 715 DecompressionJob &job = jobs[currentJobId]; 716 { 717 ScopedLock<CriticalSection> lock(job.cs); 718 if (!job.ready) 719 waitFor(job.readyEvent); 720 } 721 722 SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs); 723 724 // reset buffer pointers 725 this->setg( 726 &job.buffer[0] + MAX_PUTBACK, // no putback area 727 &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)), // read position 728 &job.buffer[0] + (MAX_PUTBACK + job.size)); // end of buffer 729 return ofs; 730 } 731 } 732 } 733 return pos_type(off_type(-1)); 734 } 735 seekpos(pos_type pos,std::ios_base::openmode openMode)736 pos_type seekpos(pos_type pos, std::ios_base::openmode openMode) 737 { 738 return seekoff(off_type(pos), std::ios_base::beg, openMode); 739 } 740 741 /// returns the compressed input istream get_istream()742 istream_reference get_istream() { return serializer.istream;}; 743 }; 744 745 /* \brief Base class for zip ostreams 746 747 Contains a basic_bgzf_streambuf. 748 */ 749 template< 750 typename Elem, 751 typename Tr = std::char_traits<Elem>, 752 typename ElemA = std::allocator<Elem>, 753 typename ByteT = char, 754 typename ByteAT = std::allocator<ByteT> 755 > 756 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr> 757 { 758 public: 759 typedef std::basic_ostream<Elem, Tr>& ostream_reference; 760 typedef basic_bgzf_streambuf< 761 Elem, 762 Tr, 763 ElemA, 764 ByteT, 765 ByteAT 766 > bgzf_streambuf_type; 767 basic_bgzf_ostreambase(ostream_reference ostream_)768 basic_bgzf_ostreambase(ostream_reference ostream_) 769 : m_buf(ostream_) 770 { 771 this->init(&m_buf ); 772 }; 773 774 /// returns the underlying zip ostream object rdbuf()775 bgzf_streambuf_type* rdbuf() { return &m_buf; }; 776 777 /// returns the bgzf error state get_zerr()778 int get_zerr() const { return m_buf.get_err();}; 779 /// returns the uncompressed data crc get_crc()780 long get_crc() const { return m_buf.get_crc();}; 781 /// returns the compressed data size get_out_size()782 long get_out_size() const { return m_buf.get_out_size();}; 783 /// returns the uncompressed data size get_in_size()784 long get_in_size() const { return m_buf.get_in_size();}; 785 private: 786 bgzf_streambuf_type m_buf; 787 }; 788 789 /* \brief Base class for unzip istreams 790 791 Contains a basic_unbgzf_streambuf. 792 */ 793 template< 794 typename Elem, 795 typename Tr = std::char_traits<Elem>, 796 typename ElemA = std::allocator<Elem>, 797 typename ByteT = char, 798 typename ByteAT = std::allocator<ByteT> 799 > 800 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr> 801 { 802 public: 803 typedef std::basic_istream<Elem, Tr>& istream_reference; 804 typedef basic_unbgzf_streambuf< 805 Elem, 806 Tr, 807 ElemA, 808 ByteT, 809 ByteAT 810 > unbgzf_streambuf_type; 811 basic_bgzf_istreambase(istream_reference ostream_)812 basic_bgzf_istreambase(istream_reference ostream_) 813 : m_buf(ostream_) 814 { 815 this->init(&m_buf ); 816 }; 817 818 /// returns the underlying unzip istream object rdbuf()819 unbgzf_streambuf_type* rdbuf() { return &m_buf; }; 820 821 /// returns the bgzf error state get_zerr()822 int get_zerr() const { return m_buf.get_zerr();}; 823 /// returns the uncompressed data crc get_crc()824 long get_crc() const { return m_buf.get_crc();}; 825 /// returns the uncompressed data size get_out_size()826 long get_out_size() const { return m_buf.get_out_size();}; 827 /// returns the compressed data size get_in_size()828 long get_in_size() const { return m_buf.get_in_size();}; 829 private: 830 unbgzf_streambuf_type m_buf; 831 }; 832 833 /*brief A zipper ostream 834 835 This class is a ostream decorator that behaves 'almost' like any other ostream. 836 837 At construction, it takes any ostream that shall be used to output of the compressed data. 838 839 When finished, you need to call the special method zflush or call the destructor 840 to flush all the intermidiate streams. 841 842 Example: 843 \code 844 // creating the target zip string, could be a fstream 845 ostringstream ostringstream_; 846 // creating the zip layer 847 bgzf_ostream zipper(ostringstream_); 848 849 850 // writing data 851 zipper<<f<<" "<<d<<" "<<ui<<" "<<ul<<" "<<us<<" "<<c<<" "<<dum; 852 // zip ostream needs special flushing... 853 zipper.zflush(); 854 \endcode 855 */ 856 template< 857 typename Elem, 858 typename Tr = std::char_traits<Elem>, 859 typename ElemA = std::allocator<Elem>, 860 typename ByteT = char, 861 typename ByteAT = std::allocator<ByteT> 862 > 863 class basic_bgzf_ostream : 864 public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>, 865 public std::basic_ostream<Elem,Tr> 866 { 867 public: 868 typedef basic_bgzf_ostreambase< 869 Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type; 870 typedef std::basic_ostream<Elem,Tr> ostream_type; 871 typedef ostream_type& ostream_reference; 872 basic_bgzf_ostream(ostream_reference ostream_)873 basic_bgzf_ostream(ostream_reference ostream_) 874 : 875 bgzf_ostreambase_type(ostream_), 876 ostream_type(bgzf_ostreambase_type::rdbuf()) 877 {} 878 879 /// flush inner buffer and zipper buffer zflush()880 basic_bgzf_ostream<Elem,Tr>& zflush() 881 { 882 this->flush(); this->rdbuf()->flush(); return *this; 883 }; 884 ~basic_bgzf_ostream()885 ~basic_bgzf_ostream() 886 { 887 this->rdbuf()->addFooter(); 888 } 889 890 private: 891 static void put_long(ostream_reference out_, unsigned long x_); 892 #ifdef _WIN32 _Add_vtordisp1()893 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 _Add_vtordisp2()894 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 895 #endif 896 }; 897 898 /* \brief A zipper istream 899 900 This class is a istream decorator that behaves 'almost' like any other ostream. 901 902 At construction, it takes any istream that shall be used to input of the compressed data. 903 904 Simlpe example: 905 \code 906 // create a stream on zip string 907 istringstream istringstream_( ostringstream_.str()); 908 // create unzipper istream 909 bgzf_istream unzipper( istringstream_); 910 911 // read and unzip 912 unzipper>>f_r>>d_r>>ui_r>>ul_r>>us_r>>c_r>>dum_r; 913 \endcode 914 */ 915 template< 916 typename Elem, 917 typename Tr = std::char_traits<Elem>, 918 typename ElemA = std::allocator<Elem>, 919 typename ByteT = char, 920 typename ByteAT = std::allocator<ByteT> 921 > 922 class basic_bgzf_istream : 923 public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>, 924 public std::basic_istream<Elem,Tr> 925 { 926 public: 927 typedef basic_bgzf_istreambase< 928 Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type; 929 typedef std::basic_istream<Elem,Tr> istream_type; 930 typedef istream_type& istream_reference; 931 typedef char byte_type; 932 basic_bgzf_istream(istream_reference istream_)933 basic_bgzf_istream(istream_reference istream_) 934 : 935 bgzf_istreambase_type(istream_), 936 istream_type(bgzf_istreambase_type::rdbuf()), 937 m_is_gzip(false), 938 m_gbgzf_data_size(0) 939 {}; 940 941 /// returns true if it is a gzip file is_gzip()942 bool is_gzip() const { return m_is_gzip;}; 943 /// return data size check check_data_size()944 bool check_data_size() const { return this->get_out_size() == m_gbgzf_data_size;}; 945 946 /// return the data size in the file get_gbgzf_data_size()947 long get_gbgzf_data_size() const { return m_gbgzf_data_size;}; 948 protected: 949 static void read_long(istream_reference in_, unsigned long& x_); 950 951 int check_header(); 952 bool m_is_gzip; 953 unsigned long m_gbgzf_data_size; 954 955 #ifdef _WIN32 956 private: _Add_vtordisp1()957 void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250 _Add_vtordisp2()958 void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250 959 #endif 960 }; 961 962 /// A typedef for basic_bgzf_ostream<char> 963 typedef basic_bgzf_ostream<char> bgzf_ostream; 964 /// A typedef for basic_bgzf_ostream<wchar_t> 965 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream; 966 /// A typedef for basic_bgzf_istream<char> 967 typedef basic_bgzf_istream<char> bgzf_istream; 968 /// A typedef for basic_bgzf_istream<wchart> 969 typedef basic_bgzf_istream<wchar_t> bgzf_wistream; 970 971 } // namespace seqan 972 973 #include "iostream_bgzf_impl.h" 974 975 #endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_ 976