1 #ifndef PAT_H_
2 #define PAT_H_
3 
4 #include <iostream>
5 #include <cassert>
6 #include <cmath>
7 #include <zlib.h>
8 #include <sys/stat.h>
9 #include <stdexcept>
10 #include <string>
11 #include <cstring>
12 #include <ctype.h>
13 #include <fstream>
14 
15 #include "alphabet.h"
16 #include "assert_helpers.h"
17 #include "ds.h"
18 #include "ds.h"
19 #include "filebuf.h"
20 #include "qual.h"
21 #include "random_source.h"
22 #include "read.h"
23 #include "search_globals.h"
24 #include "sstring.h"
25 #include "threading.h"
26 #include "tokenize.h"
27 #include "util.h"
28 
29 #ifdef _WIN32
30 #define getc_unlocked _fgetc_nolock
31 #endif
32 
33 /**
34  * Classes and routines for reading reads from various input sources.
35  */
36 
37 using namespace std;
38 
39 typedef uint64_t TReadId;
40 
41 /**
42  * Parameters affecting how reads and read in.
43  * Note: Bowtie 2 uses this but Bowtie doesn't yet.
44  */
45 struct PatternParams {
46 
PatternParamsPatternParams47 	PatternParams() { }
48 
PatternParamsPatternParams49 	PatternParams(
50 		int format_,
51 		bool fileParallel_,
52 		uint32_t seed_,
53 		size_t max_buf_,
54 		bool solexa64_,
55 		bool phred64_,
56 		bool intQuals_,
57 		int sampleLen_,
58 		int sampleFreq_,
59 		size_t skip_,
60 		int nthreads_,
61 		bool fixName_) :
62 		format(format_),
63 		fileParallel(fileParallel_),
64 		seed(seed_),
65 		max_buf(max_buf_),
66 		solexa64(solexa64_),
67 		phred64(phred64_),
68 		intQuals(intQuals_),
69 		sampleLen(sampleLen_),
70 		sampleFreq(sampleFreq_),
71 		skip(skip_),
72 		nthreads(nthreads_),
73 		fixName(fixName_) { }
74 
75 	int format;           // file format
76 	bool fileParallel;    // true -> wrap files with separate PatternComposers
77 	uint32_t seed;        // pseudo-random seed
78 	size_t max_buf;       // number of reads to buffer in one read
79 	bool solexa64;        // true -> qualities are on solexa64 scale
80 	bool phred64;         // true -> qualities are on phred64 scale
81 	bool intQuals;        // true -> qualities are space-separated numbers
82 	int sampleLen;        // length of sampled reads for FastaContinuous...
83 	int sampleFreq;       // frequency of sampled reads for FastaContinuous...
84 	size_t skip;          // skip the first 'skip' patterns
85 	int nthreads;         // number of threads for locking
86 	bool fixName;         //
87 };
88 
89 /**
90  * All per-thread storage for input read data.
91  */
92 struct PerThreadReadBuf {
93 
PerThreadReadBufPerThreadReadBuf94 	PerThreadReadBuf(size_t max_buf) :
95 		max_buf_(max_buf),
96 		bufa_(max_buf),
97 		bufb_(max_buf),
98 		rdid_()
99 	{
100 		bufa_.resize(max_buf);
101 		bufb_.resize(max_buf);
102 		reset();
103 	}
104 
read_aPerThreadReadBuf105 	Read& read_a() { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf106 	Read& read_b() { return bufb_[cur_buf_]; }
107 
read_aPerThreadReadBuf108 	const Read& read_a() const { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf109 	const Read& read_b() const { return bufb_[cur_buf_]; }
110 
111 	/**
112 	 * Return read id for read/pair currently in the buffer.
113 	 */
rdidPerThreadReadBuf114 	TReadId rdid() const {
115 		assert_neq(rdid_, std::numeric_limits<TReadId>::max());
116 		return rdid_ + cur_buf_;
117 	}
118 
119 	/**
120 	 * Reset state as though no reads have been read.
121 	 */
resetPerThreadReadBuf122 	void reset() {
123 		cur_buf_ = bufa_.size();
124 		for(size_t i = 0; i < max_buf_; i++) {
125 			bufa_[i].reset();
126 			bufb_[i].reset();
127 		}
128 		rdid_ = std::numeric_limits<TReadId>::max();
129 	}
130 
131 	/**
132 	 * Advance cursor to next element
133 	 */
nextPerThreadReadBuf134 	void next() {
135 		assert_lt(cur_buf_, bufa_.size());
136 		cur_buf_++;
137 	}
138 
139 	/**
140 	 * Return true when there's nothing left to dish out.
141 	 */
exhaustedPerThreadReadBuf142 	bool exhausted() {
143 		assert_leq(cur_buf_, bufa_.size());
144 		return cur_buf_ >= bufa_.size()-1;
145 	}
146 
147 	/**
148 	 * Just after a new batch has been loaded, use init to
149 	 * set the cuf_buf_ and rdid_ fields appropriately.
150 	 */
initPerThreadReadBuf151 	void init() {
152 		cur_buf_ = 0;
153 	}
154 
155 	/**
156 	 * Set the read id of the first read in the buffer.
157 	 */
setReadIdPerThreadReadBuf158 	void setReadId(TReadId rdid) {
159 		rdid_ = rdid;
160 		assert_neq(rdid_, std::numeric_limits<TReadId>::max());
161 	}
162 
163 	const size_t max_buf_; // max # reads to read into buffer at once
164 	EList<Read> bufa_; // Read buffer for mate as
165 	EList<Read> bufb_; // Read buffer for mate bs
166 	size_t cur_buf_;       // Read buffer currently active
167 	TReadId rdid_;         // index of read at offset 0 of bufa_/bufb_
168 };
169 
170 
171 /**
172  * Encapsulates a synchronized source of patterns; usually a file.
173  * Handles dumping patterns to a logfile (useful for debugging).  Also
174  * optionally reverses reads and quality strings before returning them,
175  * though that is usually more efficiently done by the concrete
176  * subclass.  Concrete subclasses should delimit critical sections with
177  * ThreadSafe objects.
178  */
179 class PatternSource {
180 public:
PatternSource()181 	PatternSource() :
182 		readCnt_(0),
183 		mutex()
184 	{ }
185 
~PatternSource()186 	virtual ~PatternSource() { }
187 
188 	/**
189 	 * Implementation to be provided by concrete subclasses.  An
190 	 * implementation for this member is only relevant for formats
191 	 * where individual input sources look like single-end-read
192 	 * sources, e.g., formats where paired-end reads are specified in
193 	 * parallel read files.
194 	 */
195 	virtual std::pair<bool, int> nextBatch(
196 		PerThreadReadBuf& pt,
197 		bool batch_a,
198 		bool lock = true) = 0;
199 
200 	/**
201 	 * Finishes parsing a given read.  Happens outside the critical section.
202 	 */
203 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const = 0;
204 
205 	/// Reset state to start over again with the first read
reset()206 	virtual void reset() { readCnt_ = 0; }
207 
208 	/**
209 	 * Return the number of reads attempted.
210 	 */
readCount()211 	TReadId readCount() const { return readCnt_; }
212 
213 protected:
214 
215 	/**
216 	 * Default format for dumping a read to an output stream.  Concrete
217 	 * subclasses might want to do something fancier.
218 	 */
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)219 	virtual void dump(ostream& out,
220 	                  const BTDnaString& seq,
221 	                  const BTString& qual,
222 	                  const BTString& name)
223 	{
224 		out << name << ": " << seq << " " << qual << endl;
225 	}
226 
227 	/// The number of reads read by this PatternSource
228 	volatile uint64_t readCnt_;
229 
230 	/// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields
231 	/// of this or another other shared object.
232 	MUTEX_T mutex;
233 };
234 
235 /**
236  * Encapsualtes a source of patterns where each raw pattern is trimmed
237  * by some user-defined amount on the 3' and 5' ends.  Doesn't
238  * implement the actual trimming - that's up to the concrete
239  * descendants.
240  */
241 class TrimmingPatternSource : public PatternSource {
242 public:
243 	TrimmingPatternSource(int trim3 = 0,
244 	                      int trim5 = 0) :
PatternSource()245 		PatternSource(),
246 		trim3_(trim3), trim5_(trim5) { }
247 protected:
248 	int trim3_;
249 	int trim5_;
250 };
251 
252 extern void wrongQualityFormat(const BTString& read_name);
253 extern void tooFewQualities(const BTString& read_name);
254 extern void tooManyQualities(const BTString& read_name);
255 extern void tooManySeqChars(const BTString& read_name);
256 
257 /**
258  * Encapsulates a source of patterns which is an in-memory vector.
259  */
260 class VectorPatternSource : public TrimmingPatternSource {
261 public:
262 	VectorPatternSource(
263 		const EList<string>& v,
264 		int trim3 = 0,
265 		int trim5 = 0);
266 
~VectorPatternSource()267 	virtual ~VectorPatternSource() { }
268 
269 	/**
270 	 * Read next batch.  However, batch concept is not very applicable for this
271 	 * PatternSource where all the info has already been parsed into the fields
272 	 * in the contsructor.  This essentially modifies the pt as though we read
273 	 * in some number of patterns.
274 	 */
275 	virtual pair<bool, int> nextBatch(
276 		PerThreadReadBuf& pt,
277 		bool batch_a,
278 		bool lock = true);
279 
280 	/**
281 	 * Reset so that next call to nextBatch* gets the first batch.
282 	 */
reset()283 	virtual void reset() {
284 		TrimmingPatternSource::reset();
285 		cur_ = 0;
286 		paired_ = false;
287 	}
288 
289 	/**
290 	 * Finishes parsing outside the critical section
291 	 */
292 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
293 
294 private:
295 
296 	pair<bool, int> nextBatchImpl(
297 		PerThreadReadBuf& pt,
298 		bool batch_a);
299 
300 	size_t cur_;                      // index for first read of next batch
301 	bool paired_;                     // whether reads are paired
302 	EList<std::string> tokbuf_; // buffer for storing parsed tokens
303 	EList<std::string> bufs_;   // per-read buffers
304 	char nametmp_[20];                // temp buffer for constructing name
305 };
306 
307 /**
308  * Parent class for PatternSources that read from a file.
309  * Uses unlocked C I/O, on the assumption that all reading
310  * from the file will take place in an otherwise-protected
311  * critical section.
312  */
313 class CFilePatternSource : public TrimmingPatternSource {
314 public:
315 	CFilePatternSource(
316 		const EList<string>& infiles,
317 		const EList<string>* qinfiles,
318 		int trim3 = 0,
319 	    int trim5 = 0) :
TrimmingPatternSource(trim3,trim5)320 		TrimmingPatternSource( trim3, trim5),
321 		infiles_(infiles),
322 		filecur_(0),
323 		fp_(NULL),
324 		qfp_(NULL),
325 		zfp_(NULL),
326 		is_open_(false),
327 		first_(true)
328 	{
329 		qinfiles_.clear();
330 		if(qinfiles != NULL) qinfiles_ = *qinfiles;
331 		assert_gt(infiles.size(), 0);
332 		errs_.resize(infiles_.size());
333 		errs_.fill(0, infiles_.size(), false);
334 		if(qinfiles_.size() > 0 &&
335 		   qinfiles_.size() != infiles_.size())
336 		{
337 			cerr << "Error: Different numbers of input FASTA/quality files ("
338 			     << infiles_.size() << "/" << qinfiles_.size() << ")" << endl;
339 			throw 1;
340 		}
341 		open(); // open first file in the list
342 		filecur_++;
343 	}
344 
~CFilePatternSource()345 	virtual ~CFilePatternSource() {
346 		if(is_open_) {
347 			if (compressed_) {
348 				gzclose(zfp_);
349 				zfp_ = NULL;
350 			}
351 			else if (fp_ != stdin) {
352 				fclose(fp_);
353 				fp_ = NULL;
354 			}
355 			if(qfp_ != NULL && qfp_ != stdin) {
356 				fclose(qfp_);
357 				qfp_ = NULL;
358 			}
359 			assert(zfp_ == NULL);
360 			assert(fp_ == NULL || fp_ == stdin);
361 			assert(qfp_ == NULL || qfp_ == stdin);
362 		}
363 	}
364 
365 	/**
366 	 * Fill Read with the sequence, quality and name for the next
367 	 * read in the list of read files.  This function gets called by
368 	 * all the search threads, so we must handle synchronization.
369 	 *
370 	 * Returns pair<bool, int> where bool indicates whether we're
371 	 * completely done, and int indicates how many reads were read.
372 	 */
373 	virtual pair<bool, int> nextBatch(
374 		PerThreadReadBuf& pt,
375 		bool batch_a,
376 		bool lock);
377 
378 	/**
379 	 * Reset so that next call to nextBatch* gets the first batch.
380 	 * Should only be called by the master thread.
381 	 */
reset()382 	virtual void reset() {
383 		TrimmingPatternSource::reset();
384 		filecur_ = 0,
385 		open();
386 		filecur_++;
387 	}
388 
389 protected:
390 
391 	/**
392 	 * Light-parse a batch of unpaired reads from current file into the given
393 	 * buffer.  Called from CFilePatternSource.nextBatch().
394 	 */
395 	virtual std::pair<bool, int> nextBatchFromFile(
396 		PerThreadReadBuf& pt,
397 		bool batch_a,
398 		size_t read_idx) = 0;
399 
400 	/**
401 	 * Reset state to handle a fresh file
402 	 */
resetForNextFile()403 	virtual void resetForNextFile() { }
404 
405 	/**
406 	 * Open the next file in the list of input files.
407 	 */
408 	void open();
409 
getc_wrapper()410 	int getc_wrapper() {
411 		return compressed_ ? gzgetc(zfp_) : getc_unlocked(fp_);
412 	}
413 
ungetc_wrapper(int c)414 	int ungetc_wrapper(int c) {
415 		return compressed_ ? gzungetc(c, zfp_) : ungetc(c, fp_);
416 	}
417 
is_gzipped_file(const std::string & filename)418 	bool is_gzipped_file(const std::string& filename) {
419 		struct stat s;
420 		if (stat(filename.c_str(), &s) != 0) {
421 			perror("stat");
422 		}
423 		else {
424 			if (S_ISFIFO(s.st_mode))
425 				return true;
426 		}
427 		size_t pos = filename.find_last_of(".");
428 		std::string ext = (pos == std::string::npos) ? "" : filename.substr(pos + 1);
429 		if (ext == "" || ext == "gz" || ext == "Z") {
430 			return true;
431 		}
432 		return false;
433 	}
434 
435 	EList<string> infiles_; /// filenames for read files
436 	EList<string> qinfiles_; /// filenames for quality files
437 	EList<bool> errs_; /// whether we've already printed an error for each file
438 	size_t filecur_;   /// index into infiles_ of next file to read
439 	FILE *fp_; /// read file currently being read from
440 	FILE *qfp_; /// quality file currently being read from
441     gzFile zfp_;
442 	bool is_open_; /// whether fp_ is currently open
443 	bool first_;
444 	char buf_[64*1024]; /// file buffer for sequences
445 	char qbuf_[64*1024]; /// file buffer for qualities
446     bool compressed_;
447 
448 private:
449 
450 	pair<bool, int> nextBatchImpl(
451 		PerThreadReadBuf& pt,
452 		bool batch_a);
453 
454 };
455 
456 /**
457  * Synchronized concrete pattern source for a list of FASTA
458  */
459 class FastaPatternSource : public CFilePatternSource {
460 
461 public:
462 
463 	FastaPatternSource(
464 		const EList<string>& infiles,
465 		const EList<string>* qinfiles,
466 		int trim3 = 0,
467 		int trim5 = 0) :
CFilePatternSource(infiles,qinfiles,trim3,trim5)468 		CFilePatternSource(
469 			infiles,
470 			qinfiles,
471 			trim3,
472 			trim5),
473 		first_(true) { }
474 
475 	/**
476 	 * Reset so that next call to nextBatch* gets the first batch.
477 	 * Should only be called by the master thread.
478 	 */
reset()479 	virtual void reset() {
480 		first_ = true;
481 		CFilePatternSource::reset();
482 	}
483 
484 	/**
485 	 * Finalize FASTA parsing outside critical section.
486 	 */
487 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
488 
489 protected:
490 
491 	/**
492 	 * Light-parse a FASTA batch into the given buffer.
493 	 */
494 	virtual std::pair<bool, int> nextBatchFromFile(
495 		PerThreadReadBuf& pt,
496 		bool batch_a,
497 		size_t read_idx);
498 
499 	/**
500 	 * Reset state to handle a fresh file
501 	 */
resetForNextFile()502 	virtual void resetForNextFile() {
503 		first_ = true;
504 	}
505 
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)506 	virtual void dump(ostream& out,
507 	                  const BTDnaString& seq,
508 	                  const BTString& qual,
509 	                  const BTString& name)
510 	{
511 		out << ">" << name << endl << seq << endl;
512 	}
513 
514 private:
515 
516 	bool first_;
517 };
518 
519 
520 /**
521  * Tokenize a line of space-separated integer quality values.
522  */
tokenizeQualLine(FileBuf & filebuf,char * buf,size_t buflen,EList<string> & toks)523 static inline bool tokenizeQualLine(FileBuf& filebuf, char *buf, size_t buflen, EList<string>& toks) {
524 	size_t rd = filebuf.gets(buf, buflen);
525 	if(rd == 0) return false;
526 	assert(NULL == strrchr(buf, '\n'));
527 	tokenize(string(buf), " ", toks);
528 	return true;
529 }
530 
531 /**
532  * Synchronized concrete pattern source for a list of files with tab-
533  * delimited name, seq, qual fields (or, for paired-end reads,
534  * basename, seq1, qual1, seq2, qual2).
535  */
536 class TabbedPatternSource : public CFilePatternSource {
537 public:
538 	TabbedPatternSource(
539 		const EList<string>& infiles,
540 		bool secondName,  // whether it's --12/--tab5 or --tab6
541 		int trim3 = 0,
542 		int trim5 = 0,
543 		bool solQuals = false,
544 		bool phred64Quals = false,
545 		bool intQuals = false) :
CFilePatternSource(infiles,NULL,trim3,trim5)546 		CFilePatternSource(
547 			infiles,
548 			NULL,
549 			trim3,
550 			trim5),
551 		solQuals_(solQuals),
552 		phred64Quals_(phred64Quals),
553 		intQuals_(intQuals) { }
554 
555 	/**
556 	 * Finalize tabbed parsing outside critical section.
557 	 */
558 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
559 
560 protected:
561 
562 	/**
563 	 * Light-parse a batch of tabbed-format reads into given buffer.
564 	 */
565 	virtual std::pair<bool, int> nextBatchFromFile(
566 		PerThreadReadBuf& pt,
567 		bool batch_a,
568 		size_t read_idx);
569 
570 	/**
571 	 * Dump a FASTQ-style record for the read.
572 	 */
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)573 	virtual void dump(ostream& out,
574 	                  const BTDnaString& seq,
575 	                  const BTString& qual,
576 	                  const BTString& name)
577 	{
578 		out << "@" << name << endl << seq << endl
579 		    << "+" << endl << qual << endl;
580 	}
581 
582 protected:
583 
584 	bool solQuals_;     // base qualities are log odds
585 	bool phred64Quals_; // base qualities are on -64 scale
586 	bool intQuals_;     // base qualities are space-separated strings
587 	bool secondName_;   // true if --tab6, false if --tab5
588 };
589 
590 /**
591  * Synchronized concrete pattern source for a list of FASTA files where
592  * reads need to be extracted from long continuous sequences.
593  */
594 class FastaContinuousPatternSource : public CFilePatternSource {
595 public:
FastaContinuousPatternSource(const EList<string> & infiles,size_t length,size_t freq)596 	FastaContinuousPatternSource(
597 			const EList<string>& infiles,
598 			size_t length,
599 			size_t freq) :
600 		CFilePatternSource(
601 			infiles,
602 			NULL,
603 			0,
604 			0),
605 		length_(length),
606 		freq_(freq),
607 		eat_(length_-1),
608 		beginning_(true),
609 		bufCur_(0),
610 		cur_(0llu),
611 		last_(0llu)
612 		{
613 			assert_gt(freq_, 0);
614 			resetForNextFile();
615 			assert_lt(length_, 1024);
616 		 }
617 
reset()618 	virtual void reset() {
619 		CFilePatternSource::reset();
620 		resetForNextFile();
621 	}
622 
623 	/**
624 	 * Finalize FASTA parsing outside critical section.
625 	 */
626 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
627 
628 protected:
629 
630 	/**
631 	 * Light-parse a batch into the given buffer.
632 	 */
633 	virtual std::pair<bool, int> nextBatchFromFile(
634 		PerThreadReadBuf& pt,
635 		bool batch_a,
636 		size_t read_idx);
637 
638 	/**
639 	 * Reset state to be read for the next file.
640 	 */
resetForNextFile()641 	virtual void resetForNextFile() {
642 		eat_ = length_-1;
643 		name_prefix_buf_.clear();
644 		beginning_ = true;
645 		bufCur_ = 0;
646 		last_ = cur_;
647 	}
648 
649 private:
650 	const size_t length_; /// length of reads to generate
651 	const size_t freq_;   /// frequency to sample reads
652 	size_t eat_;        /// number of characters we need to skip before
653 	                    /// we have flushed all of the ambiguous or
654 	                    /// non-existent characters out of our read
655 	                    /// window
656 	bool beginning_;    /// skipping over the first read length?
657 	char buf_[1024];    /// read buffer
658 	std::string name_prefix_buf_; /// FASTA sequence name buffer
659 	char name_int_buf_[20]; /// for composing offsets for names
660 	size_t bufCur_;     /// buffer cursor; points to where we should
661 	                    /// insert the next character
662 	uint64_t cur_;
663 	uint64_t last_;/// number to subtract from readCnt_ to get
664 	                    /// the pat id to output (so it resets to 0 for
665 	                    /// each new sequence)
666 };
667 
668 /**
669  * Read a FASTQ-format file.
670  * See: http://maq.sourceforge.net/fastq.shtml
671  */
672 class FastqPatternSource : public CFilePatternSource {
673 public:
674 	FastqPatternSource(
675 		const EList<string>& infiles,
676 		int trim3 = 0,
677 		int trim5 = 0,
678 		bool solexa_quals = false,
679 		bool phred64Quals = false,
680 		bool integer_quals = false,
681 		bool interleaved = false,
682 		uint32_t skip = 0) :
CFilePatternSource(infiles,NULL,trim3,trim5)683 		CFilePatternSource(
684 			infiles,
685 			NULL,
686 			trim3,
687 			trim5),
688 		first_(true),
689 		solQuals_(solexa_quals),
690 		phred64Quals_(phred64Quals),
691 		intQuals_(integer_quals),
692 		interleaved_(interleaved) { }
693 
reset()694 	virtual void reset() {
695 		first_ = true;
696 		CFilePatternSource::reset();
697 	}
698 
699 	/**
700 	 * Finalize FASTQ parsing outside critical section.
701 	 */
702 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
703 
704 protected:
705 
706 	/**
707 	 * "Light" parser.  This is inside the critical section, so the key is to do
708 	 * just enough parsing so that another function downstream (finalize()) can do
709 	 * the rest of the parsing.  Really this function's only job is to stick every
710 	 * for lines worth of the input file into a buffer (r.readOrigBuf).  finalize()
711 	 * then parses the contents of r.readOrigBuf later.
712 	 */
713 	virtual pair<bool, int> nextBatchFromFile(
714 		PerThreadReadBuf& pt,
715 		bool batch_a,
716 		size_t read_idx);
717 
resetForNextFile()718 	virtual void resetForNextFile() {
719 		first_ = true;
720 	}
721 
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)722 	virtual void dump(ostream& out,
723 	                  const BTDnaString& seq,
724 	                  const BTString& qual,
725 	                  const BTString& name)
726 	{
727 		out << "@" << name << endl << seq << endl << "+" << endl << qual << endl;
728 	}
729 
730 private:
731 
732 	bool first_;
733 	bool solQuals_;
734 	bool phred64Quals_;
735 	bool intQuals_;
736 	bool interleaved_;
737 };
738 
739 /**
740  * Read a Raw-format file (one sequence per line).  No quality strings
741  * allowed.  All qualities are assumed to be 'I' (40 on the Phred-33
742  * scale).
743  */
744 class RawPatternSource : public CFilePatternSource {
745 
746 public:
747 
748 	RawPatternSource(
749 		const EList<string>& infiles,
750 		int trim3 = 0,
751 		int trim5 = 0) :
CFilePatternSource(infiles,NULL,trim3,trim5)752 		CFilePatternSource(
753 			infiles,
754 			NULL,
755 			trim3,
756 			trim5),
757 		first_(true) { }
758 
reset()759 	virtual void reset() {
760 		first_ = true;
761 		CFilePatternSource::reset();
762 	}
763 
764 	/**
765 	 * Finalize raw parsing outside critical section.
766 	 */
767 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
768 
769 protected:
770 
771 	/**
772 	 * Light-parse a batch into the given buffer.
773 	 */
774 	virtual std::pair<bool, int> nextBatchFromFile(
775 		PerThreadReadBuf& pt,
776 		bool batch_a,
777 		size_t read_idx);
778 
resetForNextFile()779 	virtual void resetForNextFile() {
780 		first_ = true;
781 	}
782 
dump(ostream & out,const BTDnaString & seq,const BTString & qual,const BTString & name)783 	virtual void dump(ostream& out,
784 	                  const BTDnaString& seq,
785 	                  const BTString& qual,
786 	                  const BTString& name)
787 	{
788 		out << seq << endl;
789 	}
790 
791 
792 private:
793 
794 	bool first_;
795 };
796 
797 /**
798  * Abstract parent class for synhconized sources of paired-end reads
799  * (and possibly also single-end reads).
800  */
801 class PatternComposer {
802 public:
PatternComposer()803 	PatternComposer() { }
804 
~PatternComposer()805 	virtual ~PatternComposer() { }
806 
807 	virtual void reset() = 0;
808 
809 	/**
810 	 * Member function override by concrete, format-specific classes.
811 	 */
812 	virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt) = 0;
813 
814 	/**
815 	 * Make appropriate call into the format layer to parse individual read.
816 	 */
817 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) = 0;
818 
free_pmembers(const EList<PatternSource * > & elist)819 	virtual void free_pmembers( const EList<PatternSource*> &elist) {
820     		for (size_t i = 0; i < elist.size(); i++) {
821         		if (elist[i] != NULL)
822             			delete elist[i];
823 		}
824 	}
825 
826 protected:
827 
828 	MUTEX_T mutex_m; /// mutex for locking critical regions
829 };
830 
831 /**
832  * Encapsulates a synchronized source of both paired-end reads and
833  * unpaired reads, where the paired-end must come from parallel files.
834  */
835 class SoloPatternComposer : public PatternComposer {
836 
837 public:
838 
SoloPatternComposer(const EList<PatternSource * > & src)839 	SoloPatternComposer(const EList<PatternSource*>& src) :
840 		PatternComposer(),
841 		cur_(0),
842 		src_(src)
843 	{
844 	    for(size_t i = 0; i < src_.size(); i++) {
845 	    	assert(src_[i] != NULL);
846 	    }
847 	}
848 
849 	/**
850 	 * Reset this object and all the PatternSources under it so that
851 	 * the next call to nextReadPair gets the very first read pair.
852 	 */
reset()853 	virtual void reset() {
854 		for(size_t i = 0; i < src_.size(); i++) {
855 			src_[i]->reset();
856 		}
857 		cur_ = 0;
858 	}
859 
860 	/**
861 	 * The main member function for dispensing pairs of reads or
862 	 * singleton reads.  Returns true iff ra and rb contain a new
863 	 * pair; returns false if ra contains a new unpaired read.
864 	 */
865 	pair<bool, int> nextBatch(PerThreadReadBuf& pt);
866 
867 	/**
868 	 * Make appropriate call into the format layer to parse individual read.
869 	 */
parse(Read & ra,Read & rb,TReadId rdid)870 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
871 		return src_[0]->parse(ra, rb, rdid);
872 	}
873 
874 protected:
875 
876 	volatile uint32_t cur_; // current element in parallel srca_, srcb_ vectors
877 	EList<PatternSource*> src_; /// PatternSources for paired-end reads
878 };
879 
880 /**
881  * Encapsulates a synchronized source of both paired-end reads and
882  * unpaired reads, where the paired-end must come from parallel files.
883  */
884 class DualPatternComposer : public PatternComposer {
885 
886 public:
887 
DualPatternComposer(const EList<PatternSource * > & srca,const EList<PatternSource * > & srcb)888 	DualPatternComposer(const EList<PatternSource*>& srca,
889 	                        const EList<PatternSource*>& srcb) :
890 		PatternComposer(),
891 		cur_(0),
892 		srca_(srca),
893 		srcb_(srcb)
894 	{
895 		// srca_ and srcb_ must be parallel
896 		assert_eq(srca_.size(), srcb_.size());
897 		for(size_t i = 0; i < srca_.size(); i++) {
898 			// Can't have NULL first-mate sources.  Second-mate sources
899 			// can be NULL, in the case when the corresponding first-
900 			// mate source is unpaired.
901 			assert(srca_[i] != NULL);
902 			for(size_t j = 0; j < srcb_.size(); j++) {
903 				assert_neq(srca_[i], srcb_[j]);
904 			}
905 		}
906 	}
907 
908 	/**
909 	 * Reset this object and all the PatternSources under it so that
910 	 * the next call to nextReadPair gets the very first read pair.
911 	 */
reset()912 	virtual void reset() {
913 		for(size_t i = 0; i < srca_.size(); i++) {
914 			srca_[i]->reset();
915 			if(srcb_[i] != NULL) {
916 				srcb_[i]->reset();
917 			}
918 		}
919 		cur_ = 0;
920 	}
921 
922 	/**
923 	 * The main member function for dispensing pairs of reads or
924 	 * singleton reads.  Returns true iff ra and rb contain a new
925 	 * pair; returns false if ra contains a new unpaired read.
926 	 */
927 	pair<bool, int> nextBatch(PerThreadReadBuf& pt);
928 
929 	/**
930 	 * Make appropriate call into the format layer to parse individual read.
931 	 */
parse(Read & ra,Read & rb,TReadId rdid)932 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
933 		return srca_[0]->parse(ra, rb, rdid);
934 	}
935 
936 
937 protected:
938 
939 	volatile uint32_t cur_; // current element in parallel srca_, srcb_ vectors
940 	EList<PatternSource*> srca_; /// PatternSources for 1st mates and/or unpaired reads
941 	EList<PatternSource*> srcb_; /// PatternSources for 2nd mates
942 };
943 
944 /**
945  * Encapsulates a single thread's interaction with the PatternSource.
946  * Most notably, this class holds the buffers into which the
947  * PatterSource will write sequences.  This class is *not* threadsafe
948  * - it doesn't need to be since there's one per thread.  PatternSource
949  * is thread-safe.
950  */
951 class PatternSourcePerThread {
952 
953 public:
954 
PatternSourcePerThread(PatternComposer & composer,uint32_t max_buf,uint32_t skip,uint32_t seed)955 	PatternSourcePerThread(
956 		PatternComposer& composer,
957 		uint32_t max_buf,
958 		uint32_t skip,
959 		uint32_t seed) :
960 		composer_(composer),
961 		buf_(max_buf),
962 		last_batch_(false),
963 		last_batch_size_(0),
964 		skip_(skip),
965 		seed_(seed),
966 		batch_id_(0) { }
967 
968 	/**
969 	 * Get the next paired or unpaired read from the wrapped
970 	 * PatternComposer.  Returns a pair of bools; first indicates
971 	 * whether we were successful, second indicates whether we're
972 	 * done.
973 	 */
974 	std::pair<bool, bool> nextReadPair();
975 
bufa()976 	Read& bufa() { return buf_.read_a(); }
bufb()977 	Read& bufb() { return buf_.read_b(); }
978 
bufa()979 	const Read& bufa() const { return buf_.read_a(); }
bufb()980 	const Read& bufb() const { return buf_.read_b(); }
981 
rdid()982 	TReadId rdid() const { return buf_.rdid(); }
983 
batch_id()984 	size_t batch_id() const { return batch_id_; }
985 
986 	/**
987 	 * Return true iff the read currently in the buffer is a
988 	 * paired-end read.
989 	 */
paired()990 	bool paired() const {
991 		// can also do buf_.read_b().mate > 0, but the mate
992 		// field isn't set until finalize is called, whereas
993 		// parsed is set by the time parse() is finished.
994 		return buf_.read_b().parsed;
995 	}
996 
997 private:
998 
999 	/**
1000 	 * When we've finished fully parsing and dishing out reads in
1001 	 * the current batch, we go get the next one by calling into
1002 	 * the composition layer.
1003 	 */
nextBatch()1004 	std::pair<bool, int> nextBatch() {
1005 		buf_.reset();
1006 		std::pair<bool, int> res = composer_.nextBatch(buf_);
1007 		buf_.init();
1008 		batch_id_ = (size_t)(buf_.rdid()/buf_.max_buf_);
1009 		return res;
1010 	}
1011 
1012 	/**
1013 	 * Once name/sequence/qualities have been parsed for an
1014 	 * unpaired read, set all the other key fields of the Read
1015 	 * struct.
1016 	 */
1017 	void finalize(Read& ra);
1018 
1019 	/**
1020 	 * Once name/sequence/qualities have been parsed for a
1021 	 * paired-end read, set all the other key fields of the Read
1022 	 * structs.
1023 	 */
1024 	void finalizePair(Read& ra, Read& rb);
1025 
1026 	/**
1027 	 * Call into composition layer (which in turn calls into
1028 	 * format layer) to parse the read.
1029 	 */
parse(Read & ra,Read & rb)1030 	bool parse(Read& ra, Read& rb) {
1031 		return composer_.parse(ra, rb, buf_.rdid());
1032 	}
1033 
1034 	PatternComposer& composer_; // pattern composer
1035 	PerThreadReadBuf buf_;    // read data buffer
1036 	bool last_batch_;         // true if this is final batch
1037 	size_t last_batch_size_;  // # reads read in previous batch
1038 	uint32_t skip_;           // skip reads with rdids less than this
1039 	uint32_t seed_;           // pseudo-random seed based on read content
1040 	size_t batch_id_;	  // identify batches of reads for reordering
1041 };
1042 
1043 /**
1044  * Abstract parent factory for PatternSourcePerThreads.
1045  */
1046 class PatternSourcePerThreadFactory {
1047 public:
PatternSourcePerThreadFactory(PatternComposer & composer,uint32_t max_buf,uint32_t skip,uint32_t seed)1048 	PatternSourcePerThreadFactory(
1049 		PatternComposer& composer,
1050 		uint32_t max_buf,
1051 		uint32_t skip,
1052 		uint32_t seed):
1053 		composer_(composer),
1054 		max_buf_(max_buf),
1055 		skip_(skip),
1056 		seed_(seed) {}
1057 
1058 	/**
1059 	 * Create a new heap-allocated PatternSourcePerThreads.
1060 	 */
create()1061 	virtual PatternSourcePerThread* create() const {
1062 		return new PatternSourcePerThread(composer_, max_buf_, skip_, seed_);
1063 	}
1064 
1065 	/**
1066 	 * Create a new heap-allocated vector of heap-allocated
1067 	 * PatternSourcePerThreads.
1068 	 */
create(uint32_t n)1069 	virtual EList<PatternSourcePerThread*>* create(uint32_t n) const {
1070 		EList<PatternSourcePerThread*>* v = new EList<PatternSourcePerThread*>;
1071 		for(size_t i = 0; i < n; i++) {
1072 			v->push_back(new PatternSourcePerThread(composer_, max_buf_, skip_, seed_));
1073 			assert(v->back() != NULL);
1074 		}
1075 		return v;
1076 	}
1077 
1078 	/// Free memory associated with a pattern source
destroy(PatternSourcePerThread * composer)1079 	virtual void destroy(PatternSourcePerThread* composer) const {
1080 		assert(composer != NULL);
1081 		// Free the PatternSourcePerThread
1082 		delete composer;
1083 	}
1084 
1085 	/// Free memory associated with a pattern source list
destroy(EList<PatternSourcePerThread * > * composers)1086 	virtual void destroy(EList<PatternSourcePerThread*>* composers) const {
1087 		assert(composers != NULL);
1088 		// Free all of the PatternSourcePerThreads
1089 		for(size_t i = 0; i < composers->size(); i++) {
1090 			if((*composers)[i] != NULL) {
1091 				delete (*composers)[i];
1092 				(*composers)[i] = NULL;
1093 			}
1094 		}
1095 		// Free the vector
1096 		delete composers;
1097 	}
1098 
~PatternSourcePerThreadFactory()1099 	virtual ~PatternSourcePerThreadFactory() {}
1100 
1101 private:
1102 	/// Container for obtaining paired reads from PatternSources
1103 	PatternComposer& composer_;
1104 	/// Maximum size of batch to read in
1105 	uint32_t max_buf_;
1106 	uint32_t skip_;
1107 	uint32_t seed_;
1108 };
1109 
1110 #endif /*PAT_H_*/
1111