1 /*
2 zipstream Library License:
3 --------------------------
4 
5 The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
6 
7 This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
8 
9 Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
10 
11 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
12 
13 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
14 
15 3. This notice may not be removed or altered from any source distribution
16 
17 Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003   (original zlib stream)
18 Author: David Weese, dave.weese@gmail.com, 2014             (extension to parallel block-wise compression in bgzf format)
19 */
20 
21 #ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
22 #define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
23 
24 
25 #include <vector>
26 #include <iostream>
27 #include <algorithm>
28 #include <zlib.h>
29 #include "iostream_zutil.h"
30 
31 namespace seqan {
32 
/// Default gzip buffer size; adjust it to suit your needs.
const size_t default_buffer_size = 4096;

/// Largest size (in bytes) of one complete BGZF block, header and footer included.
const unsigned BGZF_MAX_BLOCK_SIZE = 65536u;
/// Length of the fixed BGZF block header in bytes.
const unsigned BGZF_BLOCK_HEADER_LENGTH = 18;
/// Length of the BGZF block footer in bytes.
const unsigned BGZF_BLOCK_FOOTER_LENGTH = 8;
/// Framing bytes of one stored (non-compressed) deflate block
/// (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html, RFC 1951).
const unsigned ZLIB_BLOCK_OVERHEAD = 5;

/// Upper bound for the uncompressed payload of one block: chosen so that even
/// with level Z_NO_COMPRESSION the deflate output plus header and footer still
/// fits into BGZF_MAX_BLOCK_SIZE.
const unsigned BGZF_BLOCK_SIZE = BGZF_MAX_BLOCK_SIZE
                               - BGZF_BLOCK_HEADER_LENGTH
                               - BGZF_BLOCK_FOOTER_LENGTH
                               - ZLIB_BLOCK_OVERHEAD;
45 
/// Compression strategy, see bgzf doc.
/// The numeric values coincide with zlib's Z_DEFAULT_STRATEGY (0),
/// Z_FILTERED (1) and Z_HUFFMAN_ONLY (2).
enum EStrategy
{
    DefaultStrategy     = 0,
    StrategyFiltered    = 1,
    StrategyHuffmanOnly = 2
};
53 
54 template<
55     typename Elem,
56     typename Tr = std::char_traits<Elem>,
57     typename ElemA = std::allocator<Elem>,
58     typename ByteT = char,
59     typename ByteAT = std::allocator<ByteT>
60 >
61 class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr>
62 {
63 public:
64     typedef std::basic_ostream<Elem, Tr>& ostream_reference;
65     typedef ElemA char_allocator_type;
66     typedef ByteT byte_type;
67     typedef ByteAT byte_allocator_type;
68     typedef byte_type* byte_buffer_type;
69     typedef typename Tr::char_type char_type;
70     typedef typename Tr::int_type int_type;
71 
72     typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue;
73 
74     struct OutputBuffer
75     {
76         char    buffer[BGZF_MAX_BLOCK_SIZE];
77         size_t  size;
78     };
79 
80     struct BufferWriter
81     {
82         ostream_reference ostream;
83 
BufferWriterBufferWriter84         BufferWriter(ostream_reference ostream) :
85             ostream(ostream)
86         {}
87 
operatorBufferWriter88         bool operator() (OutputBuffer const & outputBuffer)
89         {
90             ostream.write(outputBuffer.buffer, outputBuffer.size);
91             return ostream.good();
92         }
93     };
94 
95     struct CompressionJob
96     {
97         typedef std::vector<char_type, char_allocator_type> TBuffer;
98 
99         TBuffer         buffer;
100         size_t          size;
101         OutputBuffer    *outputBuffer;
102 
CompressionJobCompressionJob103         CompressionJob() :
104             buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0),
105             size(0),
106             outputBuffer(NULL)
107         {}
108     };
109 
110     // string of recycable jobs
111     size_t                  numThreads;
112     size_t                  numJobs;
113     String<CompressionJob>  jobs;
114     TJobQueue               jobQueue;
115     TJobQueue               idleQueue;
116     Serializer<
117         OutputBuffer,
118         BufferWriter>       serializer;
119 
120     size_t                  currentJobId;
121     bool                    currentJobAvail;
122 
123 
124     struct CompressionThread
125     {
126         basic_bgzf_streambuf            *streamBuf;
127         CompressionContext<BgzfFile>    compressionCtx;
128         size_t                          threadNum;
129 
operatorCompressionThread130         void operator()()
131         {
132             ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
133             ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue);
134 
135             // wait for a new job to become available
136             bool success = true;
137             while (success)
138             {
139                 size_t jobId = -1;
140                 if (!popFront(jobId, streamBuf->jobQueue))
141                     return;
142 
143                 CompressionJob &job = streamBuf->jobs[jobId];
144 
145                 // compress block with zlib
146                 job.outputBuffer->size = _compressBlock(
147                     job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
148                     &job.buffer[0], job.size, compressionCtx);
149 
150                 success = releaseValue(streamBuf->serializer, job.outputBuffer);
151                 appendValue(streamBuf->idleQueue, jobId);
152             }
153         }
154     };
155 
156     // array of worker threads
157     Thread<CompressionThread>   *threads;
158 
159     basic_bgzf_streambuf(ostream_reference ostream_,
160                          size_t numThreads = 16,
161                          size_t jobsPerThread = 8) :
numThreads(numThreads)162         numThreads(numThreads),
163         numJobs(numThreads * jobsPerThread),
164         jobQueue(numJobs),
165         idleQueue(numJobs),
166         serializer(ostream_, numThreads * jobsPerThread)
167     {
168         resize(jobs, numJobs, Exact());
169         currentJobId = 0;
170 
171         lockWriting(jobQueue);
172         lockReading(idleQueue);
173         setReaderWriterCount(jobQueue, numThreads, 1);
174         setReaderWriterCount(idleQueue, 1, numThreads);
175 
176         for (unsigned i = 0; i < numJobs; ++i)
177         {
178             bool success = appendValue(idleQueue, i);
179             ignoreUnusedVariableWarning(success);
180             SEQAN_ASSERT(success);
181         }
182 
183         threads = new Thread<CompressionThread>[numThreads];
184         for (unsigned i = 0; i < numThreads; ++i)
185         {
186             threads[i].worker.streamBuf = this;
187             threads[i].worker.threadNum = i;
188             run(threads[i]);
189         }
190 
191         currentJobAvail = popFront(currentJobId, idleQueue);
192         SEQAN_ASSERT(currentJobAvail);
193 
194         CompressionJob &job = jobs[currentJobId];
195         job.outputBuffer = aquireValue(serializer);
196         this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
197     }
198 
~basic_bgzf_streambuf()199     ~basic_bgzf_streambuf()
200     {
201         // the buffer is now (after addFooter()) and flush will append the empty EOF marker
202         flush(true);
203 
204         unlockWriting(jobQueue);
205         unlockReading(idleQueue);
206 
207         for (unsigned i = 0; i < numThreads; ++i)
208             waitFor(threads[i]);
209         delete[] threads;
210     }
211 
compressBuffer(size_t size)212     bool compressBuffer(size_t size)
213     {
214         // submit current job
215         if (currentJobAvail)
216         {
217             jobs[currentJobId].size = size;
218             appendValue(jobQueue, currentJobId);
219         }
220 
221         // recycle existing idle job
222         if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
223             return false;
224 
225         jobs[currentJobId].outputBuffer = aquireValue(serializer);
226 
227         return serializer;
228     }
229 
overflow(int_type c)230     int_type overflow(int_type c)
231     {
232         int w = static_cast<int>(this->pptr() - this->pbase());
233         if (c != EOF)
234         {
235             *this->pptr() = c;
236             ++w;
237         }
238         if (compressBuffer(w))
239         {
240             CompressionJob &job = jobs[currentJobId];
241             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
242             return c;
243         }
244         else
245         {
246             return EOF;
247         }
248     }
249 
250     std::streamsize flush(bool flushEmptyBuffer = false)
251     {
252         int w = static_cast<int>(this->pptr() - this->pbase());
253         if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
254         {
255             CompressionJob &job = jobs[currentJobId];
256             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
257         }
258         else
259         {
260             w = 0;
261         }
262 
263         // wait for running compressor threads
264         waitForMinSize(idleQueue, numJobs - 1);
265 
266         serializer.worker.ostream.flush();
267         return w;
268     }
269 
sync()270     int sync()
271     {
272         if (this->pptr() != this->pbase())
273         {
274             int c = overflow(EOF);
275             if (c == EOF)
276                 return -1;
277         }
278         return 0;
279     }
280 
addFooter()281     void addFooter()
282     {
283         // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
284         if (this->pptr() != this->pbase())
285             overflow(EOF);
286     }
287 
288     /// returns a reference to the output stream
get_ostream()289     ostream_reference get_ostream() const    { return serializer.worker.ostream; };
290 };
291 
292 template<
293     typename Elem,
294     typename Tr = std::char_traits<Elem>,
295     typename ElemA = std::allocator<Elem>,
296     typename ByteT = char,
297     typename ByteAT = std::allocator<ByteT>
298 >
299 class basic_unbgzf_streambuf :
300     public std::basic_streambuf<Elem, Tr>
301 {
302 public:
303     typedef std::basic_istream<Elem, Tr>& istream_reference;
304     typedef ElemA char_allocator_type;
305     typedef ByteT byte_type;
306     typedef ByteAT byte_allocator_type;
307     typedef byte_type* byte_buffer_type;
308     typedef typename Tr::char_type char_type;
309     typedef typename Tr::int_type int_type;
310     typedef typename Tr::off_type off_type;
311     typedef typename Tr::pos_type pos_type;
312 
313     typedef std::vector<char_type, char_allocator_type>     TBuffer;
314     typedef ConcurrentQueue<int, Suspendable<Limit> >       TJobQueue;
315 
316     static const size_t MAX_PUTBACK = 4;
317 
318     struct Serializer
319     {
320         istream_reference   istream;
321         Mutex               lock;
322         IOError             *error;
323         off_type            fileOfs;
324 
SerializerSerializer325         Serializer(istream_reference istream) :
326             istream(istream),
327             lock(false),
328             error(NULL),
329             fileOfs(0u)
330         {}
331 
~SerializerSerializer332         ~Serializer()
333         {
334             delete error;
335         }
336     };
337 
338     Serializer serializer;
339 
340     struct DecompressionJob
341     {
342         typedef std::vector<byte_type, byte_allocator_type> TInputBuffer;
343 
344         TInputBuffer    inputBuffer;
345         TBuffer         buffer;
346         off_type        fileOfs;
347         int             size;
348         unsigned        compressedSize;
349 
350         CriticalSection cs;
351         Condition       readyEvent;
352         bool            ready;
353 
DecompressionJobDecompressionJob354         DecompressionJob() :
355             inputBuffer(BGZF_MAX_BLOCK_SIZE, 0),
356             buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0),
357             fileOfs(),
358             size(0),
359             readyEvent(cs),
360             ready(true)
361         {}
362 
DecompressionJobDecompressionJob363         DecompressionJob(DecompressionJob const &other) :
364             inputBuffer(other.inputBuffer),
365             buffer(other.buffer),
366             fileOfs(other.fileOfs),
367             size(other.size),
368             readyEvent(cs),
369             ready(other.ready)
370         {}
371     };
372 
373     // string of recycable jobs
374     size_t                      numThreads;
375     size_t                      numJobs;
376     String<DecompressionJob>    jobs;
377     TJobQueue                   runningQueue;
378     TJobQueue                   todoQueue;
379     int                         currentJobId;
380 
381     struct DecompressionThread
382     {
383         basic_unbgzf_streambuf          *streamBuf;
384         CompressionContext<BgzfFile>    compressionCtx;
385 
operatorDecompressionThread386         void operator()()
387         {
388             ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue);
389             ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue);
390 
391             // wait for a new job to become available
392             while (true)
393             {
394                 int jobId = -1;
395                 if (!popFront(jobId, streamBuf->todoQueue))
396                     return;
397 
398                 DecompressionJob &job = streamBuf->jobs[jobId];
399                 size_t tailLen = 0;
400 
401                 // typically the idle queue contains only ready jobs
402                 // however, if seek() fast forwards running jobs into the todoQueue
403                 // the caller defers the task of waiting to the decompression threads
404                 if (!job.ready)
405                 {
406                     ScopedLock<CriticalSection> lock(job.cs);
407                     if (!job.ready)
408                     {
409                         waitFor(job.readyEvent);
410                         job.ready = true;
411                     }
412                 }
413 
414                 {
415                     ScopedLock<Mutex> scopedLock(streamBuf->serializer.lock);
416 
417                     if (streamBuf->serializer.error != NULL)
418                         return;
419 
420                     // remember start offset (for tellg later)
421                     job.fileOfs = streamBuf->serializer.fileOfs;
422                     job.size = -1;
423                     job.compressedSize = 0;
424 
425                     // only load if not at EOF
426                     if (job.fileOfs != -1)
427                     {
428                         // read header
429                         streamBuf->serializer.istream.read(
430                             (char*)&job.inputBuffer[0],
431                             BGZF_BLOCK_HEADER_LENGTH);
432 
433                         if (!streamBuf->serializer.istream.good())
434                         {
435                             streamBuf->serializer.fileOfs = -1;
436                             if (streamBuf->serializer.istream.eof())
437                                 goto eofSkip;
438                             streamBuf->serializer.error = new IOError("Stream read error.");
439                             return;
440                         }
441 
442                         // check header
443                         if (!_bgzfCheckHeader(&job.inputBuffer[0]))
444                         {
445                             streamBuf->serializer.fileOfs = -1;
446                             streamBuf->serializer.error = new IOError("Invalid BGZF block header.");
447                             return;
448                         }
449 
450                         // extract length of compressed data
451                         tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH;
452 
453                         // read compressed data and tail
454                         streamBuf->serializer.istream.read(
455                             (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH,
456                             tailLen);
457 
458                         if (!streamBuf->serializer.istream.good())
459                         {
460                             streamBuf->serializer.fileOfs = -1;
461                             if (streamBuf->serializer.istream.eof())
462                                 goto eofSkip;
463                             streamBuf->serializer.error = new IOError("Stream read error.");
464                             return;
465                         }
466 
467                         job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen;
468                         streamBuf->serializer.fileOfs += job.compressedSize;
469                         job.ready = false;
470 
471                     eofSkip:
472                         streamBuf->serializer.istream.clear(
473                             streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
474                     }
475 
476                     if (!appendValue(streamBuf->runningQueue, jobId))
477                     {
478                         // signal that job is ready
479                         {
480                             ScopedLock<CriticalSection> lock(job.cs);
481                             job.ready = true;
482                             signal(job.readyEvent);
483                         }
484                         return;
485                     }
486                 }
487 
488                 if (!job.ready)
489                 {
490                     // decompress block
491                     job.size = _decompressBlock(
492                         &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer),
493                         &job.inputBuffer[0], job.compressedSize, compressionCtx);
494 
495                     // signal that job is ready
496                     {
497                         ScopedLock<CriticalSection> lock(job.cs);
498                         job.ready = true;
499                         signal(job.readyEvent);
500                     }
501                 }
502             }
503         }
504     };
505 
506     // array of worker threads
507     Thread<DecompressionThread> *threads;
508     TBuffer                     putbackBuffer;
509 
510     basic_unbgzf_streambuf(istream_reference istream_,
511                            size_t numThreads = 16,
512                            size_t jobsPerThread = 8) :
serializer(istream_)513         serializer(istream_),
514         numThreads(numThreads),
515         numJobs(numThreads * jobsPerThread),
516         runningQueue(numJobs),
517         todoQueue(numJobs),
518         putbackBuffer(MAX_PUTBACK)
519     {
520         resize(jobs, numJobs, Exact());
521         currentJobId = -1;
522 
523         lockReading(runningQueue);
524         lockWriting(todoQueue);
525         setReaderWriterCount(runningQueue, 1, numThreads);
526         setReaderWriterCount(todoQueue, numThreads, 1);
527 
528         for (unsigned i = 0; i < numJobs; ++i)
529         {
530             bool success = appendValue(todoQueue, i);
531             ignoreUnusedVariableWarning(success);
532             SEQAN_ASSERT(success);
533         }
534 
535         threads = new Thread<DecompressionThread>[numThreads];
536         for (unsigned i = 0; i < numThreads; ++i)
537         {
538             threads[i].worker.streamBuf = this;
539             run(threads[i]);
540         }
541     }
542 
~basic_unbgzf_streambuf()543     ~basic_unbgzf_streambuf()
544     {
545         unlockWriting(todoQueue);
546         unlockReading(runningQueue);
547 
548         for (unsigned i = 0; i < numThreads; ++i)
549             waitFor(threads[i]);
550         delete[] threads;
551     }
552 
underflow()553     int_type underflow()
554     {
555         // no need to use the next buffer?
556         if (this->gptr() && this->gptr() < this->egptr())
557             return Tr::to_int_type(*this->gptr());
558 
559         size_t putback = this->gptr() - this->eback();
560         if (putback > MAX_PUTBACK)
561             putback = MAX_PUTBACK;
562 
563         // save at most MAX_PUTBACK characters from previous page to putback buffer
564         if (putback != 0)
565             std::copy(
566                 this->gptr() - putback,
567                 this->gptr(),
568                 &putbackBuffer[0]);
569 
570         if (currentJobId >= 0)
571             appendValue(todoQueue, currentJobId);
572 
573         while (true)
574         {
575             if (!popFront(currentJobId, runningQueue))
576             {
577                 currentJobId = -1;
578                 SEQAN_ASSERT(serializer.error != NULL);
579                 if (serializer.error != NULL)
580                     throw *serializer.error;
581                 return EOF;
582             }
583 
584             DecompressionJob &job = jobs[currentJobId];
585 
586             // restore putback buffer
587             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
588             if (putback != 0)
589                 std::copy(
590                     &putbackBuffer[0],
591                     &putbackBuffer[0] + putback,
592                     &job.buffer[0] + (MAX_PUTBACK - putback));
593 
594             // wait for the end of decompression
595             {
596                 ScopedLock<CriticalSection> lock(job.cs);
597                 if (!job.ready)
598                     waitFor(job.readyEvent);
599             }
600 
601             size_t size = (job.size != -1)? job.size : 0;
602 
603             // reset buffer pointers
604             this->setg(
605                   &job.buffer[0] + (MAX_PUTBACK - putback),     // beginning of putback area
606                   &job.buffer[0] + MAX_PUTBACK,                 // read position
607                   &job.buffer[0] + (MAX_PUTBACK + size));       // end of buffer
608 
609             if (job.size == -1)
610                 return EOF;
611             else if (job.size > 0)
612                 return Tr::to_int_type(*this->gptr());      // return next character
613         }
614     }
615 
seekoff(off_type ofs,std::ios_base::seekdir dir,std::ios_base::openmode openMode)616     pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
617     {
618         if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
619         {
620             if (dir == std::ios_base::cur && ofs >= 0)
621             {
622                 // forward delta seek
623                 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
624                 {
625                     ofs -= this->egptr() - this->gptr();
626                     if (this->underflow() == EOF)
627                         break;
628                 }
629 
630                 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
631                 {
632                     DecompressionJob &job = jobs[currentJobId];
633 
634                     // reset buffer pointers
635                     this->setg(
636                           this->eback(),            // beginning of putback area
637                           this->gptr() + ofs,       // read position
638                           this->egptr());           // end of buffer
639 
640                     if (this->gptr() != this->egptr())
641                         return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
642                     else
643                         return pos_type((job.fileOfs + job.compressedSize) << 16);
644                 }
645 
646             }
647             else if (dir == std::ios_base::beg)
648             {
649                 // random seek
650                 std::streampos destFileOfs = ofs >> 16;
651 
652                 // are we in the same block?
653                 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
654                 {
655                     DecompressionJob &job = jobs[currentJobId];
656 
657                     // reset buffer pointers
658                     this->setg(
659                           this->eback(),                                        // beginning of putback area
660                           &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),      // read position
661                           this->egptr());                                       // end of buffer
662                     return ofs;
663                 }
664 
665                 // ok, different block
666                 {
667                     ScopedLock<Mutex> scopedLock(serializer.lock);
668 
669                     // remove all running jobs and put them in the idle queue unless we
670                     // find our seek target
671 
672                     if (currentJobId >= 0)
673                         appendValue(todoQueue, currentJobId);
674 
675                     // Note that if we are here the current job does not represent the sought block.
676                     // Hence if the running queue is empty we need to explicitly unset the jobId,
677                     // otherwise we would not update the serializers istream pointer to the correct position.
678                     if (empty(runningQueue))
679                         currentJobId = -1;
680 
681                     // empty is thread-safe in serializer.lock
682                     while (!empty(runningQueue))
683                     {
684                         popFront(currentJobId, runningQueue);
685 
686                         if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
687                             break;
688 
689                         // push back useless job
690                         appendValue(todoQueue, currentJobId);
691                         currentJobId = -1;
692                     }
693 
694                     if (currentJobId == -1)
695                     {
696                         SEQAN_ASSERT(empty(runningQueue));
697                         serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
698                         if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
699                             serializer.fileOfs = destFileOfs;
700                         else
701                             currentJobId = -2;      // temporarily signals a seek error
702                     }
703                 }
704 
705                 // if our block wasn't in the running queue yet, it should now
706                 // be the first that falls out after modifying serializer.fileOfs
707                 if (currentJobId == -1)
708                     popFront(currentJobId, runningQueue);
709                 else if (currentJobId == -2)
710                     currentJobId = -1;
711 
712                 if (currentJobId >= 0)
713                 {
714                     // wait for the end of decompression
715                     DecompressionJob &job = jobs[currentJobId];
716                     {
717                         ScopedLock<CriticalSection> lock(job.cs);
718                         if (!job.ready)
719                             waitFor(job.readyEvent);
720                     }
721 
722                     SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs);
723 
724                     // reset buffer pointers
725                     this->setg(
726                           &job.buffer[0] + MAX_PUTBACK,                     // no putback area
727                           &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),  // read position
728                           &job.buffer[0] + (MAX_PUTBACK + job.size));       // end of buffer
729                     return ofs;
730                 }
731             }
732         }
733         return pos_type(off_type(-1));
734     }
735 
seekpos(pos_type pos,std::ios_base::openmode openMode)736     pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
737     {
738         return seekoff(off_type(pos), std::ios_base::beg, openMode);
739     }
740 
741     /// returns the compressed input istream
get_istream()742     istream_reference get_istream()    { return serializer.istream;};
743 };
744 
745 /* \brief Base class for zip ostreams
746 
747 Contains a basic_bgzf_streambuf.
748 */
749 template<
750     typename Elem,
751     typename Tr = std::char_traits<Elem>,
752     typename ElemA = std::allocator<Elem>,
753     typename ByteT = char,
754     typename ByteAT = std::allocator<ByteT>
755 >
756 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
757 {
758 public:
759     typedef std::basic_ostream<Elem, Tr>& ostream_reference;
760     typedef basic_bgzf_streambuf<
761         Elem,
762         Tr,
763         ElemA,
764         ByteT,
765         ByteAT
766         > bgzf_streambuf_type;
767 
basic_bgzf_ostreambase(ostream_reference ostream_)768     basic_bgzf_ostreambase(ostream_reference ostream_)
769         : m_buf(ostream_)
770     {
771         this->init(&m_buf );
772     };
773 
774     /// returns the underlying zip ostream object
rdbuf()775     bgzf_streambuf_type* rdbuf() { return &m_buf; };
776 
777     /// returns the bgzf error state
get_zerr()778     int get_zerr() const                    {    return m_buf.get_err();};
779     /// returns the uncompressed data crc
get_crc()780     long get_crc() const                    {    return m_buf.get_crc();};
781     /// returns the compressed data size
get_out_size()782     long get_out_size() const                {    return m_buf.get_out_size();};
783     /// returns the uncompressed data size
get_in_size()784     long get_in_size() const                {    return m_buf.get_in_size();};
785 private:
786     bgzf_streambuf_type m_buf;
787 };
788 
789 /* \brief Base class for unzip istreams
790 
791 Contains a basic_unbgzf_streambuf.
792 */
793 template<
794     typename Elem,
795     typename Tr = std::char_traits<Elem>,
796     typename ElemA = std::allocator<Elem>,
797     typename ByteT = char,
798     typename ByteAT = std::allocator<ByteT>
799 >
800 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
801 {
802 public:
803     typedef std::basic_istream<Elem, Tr>& istream_reference;
804     typedef basic_unbgzf_streambuf<
805         Elem,
806         Tr,
807         ElemA,
808         ByteT,
809         ByteAT
810         > unbgzf_streambuf_type;
811 
basic_bgzf_istreambase(istream_reference ostream_)812     basic_bgzf_istreambase(istream_reference ostream_)
813         : m_buf(ostream_)
814     {
815         this->init(&m_buf );
816     };
817 
818     /// returns the underlying unzip istream object
rdbuf()819     unbgzf_streambuf_type* rdbuf() { return &m_buf; };
820 
821     /// returns the bgzf error state
get_zerr()822     int get_zerr() const                    {    return m_buf.get_zerr();};
823     /// returns the uncompressed data crc
get_crc()824     long get_crc() const                    {    return m_buf.get_crc();};
825     /// returns the uncompressed data size
get_out_size()826     long get_out_size() const                {    return m_buf.get_out_size();};
827     /// returns the compressed data size
get_in_size()828     long get_in_size() const                {    return m_buf.get_in_size();};
829 private:
830     unbgzf_streambuf_type m_buf;
831 };
832 
833 /*brief A zipper ostream
834 
835 This class is a ostream decorator that behaves 'almost' like any other ostream.
836 
At construction, it takes any ostream to which the compressed data shall be written.
838 
When finished, you need to call the special method zflush or let the destructor run
to flush all the intermediate streams.
841 
842 Example:
843 \code
844 // creating the target zip string, could be a fstream
845 ostringstream ostringstream_;
846 // creating the zip layer
847 bgzf_ostream zipper(ostringstream_);
848 
849 
850 // writing data
851 zipper<<f<<" "<<d<<" "<<ui<<" "<<ul<<" "<<us<<" "<<c<<" "<<dum;
852 // zip ostream needs special flushing...
853 zipper.zflush();
854 \endcode
855 */
856 template<
857     typename Elem,
858     typename Tr = std::char_traits<Elem>,
859     typename ElemA = std::allocator<Elem>,
860     typename ByteT = char,
861     typename ByteAT = std::allocator<ByteT>
862 >
863 class basic_bgzf_ostream :
864     public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
865     public std::basic_ostream<Elem,Tr>
866 {
867 public:
868     typedef basic_bgzf_ostreambase<
869         Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
870     typedef std::basic_ostream<Elem,Tr> ostream_type;
871     typedef ostream_type& ostream_reference;
872 
basic_bgzf_ostream(ostream_reference ostream_)873     basic_bgzf_ostream(ostream_reference ostream_)
874     :
875         bgzf_ostreambase_type(ostream_),
876         ostream_type(bgzf_ostreambase_type::rdbuf())
877     {}
878 
879     /// flush inner buffer and zipper buffer
zflush()880     basic_bgzf_ostream<Elem,Tr>& zflush()
881     {
882         this->flush(); this->rdbuf()->flush(); return *this;
883     };
884 
~basic_bgzf_ostream()885     ~basic_bgzf_ostream()
886     {
887         this->rdbuf()->addFooter();
888     }
889 
890 private:
891     static void put_long(ostream_reference out_, unsigned long x_);
892 #ifdef _WIN32
_Add_vtordisp1()893     void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
_Add_vtordisp2()894     void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
895 #endif
896 };
897 
898 /* \brief A zipper istream
899 
This class is an istream decorator that behaves 'almost' like any other istream.
901 
At construction, it takes any istream from which the compressed data shall be read.
903 
Simple example:
905 \code
906 // create a stream on zip string
907 istringstream istringstream_( ostringstream_.str());
908 // create unzipper istream
909 bgzf_istream unzipper( istringstream_);
910 
911 // read and unzip
912 unzipper>>f_r>>d_r>>ui_r>>ul_r>>us_r>>c_r>>dum_r;
913 \endcode
914 */
915 template<
916     typename Elem,
917     typename Tr = std::char_traits<Elem>,
918     typename ElemA = std::allocator<Elem>,
919     typename ByteT = char,
920     typename ByteAT = std::allocator<ByteT>
921 >
922 class basic_bgzf_istream :
923     public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
924     public std::basic_istream<Elem,Tr>
925 {
926 public:
927     typedef basic_bgzf_istreambase<
928         Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
929     typedef std::basic_istream<Elem,Tr> istream_type;
930     typedef istream_type& istream_reference;
931     typedef char byte_type;
932 
basic_bgzf_istream(istream_reference istream_)933     basic_bgzf_istream(istream_reference istream_)
934       :
935         bgzf_istreambase_type(istream_),
936         istream_type(bgzf_istreambase_type::rdbuf()),
937         m_is_gzip(false),
938         m_gbgzf_data_size(0)
939     {};
940 
941     /// returns true if it is a gzip file
is_gzip()942     bool is_gzip() const                {    return m_is_gzip;};
943     /// return data size check
check_data_size()944     bool check_data_size() const        {    return this->get_out_size() == m_gbgzf_data_size;};
945 
946     /// return the data size in the file
get_gbgzf_data_size()947     long get_gbgzf_data_size() const        {    return m_gbgzf_data_size;};
948 protected:
949     static void read_long(istream_reference in_, unsigned long& x_);
950 
951     int check_header();
952     bool m_is_gzip;
953     unsigned long m_gbgzf_data_size;
954 
955 #ifdef _WIN32
956 private:
_Add_vtordisp1()957     void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
_Add_vtordisp2()958     void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
959 #endif
960 };
961 
962 /// A typedef for basic_bgzf_ostream<char>
963 typedef basic_bgzf_ostream<char> bgzf_ostream;
964 /// A typedef for basic_bgzf_ostream<wchar_t>
965 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
966 /// A typedef for basic_bgzf_istream<char>
967 typedef basic_bgzf_istream<char> bgzf_istream;
968 /// A typedef for basic_bgzf_istream<wchart>
969 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
970 
971 }  // namespace seqan
972 
973 #include "iostream_bgzf_impl.h"
974 
975 #endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
976