1 // zipstream Library License:
2 // --------------------------
3 //
4 // The zlib/libpng License Copyright (c) 2003 Jonathan de Halleux.
5 //
6 // This software is provided 'as-is', without any express or implied warranty. In no event will the authors be held liable for any damages arising from the use of this software.
7 //
8 // Permission is granted to anyone to use this software for any purpose, including commercial applications, and to alter it and redistribute it freely, subject to the following restrictions:
9 //
10 // 1. The origin of this software must not be misrepresented; you must not claim that you wrote the original software. If you use this software in a product, an acknowledgment in the product documentation would be appreciated but is not required.
11 //
12 // 2. Altered source versions must be plainly marked as such, and must not be misrepresented as being the original software.
13 //
14 // 3. This notice may not be removed or altered from any source distribution
15 //
16 //
17 // Author: Jonathan de Halleux, dehalleux@pelikhan.com, 2003   (original zlib stream)
18 // Author: David Weese, dave.weese@gmail.com, 2014             (extension to parallel block-wise compression in bgzf format)
19 
20 #ifndef INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
21 #define INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
22 
23 #ifndef SEQAN_BGZF_NUM_THREADS
24 #define SEQAN_BGZF_NUM_THREADS 16
25 #endif
26 
27 namespace seqan {
28 
// Upper bound for the size of one BGZF block (header + compressed data + footer), in bytes.
const unsigned BGZF_MAX_BLOCK_SIZE = 64 * 1024;
// Number of bytes in front of the compressed data of every block.
const unsigned BGZF_BLOCK_HEADER_LENGTH = 18;
// Number of bytes following the compressed data of every block.
const unsigned BGZF_BLOCK_FOOTER_LENGTH = 8;
const unsigned ZLIB_BLOCK_OVERHEAD = 5; // 5 bytes block overhead (see 3.2.4. at http://www.gzip.org/zlib/rfc-deflate.html)

// Reduce the maximal input size, such that the compressed data
// always fits in one block even for level Z_NO_COMPRESSION.
// I.e. the uncompressed payload per block is the maximal block size minus
// header, footer and the per-block deflate overhead.
const unsigned BGZF_BLOCK_SIZE = BGZF_MAX_BLOCK_SIZE - BGZF_BLOCK_HEADER_LENGTH - BGZF_BLOCK_FOOTER_LENGTH - ZLIB_BLOCK_OVERHEAD;
37 
38 // ===========================================================================
39 // Classes
40 // ===========================================================================
41 
42 // --------------------------------------------------------------------------
43 // Class basic_bgzf_streambuf
44 // --------------------------------------------------------------------------
45 
46 template<
47     typename Elem,
48     typename Tr = std::char_traits<Elem>,
49     typename ElemA = std::allocator<Elem>,
50     typename ByteT = char,
51     typename ByteAT = std::allocator<ByteT>
52 >
53 class basic_bgzf_streambuf : public std::basic_streambuf<Elem, Tr>
54 {
55 public:
56     typedef std::basic_ostream<Elem, Tr>& ostream_reference;
57     typedef ElemA char_allocator_type;
58     typedef ByteT byte_type;
59     typedef ByteAT byte_allocator_type;
60     typedef byte_type* byte_buffer_type;
61     typedef typename Tr::char_type char_type;
62     typedef typename Tr::int_type int_type;
63 
64     typedef ConcurrentQueue<size_t, Suspendable<Limit> > TJobQueue;
65 
66     struct OutputBuffer
67     {
68         char    buffer[BGZF_MAX_BLOCK_SIZE];
69         size_t  size;
70     };
71 
72     struct BufferWriter
73     {
74         ostream_reference ostream;
75 
BufferWriterBufferWriter76         BufferWriter(ostream_reference ostream) :
77             ostream(ostream)
78         {}
79 
operatorBufferWriter80         bool operator() (OutputBuffer const & outputBuffer)
81         {
82             ostream.write(outputBuffer.buffer, outputBuffer.size);
83             return ostream.good();
84         }
85     };
86 
87     struct CompressionJob
88     {
89         typedef std::vector<char_type, char_allocator_type> TBuffer;
90 
91         TBuffer         buffer;
92         size_t          size;
93         OutputBuffer    *outputBuffer;
94 
CompressionJobCompressionJob95         CompressionJob() :
96             buffer(BGZF_BLOCK_SIZE / sizeof(char_type), 0),
97             size(0),
98             outputBuffer(NULL)
99         {}
100     };
101 
102     // string of recycable jobs
103     size_t                  numThreads;
104     size_t                  numJobs;
105     String<CompressionJob>  jobs;
106     TJobQueue               jobQueue;
107     TJobQueue               idleQueue;
108     Serializer<
109         OutputBuffer,
110         BufferWriter>       serializer;
111 
112     size_t                  currentJobId;
113     bool                    currentJobAvail;
114 
115 
116     struct CompressionThread
117     {
118         basic_bgzf_streambuf            *streamBuf;
119         CompressionContext<BgzfFile>    compressionCtx;
120         size_t                          threadNum;
121 
operatorCompressionThread122         void operator()()
123         {
124             ScopedReadLock<TJobQueue> readLock(streamBuf->jobQueue);
125             ScopedWriteLock<TJobQueue> writeLock(streamBuf->idleQueue);
126 
127             // wait for a new job to become available
128             bool success = true;
129             while (success)
130             {
131                 size_t jobId = -1;
132                 if (!popFront(jobId, streamBuf->jobQueue))
133                     return;
134 
135                 CompressionJob &job = streamBuf->jobs[jobId];
136 
137                 // compress block with zlib
138                 job.outputBuffer->size = _compressBlock(
139                     job.outputBuffer->buffer, sizeof(job.outputBuffer->buffer),
140                     &job.buffer[0], job.size, compressionCtx);
141 
142                 success = releaseValue(streamBuf->serializer, job.outputBuffer);
143                 appendValue(streamBuf->idleQueue, jobId);
144             }
145         }
146     };
147 
148     // array of worker threads
149     using TFuture = decltype(std::async(CompressionThread{nullptr, CompressionContext<BgzfFile>{}, static_cast<size_t>(0)}));
150     std::vector<TFuture>         threads;
151 
152     basic_bgzf_streambuf(ostream_reference ostream_,
153                          size_t numThreads = SEQAN_BGZF_NUM_THREADS,
154                          size_t jobsPerThread = 8) :
numThreads(numThreads)155         numThreads(numThreads),
156         numJobs(numThreads * jobsPerThread),
157         jobQueue(numJobs),
158         idleQueue(numJobs),
159         serializer(ostream_, numThreads * jobsPerThread)
160     {
161         resize(jobs, numJobs, Exact());
162         currentJobId = 0;
163 
164         lockWriting(jobQueue);
165         lockReading(idleQueue);
166         setReaderWriterCount(jobQueue, numThreads, 1);
167         setReaderWriterCount(idleQueue, 1, numThreads);
168 
169         for (unsigned i = 0; i < numJobs; ++i)
170         {
171             bool success = appendValue(idleQueue, i);
172             ignoreUnusedVariableWarning(success);
173             SEQAN_ASSERT(success);
174         }
175 
176         for (size_t i = 0; i < numThreads; ++i)
177         {
178             threads.push_back(std::async(std::launch::async, CompressionThread{this, CompressionContext<BgzfFile>{}, i}));
179         }
180 
181         currentJobAvail = popFront(currentJobId, idleQueue);
182         SEQAN_ASSERT(currentJobAvail);
183 
184         CompressionJob &job = jobs[currentJobId];
185         job.outputBuffer = aquireValue(serializer);
186         this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
187     }
188 
~basic_bgzf_streambuf()189     ~basic_bgzf_streambuf()
190     {
191         // the buffer is now (after addFooter()) and flush will append the empty EOF marker
192         flush(true);
193 
194         unlockWriting(jobQueue);
195         unlockReading(idleQueue);
196     }
197 
compressBuffer(size_t size)198     bool compressBuffer(size_t size)
199     {
200         // submit current job
201         if (currentJobAvail)
202         {
203             jobs[currentJobId].size = size;
204             appendValue(jobQueue, currentJobId);
205         }
206 
207         // recycle existing idle job
208         if (!(currentJobAvail = popFront(currentJobId, idleQueue)))
209             return false;
210 
211         jobs[currentJobId].outputBuffer = aquireValue(serializer);
212 
213         return serializer;
214     }
215 
overflow(int_type c)216     int_type overflow(int_type c)
217     {
218         int w = static_cast<int>(this->pptr() - this->pbase());
219         if (c != EOF)
220         {
221             *this->pptr() = c;
222             ++w;
223         }
224         if (compressBuffer(w))
225         {
226             CompressionJob &job = jobs[currentJobId];
227             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
228             return c;
229         }
230         else
231         {
232             return EOF;
233         }
234     }
235 
236     std::streamsize flush(bool flushEmptyBuffer = false)
237     {
238         int w = static_cast<int>(this->pptr() - this->pbase());
239         if ((w != 0 || flushEmptyBuffer) && compressBuffer(w))
240         {
241             CompressionJob &job = jobs[currentJobId];
242             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
243         }
244         else
245         {
246             w = 0;
247         }
248 
249         // wait for running compressor threads
250         waitForMinSize(idleQueue, numJobs - 1);
251 
252         serializer.worker.ostream.flush();
253         return w;
254     }
255 
sync()256     int sync()
257     {
258         if (this->pptr() != this->pbase())
259         {
260             int c = overflow(EOF);
261             if (c == EOF)
262                 return -1;
263         }
264         return 0;
265     }
266 
addFooter()267     void addFooter()
268     {
269         // we flush the filled buffer here, so that an empty (EOF) buffer is flushed in the d'tor
270         if (this->pptr() != this->pbase())
271             overflow(EOF);
272     }
273 
274     // returns a reference to the output stream
get_ostream()275     ostream_reference get_ostream() const    { return serializer.worker.ostream; };
276 };
277 
278 // --------------------------------------------------------------------------
279 // Class basic_unbgzf_streambuf
280 // --------------------------------------------------------------------------
281 
282 template<
283     typename Elem,
284     typename Tr = std::char_traits<Elem>,
285     typename ElemA = std::allocator<Elem>,
286     typename ByteT = char,
287     typename ByteAT = std::allocator<ByteT>
288 >
289 class basic_unbgzf_streambuf :
290     public std::basic_streambuf<Elem, Tr>
291 {
292 public:
293     typedef std::basic_istream<Elem, Tr>& istream_reference;
294     typedef ElemA char_allocator_type;
295     typedef ByteT byte_type;
296     typedef ByteAT byte_allocator_type;
297     typedef byte_type* byte_buffer_type;
298     typedef typename Tr::char_type char_type;
299     typedef typename Tr::int_type int_type;
300     typedef typename Tr::off_type off_type;
301     typedef typename Tr::pos_type pos_type;
302 
303     typedef std::vector<char_type, char_allocator_type>     TBuffer;
304     typedef ConcurrentQueue<int, Suspendable<Limit> >       TJobQueue;
305 
306     static const size_t MAX_PUTBACK = 4;
307 
308     struct Serializer
309     {
310         istream_reference   istream;
311         std::mutex          lock;
312         IOError             *error;
313         off_type            fileOfs;
314 
SerializerSerializer315         Serializer(istream_reference istream) :
316             istream(istream),
317             error(NULL),
318             fileOfs(0u)
319         {}
320 
~SerializerSerializer321         ~Serializer()
322         {
323             delete error;
324         }
325     };
326 
327     Serializer serializer;
328 
329     struct DecompressionJob
330     {
331         typedef std::vector<byte_type, byte_allocator_type> TInputBuffer;
332 
333         TInputBuffer            inputBuffer;
334         TBuffer                 buffer;
335         off_type                fileOfs;
336         int                     size;
337         unsigned                compressedSize;
338 
339         std::mutex              cs;
340         std::condition_variable readyEvent;
341         bool                    ready;
342         bool                    bgzfEofMarker;
343 
DecompressionJobDecompressionJob344         DecompressionJob() :
345             inputBuffer(BGZF_MAX_BLOCK_SIZE, 0),
346             buffer(MAX_PUTBACK + BGZF_MAX_BLOCK_SIZE / sizeof(char_type), 0),
347             fileOfs(),
348             size(0),
349             cs(),
350             readyEvent(),
351             ready(true),
352             bgzfEofMarker(false)
353         {}
354 
355         // TODO(rrahn): Do we need a copy constructor for the decompression job.
DecompressionJobDecompressionJob356         DecompressionJob(DecompressionJob const &other) :
357             inputBuffer(other.inputBuffer),
358             buffer(other.buffer),
359             fileOfs(other.fileOfs),
360             size(other.size),
361             cs(),
362             readyEvent(),
363             ready(other.ready),
364             bgzfEofMarker(other.bgzfEofMarker)
365         {}
366     };
367 
368     // string of recycable jobs
369     size_t                      numThreads;
370     size_t                      numJobs;
371     String<DecompressionJob>    jobs;
372     TJobQueue                   runningQueue;
373     TJobQueue                   todoQueue;
374     int                         currentJobId;
375 
376     struct DecompressionThread
377     {
378         basic_unbgzf_streambuf          *streamBuf;
379         CompressionContext<BgzfFile>    compressionCtx;
380 
operatorDecompressionThread381         void operator()()
382         {
383             ScopedReadLock<TJobQueue> readLock(streamBuf->todoQueue);
384             ScopedWriteLock<TJobQueue> writeLock(streamBuf->runningQueue);
385 
386             // wait for a new job to become available
387             while (true)
388             {
389                 int jobId = -1;
390                 if (!popFront(jobId, streamBuf->todoQueue))
391                     return;
392 
393                 DecompressionJob &job = streamBuf->jobs[jobId];
394                 size_t tailLen = 0;
395 
396                 // typically the idle queue contains only ready jobs
397                 // however, if seek() fast forwards running jobs into the todoQueue
398                 // the caller defers the task of waiting to the decompression threads
399                 if (!job.ready)
400                 {
401                     std::unique_lock<std::mutex> lock(job.cs);
402                     job.readyEvent.wait(lock, [&job]{return job.ready;});
403                     SEQAN_ASSERT_EQ(job.ready, true);
404                 }
405 
406                 {
407                     std::lock_guard<std::mutex> scopedLock(streamBuf->serializer.lock);
408 
409                     job.bgzfEofMarker = false;
410                     if (streamBuf->serializer.error != NULL)
411                         return;
412 
413                     // remember start offset (for tellg later)
414                     job.fileOfs = streamBuf->serializer.fileOfs;
415                     job.size = -1;
416                     job.compressedSize = 0;
417 
418                     // only load if not at EOF
419                     if (job.fileOfs != -1)
420                     {
421                         // read header
422                         streamBuf->serializer.istream.read(
423                             (char*)&job.inputBuffer[0],
424                             BGZF_BLOCK_HEADER_LENGTH);
425 
426                         if (!streamBuf->serializer.istream.good())
427                         {
428                             streamBuf->serializer.fileOfs = -1;
429                             if (streamBuf->serializer.istream.eof())
430                                 goto eofSkip;
431                             streamBuf->serializer.error = new IOError("Stream read error.");
432                             return;
433                         }
434 
435                         // check header
436                         if (!_bgzfCheckHeader(&job.inputBuffer[0]))
437                         {
438                             streamBuf->serializer.fileOfs = -1;
439                             streamBuf->serializer.error = new IOError("Invalid BGZF block header.");
440                             return;
441                         }
442 
443                         // extract length of compressed data
444                         tailLen = _bgzfUnpack16(&job.inputBuffer[0] + 16) + 1u - BGZF_BLOCK_HEADER_LENGTH;
445 
446                         // read compressed data and tail
447                         streamBuf->serializer.istream.read(
448                             (char*)&job.inputBuffer[0] + BGZF_BLOCK_HEADER_LENGTH,
449                             tailLen);
450 
451                         // Check if end-of-file marker is set
452                         if (memcmp(reinterpret_cast<uint8_t const *>(&job.inputBuffer[0]),
453                                    reinterpret_cast<uint8_t const *>(&BGZF_END_OF_FILE_MARKER[0]),
454                                    28) == 0)
455                         {
456                             job.bgzfEofMarker = true;
457                         }
458 
459                         if (!streamBuf->serializer.istream.good())
460                         {
461                             streamBuf->serializer.fileOfs = -1;
462                             if (streamBuf->serializer.istream.eof())
463                                 goto eofSkip;
464                             streamBuf->serializer.error = new IOError("Stream read error.");
465                             return;
466                         }
467 
468                         job.compressedSize = BGZF_BLOCK_HEADER_LENGTH + tailLen;
469                         streamBuf->serializer.fileOfs += job.compressedSize;
470                         job.ready = false;
471 
472                     eofSkip:
473                         streamBuf->serializer.istream.clear(
474                             streamBuf->serializer.istream.rdstate() & ~std::ios_base::failbit);
475                     }
476 
477                     if (!appendValue(streamBuf->runningQueue, jobId))
478                     {
479                         // signal that job is ready
480                         {
481                             std::unique_lock<std::mutex> lock(job.cs);
482                             job.ready = true;
483                         }
484                         job.readyEvent.notify_all();
485                         return;  // Terminate this thread.
486                     }
487                 }
488 
489                 if (!job.ready)
490                 {
491                     // decompress block
492                     job.size = _decompressBlock(
493                         &job.buffer[0] + MAX_PUTBACK, capacity(job.buffer),
494                         &job.inputBuffer[0], job.compressedSize, compressionCtx);
495 
496                     // signal that job is ready
497                     {
498                         std::unique_lock<std::mutex> lock(job.cs);
499                         job.ready = true;
500                     }
501                     job.readyEvent.notify_all();
502                 }
503             }
504         }
505     };
506 
507     // array of worker threads
508     using TFuture = decltype(std::async(DecompressionThread{nullptr, CompressionContext<BgzfFile>{}}));
509     std::vector<TFuture> threads;
510     TBuffer              putbackBuffer;
511 
512     basic_unbgzf_streambuf(istream_reference istream_,
513                            size_t numThreads = SEQAN_BGZF_NUM_THREADS,
514                            size_t jobsPerThread = 8) :
serializer(istream_)515         serializer(istream_),
516         numThreads(numThreads),
517         numJobs(numThreads * jobsPerThread),
518         runningQueue(numJobs),
519         todoQueue(numJobs),
520         putbackBuffer(MAX_PUTBACK)
521     {
522         resize(jobs, numJobs, Exact());
523         currentJobId = -1;
524 
525         lockReading(runningQueue);
526         lockWriting(todoQueue);
527         setReaderWriterCount(runningQueue, 1, numThreads);
528         setReaderWriterCount(todoQueue, numThreads, 1);
529 
530         for (unsigned i = 0; i < numJobs; ++i)
531         {
532             bool success = appendValue(todoQueue, i);
533             ignoreUnusedVariableWarning(success);
534             SEQAN_ASSERT(success);
535         }
536 
537         for (unsigned i = 0; i < numThreads; ++i)
538         {
539             threads.push_back(std::async(std::launch::async, DecompressionThread{this, CompressionContext<BgzfFile>{}}));
540         }
541     }
542 
~basic_unbgzf_streambuf()543     ~basic_unbgzf_streambuf()
544     {
545         unlockWriting(todoQueue);
546         unlockReading(runningQueue);
547     }
548 
underflow()549     int_type underflow()
550     {
551         // no need to use the next buffer?
552         if (this->gptr() && this->gptr() < this->egptr())
553             return Tr::to_int_type(*this->gptr());
554 
555         size_t putback = this->gptr() - this->eback();
556         if (putback > MAX_PUTBACK)
557             putback = MAX_PUTBACK;
558 
559         // save at most MAX_PUTBACK characters from previous page to putback buffer
560         if (putback != 0)
561             std::copy(
562                 this->gptr() - putback,
563                 this->gptr(),
564                 &putbackBuffer[0]);
565 
566         if (currentJobId >= 0)
567             appendValue(todoQueue, currentJobId);
568 
569         while (true)
570         {
571             if (!popFront(currentJobId, runningQueue))
572             {
573                 currentJobId = -1;
574                 SEQAN_ASSERT(serializer.error != NULL);
575                 if (serializer.error != NULL)
576                     throw *serializer.error;
577                 return EOF;
578             }
579 
580             DecompressionJob &job = jobs[currentJobId];
581 
582             // restore putback buffer
583             this->setp(&job.buffer[0], &job.buffer[0] + (job.buffer.size() - 1));
584             if (putback != 0)
585                 std::copy(
586                     &putbackBuffer[0],
587                     &putbackBuffer[0] + putback,
588                     &job.buffer[0] + (MAX_PUTBACK - putback));
589 
590             // wait for the end of decompression
591             {
592                 std::unique_lock<std::mutex> lock(job.cs);
593                 job.readyEvent.wait(lock, [&job]{return job.ready;});
594             }
595 
596             size_t size = (job.size != -1)? job.size : 0;
597 
598             // reset buffer pointers
599             this->setg(
600                   &job.buffer[0] + (MAX_PUTBACK - putback),     // beginning of putback area
601                   &job.buffer[0] + MAX_PUTBACK,                 // read position
602                   &job.buffer[0] + (MAX_PUTBACK + size));       // end of buffer
603 
604             // The end of the bgzf file is reached, either if there was an error, or if the
605             // end-of-file marker was reached, while the uncompressed block had zero size.
606             if (job.size == -1 || (job.size == 0 && job.bgzfEofMarker))
607                 return EOF;
608             else if (job.size > 0)
609                 return Tr::to_int_type(*this->gptr());      // return next character
610 
611             throw IOError("BGZF: Invalid end condition in decompression. "
612                           "Most likely due to an empty bgzf block without end-of-file marker.");
613         }
614     }
615 
seekoff(off_type ofs,std::ios_base::seekdir dir,std::ios_base::openmode openMode)616     pos_type seekoff(off_type ofs, std::ios_base::seekdir dir, std::ios_base::openmode openMode)
617     {
618         if ((openMode & (std::ios_base::in | std::ios_base::out)) == std::ios_base::in)
619         {
620             if (dir == std::ios_base::cur && ofs >= 0)
621             {
622                 // forward delta seek
623                 while (currentJobId < 0 || this->egptr() - this->gptr() < ofs)
624                 {
625                     ofs -= this->egptr() - this->gptr();
626                     if (this->underflow() == EOF)
627                         break;
628                 }
629 
630                 if (currentJobId >= 0 && ofs <= this->egptr() - this->gptr())
631                 {
632                     DecompressionJob &job = jobs[currentJobId];
633 
634                     // reset buffer pointers
635                     this->setg(
636                           this->eback(),            // beginning of putback area
637                           this->gptr() + ofs,       // read position
638                           this->egptr());           // end of buffer
639 
640                     if (this->gptr() != this->egptr())
641                         return pos_type((job.fileOfs << 16) + ((this->gptr() - &job.buffer[MAX_PUTBACK])));
642                     else
643                         return pos_type((job.fileOfs + job.compressedSize) << 16);
644                 }
645 
646             }
647             else if (dir == std::ios_base::beg)
648             {
649                 // random seek
650                 std::streampos destFileOfs = ofs >> 16;
651 
652                 // are we in the same block?
653                 if (currentJobId >= 0 && jobs[currentJobId].fileOfs == (off_type)destFileOfs)
654                 {
655                     DecompressionJob &job = jobs[currentJobId];
656 
657                     // reset buffer pointers
658                     this->setg(
659                           this->eback(),                                        // beginning of putback area
660                           &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),      // read position
661                           this->egptr());                                       // end of buffer
662                     return ofs;
663                 }
664 
665                 // ok, different block
666                 {
667                     std::lock_guard<std::mutex> scopedLock(serializer.lock);
668 
669                     // remove all running jobs and put them in the idle queue unless we
670                     // find our seek target
671 
672                     if (currentJobId >= 0)
673                         appendValue(todoQueue, currentJobId);
674 
675                     // Note that if we are here the current job does not represent the sought block.
676                     // Hence if the running queue is empty we need to explicitly unset the jobId,
677                     // otherwise we would not update the serializers istream pointer to the correct position.
678                     if (empty(runningQueue))
679                         currentJobId = -1;
680 
681                     // empty is thread-safe in serializer.lock
682                     while (!empty(runningQueue))
683                     {
684                         popFront(currentJobId, runningQueue);
685 
686                         if (jobs[currentJobId].fileOfs == (off_type)destFileOfs)
687                             break;
688 
689                         // push back useless job
690                         appendValue(todoQueue, currentJobId);
691                         currentJobId = -1;
692                     }
693 
694                     if (currentJobId == -1)
695                     {
696                         SEQAN_ASSERT(empty(runningQueue));
697                         serializer.istream.clear(serializer.istream.rdstate() & ~std::ios_base::eofbit);
698                         if (serializer.istream.rdbuf()->pubseekpos(destFileOfs, std::ios_base::in) == destFileOfs)
699                             serializer.fileOfs = destFileOfs;
700                         else
701                             currentJobId = -2;      // temporarily signals a seek error
702                     }
703                 }
704 
705                 // if our block wasn't in the running queue yet, it should now
706                 // be the first that falls out after modifying serializer.fileOfs
707                 if (currentJobId == -1)
708                     popFront(currentJobId, runningQueue);
709                 else if (currentJobId == -2)
710                     currentJobId = -1;
711 
712                 if (currentJobId >= 0)
713                 {
714                     // wait for the end of decompression
715                     DecompressionJob &job = jobs[currentJobId];
716 
717                     {
718                         std::unique_lock<std::mutex> lock(job.cs);
719                         job.readyEvent.wait(lock, [&job]{return job.ready;});
720                     }
721 
722                     SEQAN_ASSERT_EQ(job.fileOfs, (off_type)destFileOfs);
723 
724                     // reset buffer pointers
725                     this->setg(
726                           &job.buffer[0] + MAX_PUTBACK,                     // no putback area
727                           &job.buffer[0] + (MAX_PUTBACK + (ofs & 0xffff)),  // read position
728                           &job.buffer[0] + (MAX_PUTBACK + job.size));       // end of buffer
729                     return ofs;
730                 }
731             }
732         }
733         return pos_type(off_type(-1));
734     }
735 
seekpos(pos_type pos,std::ios_base::openmode openMode)736     pos_type seekpos(pos_type pos, std::ios_base::openmode openMode)
737     {
738         return seekoff(off_type(pos), std::ios_base::beg, openMode);
739     }
740 
741     // returns the compressed input istream
get_istream()742     istream_reference get_istream()    { return serializer.istream; };
743 };
744 
745 // --------------------------------------------------------------------------
746 // Class basic_bgzf_ostreambase
747 // --------------------------------------------------------------------------
748 
749 template<
750     typename Elem,
751     typename Tr = std::char_traits<Elem>,
752     typename ElemA = std::allocator<Elem>,
753     typename ByteT = char,
754     typename ByteAT = std::allocator<ByteT>
755 >
756 class basic_bgzf_ostreambase : virtual public std::basic_ios<Elem,Tr>
757 {
758 public:
759     typedef std::basic_ostream<Elem, Tr>&                        ostream_reference;
760     typedef basic_bgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT> bgzf_streambuf_type;
761 
basic_bgzf_ostreambase(ostream_reference ostream_)762     basic_bgzf_ostreambase(ostream_reference ostream_)
763         : m_buf(ostream_)
764     {
765         this->init(&m_buf );
766     };
767 
768     // returns the underlying zip ostream object
rdbuf()769     bgzf_streambuf_type* rdbuf()            { return &m_buf; };
770     // returns the bgzf error state
get_zerr()771     int get_zerr() const                    { return m_buf.get_err(); };
772     // returns the uncompressed data crc
get_crc()773     long get_crc() const                    { return m_buf.get_crc(); };
774     // returns the compressed data size
get_out_size()775     long get_out_size() const               { return m_buf.get_out_size(); };
776     // returns the uncompressed data size
get_in_size()777     long get_in_size() const                { return m_buf.get_in_size(); };
778 
779 private:
780     bgzf_streambuf_type m_buf;
781 };
782 
783 // --------------------------------------------------------------------------
784 // Class basic_bgzf_istreambase
785 // --------------------------------------------------------------------------
786 
787 template<
788     typename Elem,
789     typename Tr = std::char_traits<Elem>,
790     typename ElemA = std::allocator<Elem>,
791     typename ByteT = char,
792     typename ByteAT = std::allocator<ByteT>
793 >
794 class basic_bgzf_istreambase : virtual public std::basic_ios<Elem,Tr>
795 {
796 public:
797     typedef std::basic_istream<Elem, Tr>&                           istream_reference;
798     typedef basic_unbgzf_streambuf<Elem, Tr, ElemA, ByteT, ByteAT>  unbgzf_streambuf_type;
799 
basic_bgzf_istreambase(istream_reference ostream_)800     basic_bgzf_istreambase(istream_reference ostream_)
801         : m_buf(ostream_)
802     {
803         this->init(&m_buf );
804     };
805 
806     // returns the underlying unzip istream object
rdbuf()807     unbgzf_streambuf_type* rdbuf() { return &m_buf; };
808 
809     // returns the bgzf error state
get_zerr()810     int get_zerr() const                    { return m_buf.get_zerr(); };
811     // returns the uncompressed data crc
get_crc()812     long get_crc() const                    { return m_buf.get_crc(); };
813     // returns the uncompressed data size
get_out_size()814     long get_out_size() const               { return m_buf.get_out_size(); };
815     // returns the compressed data size
get_in_size()816     long get_in_size() const                { return m_buf.get_in_size(); };
817 
818 private:
819     unbgzf_streambuf_type m_buf;
820 };
821 
822 // --------------------------------------------------------------------------
823 // Class basic_bgzf_ostream
824 // --------------------------------------------------------------------------
825 
826 template<
827     typename Elem,
828     typename Tr = std::char_traits<Elem>,
829     typename ElemA = std::allocator<Elem>,
830     typename ByteT = char,
831     typename ByteAT = std::allocator<ByteT>
832 >
833 class basic_bgzf_ostream :
834     public basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
835     public std::basic_ostream<Elem,Tr>
836 {
837 public:
838     typedef basic_bgzf_ostreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_ostreambase_type;
839     typedef std::basic_ostream<Elem,Tr>                        ostream_type;
840     typedef ostream_type&                                      ostream_reference;
841 
basic_bgzf_ostream(ostream_reference ostream_)842     basic_bgzf_ostream(ostream_reference ostream_) :
843         bgzf_ostreambase_type(ostream_),
844         ostream_type(bgzf_ostreambase_type::rdbuf())
845     {}
846 
847     // flush inner buffer and zipper buffer
zflush()848     basic_bgzf_ostream<Elem,Tr>& zflush()
849     {
850         this->flush(); this->rdbuf()->flush(); return *this;
851     };
852 
~basic_bgzf_ostream()853     ~basic_bgzf_ostream()
854     {
855         this->rdbuf()->addFooter();
856     }
857 
858 private:
859     static void put_long(ostream_reference out_, unsigned long x_);
860 #ifdef _WIN32
_Add_vtordisp1()861     void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
_Add_vtordisp2()862     void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
863 #endif
864 };
865 
866 // --------------------------------------------------------------------------
867 // Class basic_bgzf_istream
868 // --------------------------------------------------------------------------
869 
870 template<
871     typename Elem,
872     typename Tr = std::char_traits<Elem>,
873     typename ElemA = std::allocator<Elem>,
874     typename ByteT = char,
875     typename ByteAT = std::allocator<ByteT>
876 >
877 class basic_bgzf_istream :
878     public basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT>,
879     public std::basic_istream<Elem,Tr>
880 {
881 public:
882     typedef basic_bgzf_istreambase<Elem,Tr,ElemA,ByteT,ByteAT> bgzf_istreambase_type;
883     typedef std::basic_istream<Elem,Tr>                        istream_type;
884     typedef istream_type &                                     istream_reference;
885     typedef char                                               byte_type;
886 
basic_bgzf_istream(istream_reference istream_)887     basic_bgzf_istream(istream_reference istream_) :
888         bgzf_istreambase_type(istream_),
889         istream_type(bgzf_istreambase_type::rdbuf()),
890         m_is_gzip(false),
891         m_gbgzf_data_size(0)
892     {};
893 
894     // returns true if it is a gzip file
is_gzip()895     bool is_gzip() const                { return m_is_gzip; };
896     // return data size check
check_data_size()897     bool check_data_size() const        { return this->get_out_size() == m_gbgzf_data_size; };
898 
899     // return the data size in the file
get_gbgzf_data_size()900     long get_gbgzf_data_size() const    { return m_gbgzf_data_size; };
901 
902 protected:
903     static void read_long(istream_reference in_, unsigned long& x_);
904 
905     int check_header();
906     bool m_is_gzip;
907     unsigned long m_gbgzf_data_size;
908 
909 #ifdef _WIN32
910 private:
_Add_vtordisp1()911     void _Add_vtordisp1() { } // Required to avoid VC++ warning C4250
_Add_vtordisp2()912     void _Add_vtordisp2() { } // Required to avoid VC++ warning C4250
913 #endif
914 };
915 
916 // ===========================================================================
917 // Typedefs
918 // ===========================================================================
919 
920 // A typedef for basic_bgzf_ostream<char>
921 typedef basic_bgzf_ostream<char> bgzf_ostream;
922 // A typedef for basic_bgzf_ostream<wchar_t>
923 typedef basic_bgzf_ostream<wchar_t> bgzf_wostream;
924 // A typedef for basic_bgzf_istream<char>
925 typedef basic_bgzf_istream<char> bgzf_istream;
926 // A typedef for basic_bgzf_istream<wchart>
927 typedef basic_bgzf_istream<wchar_t> bgzf_wistream;
928 
929 }  // namespace seqan
930 
931 #endif // INCLUDE_SEQAN_STREAM_IOSTREAM_BGZF_H_
932