1 #ifndef PAT_H_
2 #define PAT_H_
3
4 #include <iostream>
5 #include <cassert>
6 #include <cmath>
7 #include <zlib.h>
8 #include <sys/stat.h>
9 #include <stdexcept>
10 #include <string>
11 #include <cstring>
12 #include <ctype.h>
13 #include <fstream>
14
15 #include "alphabet.h"
16 #include "assert_helpers.h"
17 #include "ds.h"
18 #include "ds.h"
19 #include "filebuf.h"
20 #include "qual.h"
21 #include "random_source.h"
22 #include "read.h"
23 #include "search_globals.h"
24 #include "sstring.h"
25 #include "threading.h"
26 #include "tokenize.h"
27 #include "util.h"
28
29 #ifdef _WIN32
30 #define getc_unlocked _fgetc_nolock
31 #endif
32
33 /**
34 * Classes and routines for reading reads from various input sources.
35 */
36
37 using namespace std;
38
39 typedef uint64_t TReadId;
40
41 /**
42 * Parameters affecting how reads and read in.
43 * Note: Bowtie 2 uses this but Bowtie doesn't yet.
44 */
45 struct PatternParams {
46
PatternParamsPatternParams47 PatternParams() { }
48
PatternParamsPatternParams49 PatternParams(
50 int format_,
51 bool fileParallel_,
52 uint32_t seed_,
53 size_t max_buf_,
54 bool solexa64_,
55 bool phred64_,
56 bool intQuals_,
57 int sampleLen_,
58 int sampleFreq_,
59 size_t skip_,
60 int nthreads_,
61 bool fixName_) :
62 format(format_),
63 fileParallel(fileParallel_),
64 seed(seed_),
65 max_buf(max_buf_),
66 solexa64(solexa64_),
67 phred64(phred64_),
68 intQuals(intQuals_),
69 sampleLen(sampleLen_),
70 sampleFreq(sampleFreq_),
71 skip(skip_),
72 nthreads(nthreads_),
73 fixName(fixName_) { }
74
75 int format; // file format
76 bool fileParallel; // true -> wrap files with separate PatternComposers
77 uint32_t seed; // pseudo-random seed
78 size_t max_buf; // number of reads to buffer in one read
79 bool solexa64; // true -> qualities are on solexa64 scale
80 bool phred64; // true -> qualities are on phred64 scale
81 bool intQuals; // true -> qualities are space-separated numbers
82 int sampleLen; // length of sampled reads for FastaContinuous...
83 int sampleFreq; // frequency of sampled reads for FastaContinuous...
84 size_t skip; // skip the first 'skip' patterns
85 int nthreads; // number of threads for locking
86 bool fixName; //
87 };
88
89 /**
90 * All per-thread storage for input read data.
91 */
92 struct PerThreadReadBuf {
93
PerThreadReadBufPerThreadReadBuf94 PerThreadReadBuf(size_t max_buf) :
95 max_buf_(max_buf),
96 bufa_(max_buf),
97 bufb_(max_buf),
98 rdid_()
99 {
100 bufa_.resize(max_buf);
101 bufb_.resize(max_buf);
102 reset();
103 }
104
read_aPerThreadReadBuf105 Read& read_a() { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf106 Read& read_b() { return bufb_[cur_buf_]; }
107
read_aPerThreadReadBuf108 const Read& read_a() const { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf109 const Read& read_b() const { return bufb_[cur_buf_]; }
110
111 /**
112 * Return read id for read/pair currently in the buffer.
113 */
rdidPerThreadReadBuf114 TReadId rdid() const {
115 assert_neq(rdid_, std::numeric_limits<TReadId>::max());
116 return rdid_ + cur_buf_;
117 }
118
119 /**
120 * Reset state as though no reads have been read.
121 */
resetPerThreadReadBuf122 void reset() {
123 cur_buf_ = bufa_.size();
124 for(size_t i = 0; i < max_buf_; i++) {
125 bufa_[i].reset();
126 bufb_[i].reset();
127 }
128 rdid_ = std::numeric_limits<TReadId>::max();
129 }
130
131 /**
132 * Advance cursor to next element
133 */
nextPerThreadReadBuf134 void next() {
135 assert_lt(cur_buf_, bufa_.size());
136 cur_buf_++;
137 }
138
139 /**
140 * Return true when there's nothing left to dish out.
141 */
exhaustedPerThreadReadBuf142 bool exhausted() {
143 assert_leq(cur_buf_, bufa_.size());
144 return cur_buf_ >= bufa_.size()-1;
145 }
146
147 /**
148 * Just after a new batch has been loaded, use init to
149 * set the cuf_buf_ and rdid_ fields appropriately.
150 */
initPerThreadReadBuf151 void init() {
152 cur_buf_ = 0;
153 }
154
155 /**
156 * Set the read id of the first read in the buffer.
157 */
setReadIdPerThreadReadBuf158 void setReadId(TReadId rdid) {
159 rdid_ = rdid;
160 assert_neq(rdid_, std::numeric_limits<TReadId>::max());
161 }
162
163 const size_t max_buf_; // max # reads to read into buffer at once
164 EList<Read> bufa_; // Read buffer for mate as
165 EList<Read> bufb_; // Read buffer for mate bs
166 size_t cur_buf_; // Read buffer currently active
167 TReadId rdid_; // index of read at offset 0 of bufa_/bufb_
168 };
169
170
171 /**
172 * Encapsulates a synchronized source of patterns; usually a file.
173 * Handles dumping patterns to a logfile (useful for debugging). Also
174 * optionally reverses reads and quality strings before returning them,
175 * though that is usually more efficiently done by the concrete
176 * subclass. Concrete subclasses should delimit critical sections with
177 * ThreadSafe objects.
178 */
179 class PatternSource {
180 public:
PatternSource()181 PatternSource() :
182 readCnt_(0),
183 mutex()
184 { }
185
~PatternSource()186 virtual ~PatternSource() { }
187
188 /**
189 * Implementation to be provided by concrete subclasses. An
190 * implementation for this member is only relevant for formats
191 * where individual input sources look like single-end-read
192 * sources, e.g., formats where paired-end reads are specified in
193 * parallel read files.
194 */
195 virtual std::pair<bool, int> nextBatch(
196 PerThreadReadBuf& pt,
197 bool batch_a,
198 bool lock = true) = 0;
199
200 /**
201 * Finishes parsing a given read. Happens outside the critical section.
202 */
203 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const = 0;
204
205 /// Reset state to start over again with the first read
reset()206 virtual void reset() { readCnt_ = 0; }
207
208 /**
209 * Return the number of reads attempted.
210 */
readCount()211 TReadId readCount() const { return readCnt_; }
212
213 protected:
214
215 /**
216 * Default format for dumping a read to an output stream. Concrete
217 * subclasses might want to do something fancier.
218 */
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)219 virtual void dump(ostream& out,
220 const BTDnaString& seq,
221 const BTString& qual,
222 const BTString& name)
223 {
224 out << name << ": " << seq << " " << qual << endl;
225 }
226
227 /// The number of reads read by this PatternSource
228 volatile uint64_t readCnt_;
229
230 /// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields
231 /// of this or another other shared object.
232 MUTEX_T mutex;
233 };
234
235 /**
236 * Encapsualtes a source of patterns where each raw pattern is trimmed
237 * by some user-defined amount on the 3' and 5' ends. Doesn't
238 * implement the actual trimming - that's up to the concrete
239 * descendants.
240 */
241 class TrimmingPatternSource : public PatternSource {
242 public:
243 TrimmingPatternSource(int trim3 = 0,
244 int trim5 = 0) :
PatternSource()245 PatternSource(),
246 trim3_(trim3), trim5_(trim5) { }
247 protected:
248 int trim3_;
249 int trim5_;
250 };
251
252 extern void wrongQualityFormat(const BTString& read_name);
253 extern void tooFewQualities(const BTString& read_name);
254 extern void tooManyQualities(const BTString& read_name);
255 extern void tooManySeqChars(const BTString& read_name);
256
257 /**
258 * Encapsulates a source of patterns which is an in-memory vector.
259 */
260 class VectorPatternSource : public TrimmingPatternSource {
261 public:
262 VectorPatternSource(
263 const EList<string>& v,
264 int trim3 = 0,
265 int trim5 = 0);
266
~VectorPatternSource()267 virtual ~VectorPatternSource() { }
268
269 /**
270 * Read next batch. However, batch concept is not very applicable for this
271 * PatternSource where all the info has already been parsed into the fields
272 * in the contsructor. This essentially modifies the pt as though we read
273 * in some number of patterns.
274 */
275 virtual pair<bool, int> nextBatch(
276 PerThreadReadBuf& pt,
277 bool batch_a,
278 bool lock = true);
279
280 /**
281 * Reset so that next call to nextBatch* gets the first batch.
282 */
reset()283 virtual void reset() {
284 TrimmingPatternSource::reset();
285 cur_ = 0;
286 paired_ = false;
287 }
288
289 /**
290 * Finishes parsing outside the critical section
291 */
292 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
293
294 private:
295
296 pair<bool, int> nextBatchImpl(
297 PerThreadReadBuf& pt,
298 bool batch_a);
299
300 size_t cur_; // index for first read of next batch
301 bool paired_; // whether reads are paired
302 EList<std::string> tokbuf_; // buffer for storing parsed tokens
303 EList<std::string> bufs_; // per-read buffers
304 char nametmp_[20]; // temp buffer for constructing name
305 };
306
307 /**
308 * Parent class for PatternSources that read from a file.
309 * Uses unlocked C I/O, on the assumption that all reading
310 * from the file will take place in an otherwise-protected
311 * critical section.
312 */
313 class CFilePatternSource : public TrimmingPatternSource {
314 public:
315 CFilePatternSource(
316 const EList<string>& infiles,
317 const EList<string>* qinfiles,
318 int trim3 = 0,
319 int trim5 = 0) :
TrimmingPatternSource(trim3,trim5)320 TrimmingPatternSource( trim3, trim5),
321 infiles_(infiles),
322 filecur_(0),
323 fp_(NULL),
324 qfp_(NULL),
325 zfp_(NULL),
326 is_open_(false),
327 first_(true)
328 {
329 qinfiles_.clear();
330 if(qinfiles != NULL) qinfiles_ = *qinfiles;
331 assert_gt(infiles.size(), 0);
332 errs_.resize(infiles_.size());
333 errs_.fill(0, infiles_.size(), false);
334 if(qinfiles_.size() > 0 &&
335 qinfiles_.size() != infiles_.size())
336 {
337 cerr << "Error: Different numbers of input FASTA/quality files ("
338 << infiles_.size() << "/" << qinfiles_.size() << ")" << endl;
339 throw 1;
340 }
341 open(); // open first file in the list
342 filecur_++;
343 }
344
~CFilePatternSource()345 virtual ~CFilePatternSource() {
346 if(is_open_) {
347 if (compressed_) {
348 gzclose(zfp_);
349 zfp_ = NULL;
350 }
351 else if (fp_ != stdin) {
352 fclose(fp_);
353 fp_ = NULL;
354 }
355 if(qfp_ != NULL && qfp_ != stdin) {
356 fclose(qfp_);
357 qfp_ = NULL;
358 }
359 assert(zfp_ == NULL);
360 assert(fp_ == NULL || fp_ == stdin);
361 assert(qfp_ == NULL || qfp_ == stdin);
362 }
363 }
364
365 /**
366 * Fill Read with the sequence, quality and name for the next
367 * read in the list of read files. This function gets called by
368 * all the search threads, so we must handle synchronization.
369 *
370 * Returns pair<bool, int> where bool indicates whether we're
371 * completely done, and int indicates how many reads were read.
372 */
373 virtual pair<bool, int> nextBatch(
374 PerThreadReadBuf& pt,
375 bool batch_a,
376 bool lock);
377
378 /**
379 * Reset so that next call to nextBatch* gets the first batch.
380 * Should only be called by the master thread.
381 */
reset()382 virtual void reset() {
383 TrimmingPatternSource::reset();
384 filecur_ = 0,
385 open();
386 filecur_++;
387 }
388
389 protected:
390
391 /**
392 * Light-parse a batch of unpaired reads from current file into the given
393 * buffer. Called from CFilePatternSource.nextBatch().
394 */
395 virtual std::pair<bool, int> nextBatchFromFile(
396 PerThreadReadBuf& pt,
397 bool batch_a,
398 size_t read_idx) = 0;
399
400 /**
401 * Reset state to handle a fresh file
402 */
resetForNextFile()403 virtual void resetForNextFile() { }
404
405 /**
406 * Open the next file in the list of input files.
407 */
408 void open();
409
getc_wrapper()410 int getc_wrapper() {
411 return compressed_ ? gzgetc(zfp_) : getc_unlocked(fp_);
412 }
413
ungetc_wrapper(int c)414 int ungetc_wrapper(int c) {
415 return compressed_ ? gzungetc(c, zfp_) : ungetc(c, fp_);
416 }
417
is_gzipped_file(const std::string & filename)418 bool is_gzipped_file(const std::string& filename) {
419 struct stat s;
420 if (stat(filename.c_str(), &s) != 0) {
421 perror("stat");
422 }
423 else {
424 if (S_ISFIFO(s.st_mode))
425 return true;
426 }
427 size_t pos = filename.find_last_of(".");
428 std::string ext = (pos == std::string::npos) ? "" : filename.substr(pos + 1);
429 if (ext == "" || ext == "gz" || ext == "Z") {
430 return true;
431 }
432 return false;
433 }
434
435 EList<string> infiles_; /// filenames for read files
436 EList<string> qinfiles_; /// filenames for quality files
437 EList<bool> errs_; /// whether we've already printed an error for each file
438 size_t filecur_; /// index into infiles_ of next file to read
439 FILE *fp_; /// read file currently being read from
440 FILE *qfp_; /// quality file currently being read from
441 gzFile zfp_;
442 bool is_open_; /// whether fp_ is currently open
443 bool first_;
444 char buf_[64*1024]; /// file buffer for sequences
445 char qbuf_[64*1024]; /// file buffer for qualities
446 bool compressed_;
447
448 private:
449
450 pair<bool, int> nextBatchImpl(
451 PerThreadReadBuf& pt,
452 bool batch_a);
453
454 };
455
456 /**
457 * Synchronized concrete pattern source for a list of FASTA
458 */
459 class FastaPatternSource : public CFilePatternSource {
460
461 public:
462
463 FastaPatternSource(
464 const EList<string>& infiles,
465 const EList<string>* qinfiles,
466 int trim3 = 0,
467 int trim5 = 0) :
CFilePatternSource(infiles,qinfiles,trim3,trim5)468 CFilePatternSource(
469 infiles,
470 qinfiles,
471 trim3,
472 trim5),
473 first_(true) { }
474
475 /**
476 * Reset so that next call to nextBatch* gets the first batch.
477 * Should only be called by the master thread.
478 */
reset()479 virtual void reset() {
480 first_ = true;
481 CFilePatternSource::reset();
482 }
483
484 /**
485 * Finalize FASTA parsing outside critical section.
486 */
487 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
488
489 protected:
490
491 /**
492 * Light-parse a FASTA batch into the given buffer.
493 */
494 virtual std::pair<bool, int> nextBatchFromFile(
495 PerThreadReadBuf& pt,
496 bool batch_a,
497 size_t read_idx);
498
499 /**
500 * Reset state to handle a fresh file
501 */
resetForNextFile()502 virtual void resetForNextFile() {
503 first_ = true;
504 }
505
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)506 virtual void dump(ostream& out,
507 const BTDnaString& seq,
508 const BTString& qual,
509 const BTString& name)
510 {
511 out << ">" << name << endl << seq << endl;
512 }
513
514 private:
515
516 bool first_;
517 };
518
519
520 /**
521 * Tokenize a line of space-separated integer quality values.
522 */
tokenizeQualLine(FileBuf & filebuf,char * buf,size_t buflen,EList<string> & toks)523 static inline bool tokenizeQualLine(FileBuf& filebuf, char *buf, size_t buflen, EList<string>& toks) {
524 size_t rd = filebuf.gets(buf, buflen);
525 if(rd == 0) return false;
526 assert(NULL == strrchr(buf, '\n'));
527 tokenize(string(buf), " ", toks);
528 return true;
529 }
530
531 /**
532 * Synchronized concrete pattern source for a list of files with tab-
533 * delimited name, seq, qual fields (or, for paired-end reads,
534 * basename, seq1, qual1, seq2, qual2).
535 */
536 class TabbedPatternSource : public CFilePatternSource {
537 public:
538 TabbedPatternSource(
539 const EList<string>& infiles,
540 bool secondName, // whether it's --12/--tab5 or --tab6
541 int trim3 = 0,
542 int trim5 = 0,
543 bool solQuals = false,
544 bool phred64Quals = false,
545 bool intQuals = false) :
CFilePatternSource(infiles,NULL,trim3,trim5)546 CFilePatternSource(
547 infiles,
548 NULL,
549 trim3,
550 trim5),
551 solQuals_(solQuals),
552 phred64Quals_(phred64Quals),
553 intQuals_(intQuals) { }
554
555 /**
556 * Finalize tabbed parsing outside critical section.
557 */
558 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
559
560 protected:
561
562 /**
563 * Light-parse a batch of tabbed-format reads into given buffer.
564 */
565 virtual std::pair<bool, int> nextBatchFromFile(
566 PerThreadReadBuf& pt,
567 bool batch_a,
568 size_t read_idx);
569
570 /**
571 * Dump a FASTQ-style record for the read.
572 */
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)573 virtual void dump(ostream& out,
574 const BTDnaString& seq,
575 const BTString& qual,
576 const BTString& name)
577 {
578 out << "@" << name << endl << seq << endl
579 << "+" << endl << qual << endl;
580 }
581
582 protected:
583
584 bool solQuals_; // base qualities are log odds
585 bool phred64Quals_; // base qualities are on -64 scale
586 bool intQuals_; // base qualities are space-separated strings
587 bool secondName_; // true if --tab6, false if --tab5
588 };
589
590 /**
591 * Synchronized concrete pattern source for a list of FASTA files where
592 * reads need to be extracted from long continuous sequences.
593 */
594 class FastaContinuousPatternSource : public CFilePatternSource {
595 public:
FastaContinuousPatternSource(const EList<string> & infiles,size_t length,size_t freq)596 FastaContinuousPatternSource(
597 const EList<string>& infiles,
598 size_t length,
599 size_t freq) :
600 CFilePatternSource(
601 infiles,
602 NULL,
603 0,
604 0),
605 length_(length),
606 freq_(freq),
607 eat_(length_-1),
608 beginning_(true),
609 bufCur_(0),
610 cur_(0llu),
611 last_(0llu)
612 {
613 assert_gt(freq_, 0);
614 resetForNextFile();
615 assert_lt(length_, 1024);
616 }
617
reset()618 virtual void reset() {
619 CFilePatternSource::reset();
620 resetForNextFile();
621 }
622
623 /**
624 * Finalize FASTA parsing outside critical section.
625 */
626 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
627
628 protected:
629
630 /**
631 * Light-parse a batch into the given buffer.
632 */
633 virtual std::pair<bool, int> nextBatchFromFile(
634 PerThreadReadBuf& pt,
635 bool batch_a,
636 size_t read_idx);
637
638 /**
639 * Reset state to be read for the next file.
640 */
resetForNextFile()641 virtual void resetForNextFile() {
642 eat_ = length_-1;
643 name_prefix_buf_.clear();
644 beginning_ = true;
645 bufCur_ = 0;
646 last_ = cur_;
647 }
648
649 private:
650 const size_t length_; /// length of reads to generate
651 const size_t freq_; /// frequency to sample reads
652 size_t eat_; /// number of characters we need to skip before
653 /// we have flushed all of the ambiguous or
654 /// non-existent characters out of our read
655 /// window
656 bool beginning_; /// skipping over the first read length?
657 char buf_[1024]; /// read buffer
658 std::string name_prefix_buf_; /// FASTA sequence name buffer
659 char name_int_buf_[20]; /// for composing offsets for names
660 size_t bufCur_; /// buffer cursor; points to where we should
661 /// insert the next character
662 uint64_t cur_;
663 uint64_t last_;/// number to subtract from readCnt_ to get
664 /// the pat id to output (so it resets to 0 for
665 /// each new sequence)
666 };
667
668 /**
669 * Read a FASTQ-format file.
670 * See: http://maq.sourceforge.net/fastq.shtml
671 */
672 class FastqPatternSource : public CFilePatternSource {
673 public:
674 FastqPatternSource(
675 const EList<string>& infiles,
676 int trim3 = 0,
677 int trim5 = 0,
678 bool solexa_quals = false,
679 bool phred64Quals = false,
680 bool integer_quals = false,
681 bool interleaved = false,
682 uint32_t skip = 0) :
CFilePatternSource(infiles,NULL,trim3,trim5)683 CFilePatternSource(
684 infiles,
685 NULL,
686 trim3,
687 trim5),
688 first_(true),
689 solQuals_(solexa_quals),
690 phred64Quals_(phred64Quals),
691 intQuals_(integer_quals),
692 interleaved_(interleaved) { }
693
reset()694 virtual void reset() {
695 first_ = true;
696 CFilePatternSource::reset();
697 }
698
699 /**
700 * Finalize FASTQ parsing outside critical section.
701 */
702 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
703
704 protected:
705
706 /**
707 * "Light" parser. This is inside the critical section, so the key is to do
708 * just enough parsing so that another function downstream (finalize()) can do
709 * the rest of the parsing. Really this function's only job is to stick every
710 * for lines worth of the input file into a buffer (r.readOrigBuf). finalize()
711 * then parses the contents of r.readOrigBuf later.
712 */
713 virtual pair<bool, int> nextBatchFromFile(
714 PerThreadReadBuf& pt,
715 bool batch_a,
716 size_t read_idx);
717
resetForNextFile()718 virtual void resetForNextFile() {
719 first_ = true;
720 }
721
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)722 virtual void dump(ostream& out,
723 const BTDnaString& seq,
724 const BTString& qual,
725 const BTString& name)
726 {
727 out << "@" << name << endl << seq << endl << "+" << endl << qual << endl;
728 }
729
730 private:
731
732 bool first_;
733 bool solQuals_;
734 bool phred64Quals_;
735 bool intQuals_;
736 bool interleaved_;
737 };
738
739 /**
740 * Read a Raw-format file (one sequence per line). No quality strings
741 * allowed. All qualities are assumed to be 'I' (40 on the Phred-33
742 * scale).
743 */
744 class RawPatternSource : public CFilePatternSource {
745
746 public:
747
748 RawPatternSource(
749 const EList<string>& infiles,
750 int trim3 = 0,
751 int trim5 = 0) :
CFilePatternSource(infiles,NULL,trim3,trim5)752 CFilePatternSource(
753 infiles,
754 NULL,
755 trim3,
756 trim5),
757 first_(true) { }
758
reset()759 virtual void reset() {
760 first_ = true;
761 CFilePatternSource::reset();
762 }
763
764 /**
765 * Finalize raw parsing outside critical section.
766 */
767 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
768
769 protected:
770
771 /**
772 * Light-parse a batch into the given buffer.
773 */
774 virtual std::pair<bool, int> nextBatchFromFile(
775 PerThreadReadBuf& pt,
776 bool batch_a,
777 size_t read_idx);
778
resetForNextFile()779 virtual void resetForNextFile() {
780 first_ = true;
781 }
782
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)783 virtual void dump(ostream& out,
784 const BTDnaString& seq,
785 const BTString& qual,
786 const BTString& name)
787 {
788 out << seq << endl;
789 }
790
791
792 private:
793
794 bool first_;
795 };
796
797 /**
798 * Abstract parent class for synhconized sources of paired-end reads
799 * (and possibly also single-end reads).
800 */
801 class PatternComposer {
802 public:
PatternComposer()803 PatternComposer() { }
804
~PatternComposer()805 virtual ~PatternComposer() { }
806
807 virtual void reset() = 0;
808
809 /**
810 * Member function override by concrete, format-specific classes.
811 */
812 virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt) = 0;
813
814 /**
815 * Make appropriate call into the format layer to parse individual read.
816 */
817 virtual bool parse(Read& ra, Read& rb, TReadId rdid) = 0;
818
free_pmembers(const EList<PatternSource * > & elist)819 virtual void free_pmembers( const EList<PatternSource*> &elist) {
820 for (size_t i = 0; i < elist.size(); i++) {
821 if (elist[i] != NULL)
822 delete elist[i];
823 }
824 }
825
826 protected:
827
828 MUTEX_T mutex_m; /// mutex for locking critical regions
829 };
830
831 /**
832 * Encapsulates a synchronized source of both paired-end reads and
833 * unpaired reads, where the paired-end must come from parallel files.
834 */
835 class SoloPatternComposer : public PatternComposer {
836
837 public:
838
SoloPatternComposer(const EList<PatternSource * > & src)839 SoloPatternComposer(const EList<PatternSource*>& src) :
840 PatternComposer(),
841 cur_(0),
842 src_(src)
843 {
844 for(size_t i = 0; i < src_.size(); i++) {
845 assert(src_[i] != NULL);
846 }
847 }
848
849 /**
850 * Reset this object and all the PatternSources under it so that
851 * the next call to nextReadPair gets the very first read pair.
852 */
reset()853 virtual void reset() {
854 for(size_t i = 0; i < src_.size(); i++) {
855 src_[i]->reset();
856 }
857 cur_ = 0;
858 }
859
860 /**
861 * The main member function for dispensing pairs of reads or
862 * singleton reads. Returns true iff ra and rb contain a new
863 * pair; returns false if ra contains a new unpaired read.
864 */
865 pair<bool, int> nextBatch(PerThreadReadBuf& pt);
866
867 /**
868 * Make appropriate call into the format layer to parse individual read.
869 */
parse(Read & ra,Read & rb,TReadId rdid)870 virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
871 return src_[0]->parse(ra, rb, rdid);
872 }
873
874 protected:
875
876 volatile uint32_t cur_; // current element in parallel srca_, srcb_ vectors
877 EList<PatternSource*> src_; /// PatternSources for paired-end reads
878 };
879
880 /**
881 * Encapsulates a synchronized source of both paired-end reads and
882 * unpaired reads, where the paired-end must come from parallel files.
883 */
884 class DualPatternComposer : public PatternComposer {
885
886 public:
887
DualPatternComposer(const EList<PatternSource * > & srca,const EList<PatternSource * > & srcb)888 DualPatternComposer(const EList<PatternSource*>& srca,
889 const EList<PatternSource*>& srcb) :
890 PatternComposer(),
891 cur_(0),
892 srca_(srca),
893 srcb_(srcb)
894 {
895 // srca_ and srcb_ must be parallel
896 assert_eq(srca_.size(), srcb_.size());
897 for(size_t i = 0; i < srca_.size(); i++) {
898 // Can't have NULL first-mate sources. Second-mate sources
899 // can be NULL, in the case when the corresponding first-
900 // mate source is unpaired.
901 assert(srca_[i] != NULL);
902 for(size_t j = 0; j < srcb_.size(); j++) {
903 assert_neq(srca_[i], srcb_[j]);
904 }
905 }
906 }
907
908 /**
909 * Reset this object and all the PatternSources under it so that
910 * the next call to nextReadPair gets the very first read pair.
911 */
reset()912 virtual void reset() {
913 for(size_t i = 0; i < srca_.size(); i++) {
914 srca_[i]->reset();
915 if(srcb_[i] != NULL) {
916 srcb_[i]->reset();
917 }
918 }
919 cur_ = 0;
920 }
921
922 /**
923 * The main member function for dispensing pairs of reads or
924 * singleton reads. Returns true iff ra and rb contain a new
925 * pair; returns false if ra contains a new unpaired read.
926 */
927 pair<bool, int> nextBatch(PerThreadReadBuf& pt);
928
929 /**
930 * Make appropriate call into the format layer to parse individual read.
931 */
parse(Read & ra,Read & rb,TReadId rdid)932 virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
933 return srca_[0]->parse(ra, rb, rdid);
934 }
935
936
937 protected:
938
939 volatile uint32_t cur_; // current element in parallel srca_, srcb_ vectors
940 EList<PatternSource*> srca_; /// PatternSources for 1st mates and/or unpaired reads
941 EList<PatternSource*> srcb_; /// PatternSources for 2nd mates
942 };
943
944 /**
945 * Encapsulates a single thread's interaction with the PatternSource.
946 * Most notably, this class holds the buffers into which the
947 * PatterSource will write sequences. This class is *not* threadsafe
948 * - it doesn't need to be since there's one per thread. PatternSource
949 * is thread-safe.
950 */
951 class PatternSourcePerThread {
952
953 public:
954
PatternSourcePerThread(PatternComposer & composer,uint32_t max_buf,uint32_t skip,uint32_t seed)955 PatternSourcePerThread(
956 PatternComposer& composer,
957 uint32_t max_buf,
958 uint32_t skip,
959 uint32_t seed) :
960 composer_(composer),
961 buf_(max_buf),
962 last_batch_(false),
963 last_batch_size_(0),
964 skip_(skip),
965 seed_(seed),
966 batch_id_(0) { }
967
968 /**
969 * Get the next paired or unpaired read from the wrapped
970 * PatternComposer. Returns a pair of bools; first indicates
971 * whether we were successful, second indicates whether we're
972 * done.
973 */
974 std::pair<bool, bool> nextReadPair();
975
bufa()976 Read& bufa() { return buf_.read_a(); }
bufb()977 Read& bufb() { return buf_.read_b(); }
978
bufa()979 const Read& bufa() const { return buf_.read_a(); }
bufb()980 const Read& bufb() const { return buf_.read_b(); }
981
rdid()982 TReadId rdid() const { return buf_.rdid(); }
983
batch_id()984 size_t batch_id() const { return batch_id_; }
985
986 /**
987 * Return true iff the read currently in the buffer is a
988 * paired-end read.
989 */
paired()990 bool paired() const {
991 // can also do buf_.read_b().mate > 0, but the mate
992 // field isn't set until finalize is called, whereas
993 // parsed is set by the time parse() is finished.
994 return buf_.read_b().parsed;
995 }
996
997 private:
998
999 /**
1000 * When we've finished fully parsing and dishing out reads in
1001 * the current batch, we go get the next one by calling into
1002 * the composition layer.
1003 */
nextBatch()1004 std::pair<bool, int> nextBatch() {
1005 buf_.reset();
1006 std::pair<bool, int> res = composer_.nextBatch(buf_);
1007 buf_.init();
1008 batch_id_ = (size_t)(buf_.rdid()/buf_.max_buf_);
1009 return res;
1010 }
1011
1012 /**
1013 * Once name/sequence/qualities have been parsed for an
1014 * unpaired read, set all the other key fields of the Read
1015 * struct.
1016 */
1017 void finalize(Read& ra);
1018
1019 /**
1020 * Once name/sequence/qualities have been parsed for a
1021 * paired-end read, set all the other key fields of the Read
1022 * structs.
1023 */
1024 void finalizePair(Read& ra, Read& rb);
1025
1026 /**
1027 * Call into composition layer (which in turn calls into
1028 * format layer) to parse the read.
1029 */
parse(Read & ra,Read & rb)1030 bool parse(Read& ra, Read& rb) {
1031 return composer_.parse(ra, rb, buf_.rdid());
1032 }
1033
1034 PatternComposer& composer_; // pattern composer
1035 PerThreadReadBuf buf_; // read data buffer
1036 bool last_batch_; // true if this is final batch
1037 size_t last_batch_size_; // # reads read in previous batch
1038 uint32_t skip_; // skip reads with rdids less than this
1039 uint32_t seed_; // pseudo-random seed based on read content
1040 size_t batch_id_; // identify batches of reads for reordering
1041 };
1042
1043 /**
1044 * Abstract parent factory for PatternSourcePerThreads.
1045 */
1046 class PatternSourcePerThreadFactory {
1047 public:
PatternSourcePerThreadFactory(PatternComposer & composer,uint32_t max_buf,uint32_t skip,uint32_t seed)1048 PatternSourcePerThreadFactory(
1049 PatternComposer& composer,
1050 uint32_t max_buf,
1051 uint32_t skip,
1052 uint32_t seed):
1053 composer_(composer),
1054 max_buf_(max_buf),
1055 skip_(skip),
1056 seed_(seed) {}
1057
1058 /**
1059 * Create a new heap-allocated PatternSourcePerThreads.
1060 */
create()1061 virtual PatternSourcePerThread* create() const {
1062 return new PatternSourcePerThread(composer_, max_buf_, skip_, seed_);
1063 }
1064
1065 /**
1066 * Create a new heap-allocated vector of heap-allocated
1067 * PatternSourcePerThreads.
1068 */
create(uint32_t n)1069 virtual EList<PatternSourcePerThread*>* create(uint32_t n) const {
1070 EList<PatternSourcePerThread*>* v = new EList<PatternSourcePerThread*>;
1071 for(size_t i = 0; i < n; i++) {
1072 v->push_back(new PatternSourcePerThread(composer_, max_buf_, skip_, seed_));
1073 assert(v->back() != NULL);
1074 }
1075 return v;
1076 }
1077
1078 /// Free memory associated with a pattern source
destroy(PatternSourcePerThread * composer)1079 virtual void destroy(PatternSourcePerThread* composer) const {
1080 assert(composer != NULL);
1081 // Free the PatternSourcePerThread
1082 delete composer;
1083 }
1084
1085 /// Free memory associated with a pattern source list
destroy(EList<PatternSourcePerThread * > * composers)1086 virtual void destroy(EList<PatternSourcePerThread*>* composers) const {
1087 assert(composers != NULL);
1088 // Free all of the PatternSourcePerThreads
1089 for(size_t i = 0; i < composers->size(); i++) {
1090 if((*composers)[i] != NULL) {
1091 delete (*composers)[i];
1092 (*composers)[i] = NULL;
1093 }
1094 }
1095 // Free the vector
1096 delete composers;
1097 }
1098
~PatternSourcePerThreadFactory()1099 virtual ~PatternSourcePerThreadFactory() {}
1100
1101 private:
1102 /// Container for obtaining paired reads from PatternSources
1103 PatternComposer& composer_;
1104 /// Maximum size of batch to read in
1105 uint32_t max_buf_;
1106 uint32_t skip_;
1107 uint32_t seed_;
1108 };
1109
1110 #endif /*PAT_H_*/
1111