1 /*
2  * Copyright 2011, Ben Langmead <langmea@cs.jhu.edu>
3  *
4  * This file is part of Bowtie 2.
5  *
6  * Bowtie 2 is free software: you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation, either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * Bowtie 2 is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with Bowtie 2.  If not, see <http://www.gnu.org/licenses/>.
18  */
19 
20 #ifndef PAT_H_
21 #define PAT_H_
22 
23 #include <stdio.h>
24 #include <stdint.h>
25 #include <unistd.h>
26 #include <sys/stat.h>
27 #include <zlib.h>
28 #include <cassert>
29 #include <string>
30 #include <ctype.h>
31 #include <vector>
32 #include "alphabet.h"
33 #include "assert_helpers.h"
34 #include "random_source.h"
35 #include "threading.h"
36 #include "qual.h"
37 #include "search_globals.h"
38 #include "sstring.h"
39 #include "ds.h"
40 #include "read.h"
41 #include "util.h"
42 #ifdef WITH_ZSTD
43 #include "zstd_decompress.h"
44 #endif
45 
46 #ifdef USE_SRA
47 #include <ncbi-vdb/NGS.hpp>
48 #include <ngs/ErrorMsg.hpp>
49 #include <ngs/ReadCollection.hpp>
50 #include <ngs/ReadIterator.hpp>
51 #include <ngs/Read.hpp>
52 #endif
53 
54 #ifdef _WIN32
55 #define getc_unlocked _fgetc_nolock
56 #endif
57 
58 /**
59  * Classes and routines for reading reads from various input sources.
60  */
61 
62 /**
63  * Parameters affecting how reads and read in.
64  */
65 struct PatternParams {
PatternParamsPatternParams66 	PatternParams() { }
67 
PatternParamsPatternParams68 	PatternParams(
69 		int format_,
70 		bool interleaved_,
71 		bool fileParallel_,
72 		uint32_t seed_,
73 		size_t max_buf_,
74 		bool solexa64_,
75 		bool phred64_,
76 		bool intQuals_,
77 		int trim5_,
78 		int trim3_,
79 		pair<short, size_t> trimTo_,
80 		int sampleLen_,
81 		int sampleFreq_,
82 		size_t skip_,
83 		uint64_t upto_,
84 		int nthreads_,
85 		bool fixName_,
86 		bool preserve_tags_,
87 		bool align_paired_reads_) :
88 		format(format_),
89 		interleaved(interleaved_),
90 		fileParallel(fileParallel_),
91 		seed(seed_),
92 		max_buf(max_buf_),
93 		solexa64(solexa64_),
94 		phred64(phred64_),
95 		intQuals(intQuals_),
96 		trim5(trim5_),
97 		trim3(trim3_),
98 		trimTo(trimTo_),
99 		sampleLen(sampleLen_),
100 		sampleFreq(sampleFreq_),
101 		skip(skip_),
102 		upto(upto_),
103 		nthreads(nthreads_),
104 		fixName(fixName_),
105 		preserve_tags(preserve_tags_),
106 		align_paired_reads(align_paired_reads_) { }
107 
108 	int format;			  // file format
109 	bool interleaved;	  // some or all of the FASTQ/FASTA reads are interleaved
110 	bool fileParallel;	  // true -> wrap files with separate PatternComposers
111 	uint32_t seed;		  // pseudo-random seed
112 	size_t max_buf;		  // number of reads to buffer in one read
113 	bool solexa64;		  // true -> qualities are on solexa64 scale
114 	bool phred64;		  // true -> qualities are on phred64 scale
115 	bool intQuals;		  // true -> qualities are space-separated numbers
116 	int trim5;                // amt to hard clip from 5' end
117 	int trim3;                // amt to hard clip from 3' end
118 	pair<short, size_t> trimTo;
119 	int sampleLen;		  // length of sampled reads for FastaContinuous...
120 	int sampleFreq;		  // frequency of sampled reads for FastaContinuous...
121 	size_t skip;		  // skip the first 'skip' patterns
122 	uint64_t upto;		  // max number of queries to read
123 	int nthreads;		  // number of threads for locking
124 	bool fixName;		  //
125 	bool preserve_tags;       // keep existing tags when aligning BAM files
126 	bool align_paired_reads;
127 };
128 
129 /**
130  * All per-thread storage for input read data.
131  */
132 struct PerThreadReadBuf {
133 
PerThreadReadBufPerThreadReadBuf134 	PerThreadReadBuf(size_t max_buf, int tid) :
135 		max_buf_(max_buf),
136 		bufa_(max_buf),
137 		bufb_(max_buf),
138 		rdid_(),
139 		tid_(tid)
140 	{
141 		bufa_.resize(max_buf);
142 		bufb_.resize(max_buf);
143 		reset();
144 	}
145 
read_aPerThreadReadBuf146 	Read& read_a() { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf147 	Read& read_b() { return bufb_[cur_buf_]; }
148 
read_aPerThreadReadBuf149 	const Read& read_a() const { return bufa_[cur_buf_]; }
read_bPerThreadReadBuf150 	const Read& read_b() const { return bufb_[cur_buf_]; }
151 
152 	/**
153 	 * Return read id for read/pair currently in the buffer.
154 	 */
rdidPerThreadReadBuf155 	TReadId rdid() const {
156 		assert_neq(rdid_, std::numeric_limits<TReadId>::max());
157 		return rdid_ + cur_buf_;
158 	}
159 
160 	/**
161 	 * Reset state as though no reads have been read.
162 	 */
resetPerThreadReadBuf163 	void reset() {
164 		cur_buf_ = bufa_.size();
165 		for(size_t i = 0; i < max_buf_; i++) {
166 			bufa_[i].reset();
167 			bufb_[i].reset();
168 		}
169 		rdid_ = std::numeric_limits<TReadId>::max();
170 	}
171 
172 	/**
173 	 * Advance cursor to next element
174 	 */
nextPerThreadReadBuf175 	void next() {
176 		assert_lt(cur_buf_, bufa_.size());
177 		cur_buf_++;
178 	}
179 
180 	/**
181 	 * Return true when there's nothing left for next().
182 	 */
exhaustedPerThreadReadBuf183 	bool exhausted() {
184 		assert_leq(cur_buf_, bufa_.size());
185 		return cur_buf_ >= bufa_.size()-1 || bufa_[cur_buf_+1].readOrigBuf.empty();
186 	}
187 
188 	/**
189 	 * Just after a new batch has been loaded, use init to
190 	 * set cur_buf_ appropriately.
191 	 */
initPerThreadReadBuf192 	void init() {
193 		cur_buf_ = 0;
194 	}
195 
196 	/**
197 	 * Set read id of first read in buffer.
198 	 */
setReadIdPerThreadReadBuf199 	void setReadId(TReadId rdid) {
200 		rdid_ = rdid;
201 	}
202 
203 	const size_t max_buf_; // max # reads to read into buffer at once
204 	EList<Read> bufa_;	   // Read buffer for mate as
205 	EList<Read> bufb_;	   // Read buffer for mate bs
206 	size_t cur_buf_;	   // Read buffer currently active
207 	TReadId rdid_;		   // index of read at offset 0 of bufa_/bufb_
208 	int tid_;
209 };
210 
211 extern void wrongQualityFormat(const BTString& read_name);
212 extern void tooFewQualities(const BTString& read_name);
213 extern void tooManyQualities(const BTString& read_name);
214 
215 /**
216  * Encapsulates a synchronized source of patterns; usually a file.
217  * Optionally reverses reads and quality strings before returning them,
218  * though that is usually more efficiently done by the concrete
219  * subclass.  Concrete subclasses should delimit critical sections with
220  * calls to lock() and unlock().
221  */
222 class PatternSource {
223 
224 public:
225 
PatternSource(const PatternParams & p)226 	PatternSource(const PatternParams& p) :
227 		pp_(p),
228 		readCnt_(0),
229 		mutex() { }
230 
~PatternSource()231 	virtual ~PatternSource() { }
232 
233 	/**
234 	 * Implementation to be provided by concrete subclasses.  An
235 	 * implementation for this member is only relevant for formats
236 	 * where individual input sources look like single-end-read
237 	 * sources, e.g., formats where paired-end reads are specified in
238 	 * parallel read files.
239 	 */
240 	virtual std::pair<bool, int> nextBatch(
241 		PerThreadReadBuf& pt,
242 		bool batch_a,
243 		bool lock = true) = 0;
244 
245 	/**
246 	 * Finishes parsing a given read.  Happens outside the critical section.
247 	 */
248 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const = 0;
249 
250 	/**
251 	 * Reset so that next call to nextBatch* gets the first batch.
252 	 */
reset()253 	virtual void reset() { readCnt_ = 0; }
254 
255 	/**
256 	 * Return a new dynamically allocated PatternSource for the given
257 	 * format, using the given list of strings as the filenames to read
258 	 * from or as the sequences themselves (i.e. if -c was used).
259 	 */
260 	static PatternSource* patsrcFromStrings(
261 		const PatternParams& p,
262 		const EList<std::string>& qs);
263 
264 	/**
265 	 * Return number of reads light-parsed by this stream so far.
266 	 */
readCount()267 	TReadId readCount() const { return readCnt_; }
268 
269 protected:
270 
271 
272 	// Reference to global input-parsing parameters
273 	const PatternParams& pp_;
274 
275 	// The number of reads read by this PatternSource
276 	volatile TReadId readCnt_;
277 
278 	// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields
279 	// of this or another other shared object.
280 	MUTEX_T mutex;
281 };
282 
283 /**
284  * Encapsulates a source of patterns which is an in-memory vector.
285  */
286 class VectorPatternSource : public PatternSource {
287 
288 public:
289 
290 	/**
291 	 * Populate member lists, v_, quals_, names_, etc, with information parsed
292 	 * from the given list of strings.
293 	 */
294 	VectorPatternSource(
295 		const EList<std::string>& v,
296 		const PatternParams& p);
297 
~VectorPatternSource()298 	virtual ~VectorPatternSource() { }
299 
300 	/**
301 	 * Read next batch.  However, batch concept is not very applicable for this
302 	 * PatternSource where all the info has already been parsed into the fields
303 	 * in the contsructor.	This essentially modifies the pt as though we read
304 	 * in some number of patterns.
305 	 */
306 	virtual std::pair<bool, int> nextBatch(
307 		PerThreadReadBuf& pt,
308 		bool batch_a,
309 		bool lock = true);
310 
311 	/**
312 	 * Reset so that next call to nextBatch* gets the first batch.
313 	 */
reset()314 	virtual void reset() {
315 		PatternSource::reset();
316 		cur_ = skip_;
317 		paired_ = false;
318 	}
319 
320 	/**
321 	 * Finishes parsing outside the critical section
322 	 */
323 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
324 
325 private:
326 
327 	pair<bool, int> nextBatchImpl(
328 		PerThreadReadBuf& pt,
329 		bool batch_a);
330 
331 	size_t cur_;			   // index for first read of next batch
332 	size_t skip_;			   // # reads to skip
333 	bool paired_;			   // whether reads are paired
334 	EList<string> tokbuf_;	   // buffer for storing parsed tokens
335 	EList<Read::TBuf> bufs_;   // per-read buffers
336 	char nametmp_[20];		   // temp buffer for constructing name
337 };
338 
339 /**
340  * Parent class for PatternSources that read from a file.
341  * Uses unlocked C I/O, on the assumption that all reading
342  * from the file will take place in an otherwise-protected
343  * critical section.
344  */
345 class CFilePatternSource : public PatternSource {
346 	enum CompressionType {
347 		NONE,
348 		GZIP,
349 #ifdef WITH_ZSTD
350 		ZSTD,
351 #endif
352 	};
353 public:
CFilePatternSource(const EList<std::string> & infiles,const PatternParams & p)354 	CFilePatternSource(
355 		const EList<std::string>& infiles,
356 		const PatternParams& p) :
357 		PatternSource(p),
358 		infiles_(infiles),
359 		filecur_(0),
360 		fp_(NULL),
361 		zfp_(NULL),
362 #ifdef WITH_ZSTD
363 		zstdfp_(NULL),
364 #endif
365 		is_open_(false),
366 		skip_(p.skip),
367 		first_(true),
368 		compressionType_(CompressionType::NONE)
369 	{
370 		assert_gt(infiles.size(), 0);
371 		errs_.resize(infiles_.size());
372 		errs_.fill(0, infiles_.size(), false);
373 		open(); // open first file in the list
374 		filecur_++;
375 	}
376 
377 	/**
378 	 * Close open file.
379 	 */
~CFilePatternSource()380 	virtual ~CFilePatternSource() {
381 		if(is_open_) {
382 			switch (compressionType_) {
383 			case CompressionType::NONE:
384 				assert(fp_ != NULL);
385 				fclose(fp_);
386 				break;
387 			case CompressionType::GZIP:
388 				assert(zfp_ != NULL);
389 				gzclose(zfp_);
390 				break;
391 #ifdef WITH_ZSTD
392 			case CompressionType::ZSTD:
393 				assert(zstdfp_ != NULL);
394 				zstdClose(zstdfp_);
395 				break;
396 #endif
397 			}
398 		}
399 	}
400 
401 	/**
402 	 * Fill Read with the sequence, quality and name for the next
403 	 * read in the list of read files.	This function gets called by
404 	 * all the search threads, so we must handle synchronization.
405 	 *
406 	 * Returns pair<bool, int> where bool indicates whether we're
407 	 * completely done, and int indicates how many reads were read.
408 	 */
409 	virtual std::pair<bool, int> nextBatch(
410 		PerThreadReadBuf& pt,
411 		bool batch_a,
412 		bool lock = true);
413 
414 	/**
415 	 * Reset so that next call to nextBatch* gets the first batch.
416 	 * Should only be called by the master thread.
417 	 */
reset()418 	virtual void reset() {
419 		PatternSource::reset();
420 		filecur_ = 0,
421 		open();
422 		filecur_++;
423 	}
424 
425 protected:
426 
427 	/**
428 	 * Light-parse a batch of unpaired reads from current file into the given
429 	 * buffer.	Called from CFilePatternSource.nextBatch().
430 	 */
431 	virtual std::pair<bool, int> nextBatchFromFile(
432 		PerThreadReadBuf& pt,
433 		bool batch_a,
434 		unsigned read_idx) = 0;
435 
436 	/**
437 	 * Reset state to handle a fresh file
438 	 */
resetForNextFile()439 	virtual void resetForNextFile() { }
440 
441 	/**
442 	 * Open the next file in the list of input files.
443 	 */
444 	void open();
445 
getc_wrapper()446 	int getc_wrapper() {
447 		int c;
448 
449 		do {
450 			if (compressionType_ == CompressionType::GZIP)
451 				c = gzgetc(zfp_);
452 #ifdef WITH_ZSTD
453 			else if (compressionType_ == CompressionType::ZSTD)
454 				c = zstdGetc(zstdfp_);
455 #endif
456 			else
457 				c = getc_unlocked(fp_);
458 		} while (c != EOF && c != '\t' && c != '\r' && c != '\n' && !isprint(c));
459 
460 		return c;
461 	}
462 
ungetc_wrapper(int c)463 	int ungetc_wrapper(int c) {
464 		if (compressionType_ == CompressionType::GZIP)
465 			return gzungetc(c, zfp_);
466 #ifdef WITH_ZSTD
467 		else if (compressionType_ == CompressionType::ZSTD)
468 			return zstdUngetc(c, zstdfp_);
469 #endif
470 		else
471 			return ungetc(c, fp_);
472 	}
473 
zread(voidp buf,unsigned len)474 	int zread(voidp buf, unsigned len) {
475 		int r = gzread(zfp_, buf, len);
476 		if (r < 0) {
477 			const char *err = gzerror(zfp_, NULL);
478 			if (err != NULL) {
479 				std::cerr << err << std::endl;
480 			}
481 		}
482 		return r;
483 	}
484 
is_gzipped_file(int fd)485 	bool is_gzipped_file(int fd) {
486 		if (fd == -1) {
487 			return false;
488 		}
489 
490 		uint8_t byte1, byte2;
491 
492 		ssize_t r1 = read(fd, &byte1, sizeof(uint8_t));
493 		ssize_t r2 = read(fd, &byte2, sizeof(uint8_t));
494 
495 		lseek(fd, 0, SEEK_SET);
496                 if (r1 == 0 || r2 == 0) {
497                         std::cerr << "Unable to read file magic number" << std::endl;
498                         return false;
499                 }
500 
501 		if (byte1 == 0x1f && byte2 == 0x8b) {
502 			return true;
503 		}
504 
505 		return false;
506 	}
507 
508 #ifdef WITH_ZSTD
is_zstd_file(int fd)509 	bool is_zstd_file(int fd) {
510 		if (fd == -1)
511 			return false;
512 
513 		int magic;
514 
515                 if (read(fd, &magic, sizeof(int)) != sizeof(int)) {
516 			std::cerr << "is_zstd_file: unable to read magic number" << std::endl;
517 			return false;
518                 }
519 		lseek(fd, 0, SEEK_SET);
520 
521                 return magic == 0xfd2fb528;
522 	}
523 #endif
524 
525 	EList<std::string> infiles_;	 // filenames for read files
526 	EList<bool> errs_;		 // whether we've already printed an error for each file
527 	size_t filecur_;		 // index into infiles_ of next file to read
528 	FILE *fp_;			 // read file currently being read from
529 	gzFile zfp_;			 // compressed version of fp_
530 #ifdef WITH_ZSTD
531 	zstdStrm *zstdfp_;	         // zstd compressed file
532 #endif
533 	bool is_open_;			 // whether fp_ is currently open
534 	TReadId skip_;			 // number of reads to skip
535 	bool first_;			 // parsing first record in first file?
536 	char buf_[64*1024];		 // file buffer
537 	CompressionType compressionType_;
538 
539 private:
540 
541 	pair<bool, int> nextBatchImpl(
542 		PerThreadReadBuf& pt,
543 		bool batch_a);
544 };
545 
546 /**
547  * Synchronized concrete pattern source for a list of FASTA files.
548  */
549 class FastaPatternSource : public CFilePatternSource {
550 
551 public:
552 
FastaPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool interleaved)553 	FastaPatternSource(
554 		const EList<std::string>& infiles,
555 		const PatternParams& p, bool interleaved) :
556 		CFilePatternSource(infiles, p),
557 		first_(true),
558 		interleaved_(interleaved) { }
559 
560 	/**
561 	 * Reset so that next call to nextBatch* gets the first batch.
562 	 * Should only be called by the master thread.
563 	 */
reset()564 	virtual void reset() {
565 		first_ = true;
566 		CFilePatternSource::reset();
567 	}
568 
569 	/**
570 	 * Finalize FASTA parsing outside critical section.
571 	 */
572 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
573 
574 protected:
575 
576 	/**
577 	 * Light-parse a FASTA batch into the given buffer.
578 	 */
579 	virtual std::pair<bool, int> nextBatchFromFile(
580 		PerThreadReadBuf& pt,
581 		bool batch_a,
582 		unsigned read_idx);
583 
584 	/**
585 	 * Scan to the next FASTA record (starting with >) and return the first
586 	 * character of the record (which will always be >).
587 	 */
skipToNextFastaRecord(FileBuf & in)588 	static int skipToNextFastaRecord(FileBuf& in) {
589 		int c;
590 		while((c = in.get()) != '>') {
591 			if(in.eof()) return -1;
592 		}
593 		return c;
594 	}
595 
596 	/**
597 	 * Reset state to handle a fresh file
598 	 */
resetForNextFile()599 	virtual void resetForNextFile() {
600 		first_ = true;
601 	}
602 
603 	bool first_;
604 	bool interleaved_;
605 };
606 
607 /**
608  * Synchronized concrete pattern source for a list of files with tab-
609  * delimited name, seq, qual fields (or, for paired-end reads,
610  * basename, seq1, qual1, seq2, qual2).
611  */
612 class TabbedPatternSource : public CFilePatternSource {
613 
614 public:
615 
TabbedPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool secondName)616 	TabbedPatternSource(
617 		const EList<std::string>& infiles,
618 		const PatternParams& p,
619 		bool  secondName) :  // whether it's --12/--tab5 or --tab6
620 		CFilePatternSource(infiles, p),
621 		secondName_(secondName) { }
622 
623 	/**
624 	 * Finalize tabbed parsing outside critical section.
625 	 */
626 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
627 
628 protected:
629 
630 	/**
631 	 * Light-parse a batch of tabbed-format reads into given buffer.
632 	 */
633 	virtual std::pair<bool, int> nextBatchFromFile(
634 		PerThreadReadBuf& pt,
635 		bool batch_a,
636 		unsigned read_idx);
637 
638 	bool secondName_;	// true if --tab6, false if --tab5
639 };
640 
641 /**
642  * Synchronized concrete pattern source for Illumina Qseq files.  In
643  * Qseq files, each read appears on a separate line and the tab-
644  * delimited fields are:
645  *
646  * 1. Machine name
647  * 2. Run number
648  * 3. Lane number
649  * 4. Tile number
650  * 5. X coordinate of spot
651  * 6. Y coordinate of spot
652  * 7. Index: "Index sequence or 0. For no indexing, or for a file that
653  *	  has not been demultiplexed yet, this field should have a value of
654  *	  0."
655  * 8. Read number: 1 for unpaired, 1 or 2 for paired
656  * 9. Sequence
657  * 10. Quality
658  * 11. Filter: 1 = passed, 0 = didn't
659  */
660 class QseqPatternSource : public CFilePatternSource {
661 
662 public:
663 
QseqPatternSource(const EList<std::string> & infiles,const PatternParams & p)664 	QseqPatternSource(
665 		const EList<std::string>& infiles,
666 		const PatternParams& p) :
667 		CFilePatternSource(infiles, p) { }
668 
669 	/**
670 	 * Finalize qseq parsing outside critical section.
671 	 */
672 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
673 
674 protected:
675 
676 	/**
677 	 * Light-parse a batch into the given buffer.
678 	 */
679 	virtual std::pair<bool, int> nextBatchFromFile(
680 		PerThreadReadBuf& pt,
681 		bool batch_a,
682 		unsigned read_idx);
683 
684 	EList<std::string> qualToks_;
685 };
686 
687 /**
688  * Synchronized concrete pattern source for a list of FASTA files where
689  * reads need to be extracted from long continuous sequences.
690  */
691 class FastaContinuousPatternSource : public CFilePatternSource {
692 public:
FastaContinuousPatternSource(const EList<std::string> & infiles,const PatternParams & p)693 	FastaContinuousPatternSource(
694 		const EList<std::string>& infiles,
695 		const PatternParams& p) :
696 		CFilePatternSource(infiles, p),
697 		length_(p.sampleLen),
698 		freq_(p.sampleFreq),
699 		eat_(length_-1),
700 		beginning_(true),
701 		bufCur_(0),
702 		cur_(0llu),
703 		last_(0llu)
704 	{
705 		assert_gt(freq_, 0);
706 		resetForNextFile();
707 		assert_leq(length_, 1024);
708 	}
709 
reset()710 	virtual void reset() {
711 		CFilePatternSource::reset();
712 		resetForNextFile();
713 	}
714 
715 	/**
716 	 * Finalize FASTA parsing outside critical section.
717 	 */
718 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
719 
720 protected:
721 
722 	/**
723 	 * Light-parse a batch into the given buffer.
724 	 */
725 	virtual std::pair<bool, int> nextBatchFromFile(
726 		PerThreadReadBuf& pt,
727 		bool batch_a,
728 		unsigned read_idx);
729 
730 	/**
731 	 * Reset state to be read for the next file.
732 	 */
resetForNextFile()733 	virtual void resetForNextFile() {
734 		eat_ = length_-1;
735 		name_prefix_buf_.clear();
736 		beginning_ = true;
737 		bufCur_ = 0;
738 		last_ = cur_;
739 	}
740 
741 private:
742 	const size_t length_; /// length of reads to generate
743 	const size_t freq_;   /// frequency to sample reads
744 	size_t eat_;		/// number of characters we need to skip before
745 						/// we have flushed all of the ambiguous or
746 						/// non-existent characters out of our read
747 						/// window
748 	bool beginning_;	/// skipping over the first read length?
749 	char buf_[1024];	/// FASTA sequence buffer
750 	Read::TBuf name_prefix_buf_; /// FASTA sequence name buffer
751 	char name_int_buf_[20]; /// for composing offsets for names
752 	size_t bufCur_;		/// buffer cursor; points to where we should
753 						/// insert the next character
754 	uint64_t cur_;
755 	uint64_t last_;     /// number to subtract from readCnt_ to get
756 						/// the pat id to output (so it resets to 0 for
757 						/// each new sequence)
758 };
759 
760 /**
761  * Read a FASTQ-format file.
762  * See: http://maq.sourceforge.net/fastq.shtml
763  */
764 class FastqPatternSource : public CFilePatternSource {
765 
766 public:
767 
FastqPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool interleaved)768 	FastqPatternSource(
769 		const EList<std::string>& infiles,
770 		const PatternParams& p, bool interleaved) :
771 		CFilePatternSource(infiles, p),
772 		first_(true),
773 		interleaved_(interleaved) { }
774 
reset()775 	virtual void reset() {
776 		first_ = true;
777 		CFilePatternSource::reset();
778 	}
779 
780 	/**
781 	 * Finalize FASTQ parsing outside critical section.
782 	 */
783 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
784 
785 protected:
786 
787 	/**
788 	 * Light-parse a batch into the given buffer.
789 	 */
790 	virtual std::pair<bool, int> nextBatchFromFile(
791 		PerThreadReadBuf& pt,
792 		bool batch_a,
793 		unsigned read_idx);
794 
795 	/**
796 	 * Reset state to be ready for the next file.
797 	 */
resetForNextFile()798 	virtual void resetForNextFile() {
799 		first_ = true;
800 	}
801 
802 	bool first_;		// parsing first read in file
803 	bool interleaved_;	// fastq reads are interleaved
804 };
805 
806 class BAMPatternSource : public CFilePatternSource {
807 	struct hdr_t {
808 		uint8_t id1;
809 		uint8_t id2;
810 		uint8_t cm;
811 		uint8_t flg;
812 		uint32_t mtime;
813 		uint8_t xfl;
814 		uint8_t os;
815 		uint16_t xlen;
816 	};
817 
818 	struct ftr_t {
819 		uint32_t crc32;
820 		uint32_t isize;
821 	};
822 
823 	struct BGZF {
824 		hdr_t hdr;
825 		uint8_t cdata[1 << 16];
826 		ftr_t ftr;
827 	};
828 
829 	struct orphan_mate_t {
orphan_mate_torphan_mate_t830 		orphan_mate_t() :
831 			data(NULL),
832 			size(0),
833 			cap(0),
834 			hash(0) {}
835 
resetorphan_mate_t836 		void reset() {
837 			size = 0;
838 			hash = 0;
839 		}
840 
emptyorphan_mate_t841 		bool empty() const {
842 			return size == 0;
843 		}
844 
845 		uint8_t* data;
846 		uint16_t size;
847 		uint16_t cap;
848 		uint32_t hash;
849 	};
850 
851 	struct BAMField {
852 		enum aln_rec_field_name {
853 			refID,
854 			pos,
855 			l_read_name,
856 			mapq,
857 			bin,
858 			n_cigar_op,
859 			flag,
860 			l_seq,
861 			next_refID,
862 			next_pos,
863 			tlen,
864 			read_name,
865 		};
866 	};
867 
868 public:
869 
BAMPatternSource(const EList<std::string> & infiles,const PatternParams & p)870 	BAMPatternSource(
871 		const EList<std::string>& infiles,
872 		const PatternParams& p) :
873 		CFilePatternSource(infiles, p),
874 		first_(true),
875 		bam_batches_(p.nthreads),
876 		bam_batch_indexes_(p.nthreads),
877 		orphan_mates(p.nthreads * 2),
878 		orphan_mates_mutex_(),
879 		pp_(p) {
880 			// uncompressed size of BGZF block is limited to 2**16 bytes
881 			for (size_t i = 0; i < bam_batches_.size(); ++i) {
882 				bam_batches_[i].reserve(1 << 16);
883 			}
884 		}
885 
reset()886 	virtual void reset() {
887 		first_ = true;
888 		CFilePatternSource::reset();
889 	}
890 
891 	/**
892 	 * Finalize BAM parsing outside critical section.
893 	 */
894 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
895 
~BAMPatternSource()896 	~BAMPatternSource() {
897 		for (size_t i = 0; i < orphan_mates.size(); i++) {
898 			if (orphan_mates[i].data != NULL) {
899 				delete[] orphan_mates[i].data;
900 			}
901 		}
902 	}
903 
904 
905 protected:
906 
907 	virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt, bool batch_a, bool lock = true);
908 
909 	uint16_t nextBGZFBlockFromFile(BGZF& block);
910 
911 	/**
912 	 * Reset state to be ready for the next file.
913 	 */
resetForNextFile()914 	virtual void resetForNextFile() {
915 		first_ = true;
916 	}
917 
918 	bool first_; // parsing first read in file
919 
920 private:
nextBatchFromFile(PerThreadReadBuf &,bool,unsigned)921 	virtual std::pair<bool, int> nextBatchFromFile(PerThreadReadBuf&, bool, unsigned) {
922 		return make_pair(true, 0);
923 	}
924 
925 	int decompress_bgzf_block(uint8_t *dst, size_t dst_len, uint8_t *src, size_t src_len);
926 	std::pair<bool, int> get_alignments(PerThreadReadBuf& pt, bool batch_a, unsigned& readi, bool lock);
927 	void store_orphan_mate(const uint8_t* read, size_t read_len);
928 	void get_orphaned_pairs(EList<Read>& buf_a, EList<Read>& buf_b, const size_t max_buf, unsigned& readi);
929 	void get_or_store_orhaned_mate(EList<Read>& buf_a, EList<Read>& buf_b, unsigned& readi, const uint8_t *mate, size_t mate_len);
930 	size_t get_matching_read(const uint8_t* rec);
931 
932 	static const int offset[];
933 	static const uint8_t EOF_MARKER[];
934 
935 	std::vector<std::vector<uint8_t> > bam_batches_;
936 	std::vector<size_t> bam_batch_indexes_;
937 	std::vector<orphan_mate_t> orphan_mates;
938 	MUTEX_T orphan_mates_mutex_;
939 
940 	PatternParams pp_;
941 };
942 
943 /**
944  * Read a Raw-format file (one sequence per line).	No quality strings
945  * allowed.  All qualities are assumed to be 'I' (40 on the Phred-33
946  * scale).
947  */
948 class RawPatternSource : public CFilePatternSource {
949 
950 public:
951 
RawPatternSource(const EList<std::string> & infiles,const PatternParams & p)952 	RawPatternSource(
953 		const EList<std::string>& infiles,
954 		const PatternParams& p) :
955 		CFilePatternSource(infiles, p), first_(true) { }
956 
reset()957 	virtual void reset() {
958 		first_ = true;
959 		CFilePatternSource::reset();
960 	}
961 
962 	/**
963 	 * Finalize raw parsing outside critical section.
964 	 */
965 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
966 
967 protected:
968 
969 	/**
970 	 * Light-parse a batch into the given buffer.
971 	 */
972 	virtual std::pair<bool, int> nextBatchFromFile(
973 		PerThreadReadBuf& pt,
974 		bool batch_a,
975 		unsigned read_idx);
976 
977 	/**
978 	 * Reset state to be ready for the next file.
979 	 */
resetForNextFile()980 	virtual void resetForNextFile() {
981 		first_ = true;
982 	}
983 
984 private:
985 
986 	bool first_;
987 };
988 
989 /**
990  * Abstract parent class for synhconized sources of paired-end reads
991  * (and possibly also single-end reads).
992  */
993 class PatternComposer {
994 public:
PatternComposer(const PatternParams & p)995 	PatternComposer(const PatternParams& p) : mutex_m() { }
996 
~PatternComposer()997 	virtual ~PatternComposer() { }
998 
999 	virtual void reset() = 0;
1000 
1001 	/**
1002 	 * Member function override by concrete, format-specific classes.
1003 	 */
1004 	virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt) = 0;
1005 
1006 	/**
1007 	 * Make appropriate call into the format layer to parse individual read.
1008 	 */
1009 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) = 0;
1010 
1011 	/**
1012 	 * Given the values for all of the various arguments used to specify
1013 	 * the read and quality input, create a list of pattern sources to
1014 	 * dispense them.
1015 	 */
1016 	static PatternComposer* setupPatternComposer(
1017 		const EList<std::string>& si,	 // singles, from argv
1018 		const EList<std::string>& m1,	 // mate1's, from -1 arg
1019 		const EList<std::string>& m2,	 // mate2's, from -2 arg
1020 		const EList<std::string>& m12,	 // both mates on each line, from --12
1021 		const EList<std::string>& q,	 // qualities associated with singles
1022 		const EList<std::string>& q1,	 // qualities associated with m1
1023 		const EList<std::string>& q2,	 // qualities associated with m2
1024 #ifdef USE_SRA
1025 		const EList<string>& sra_accs,   // SRA accessions
1026 #endif
1027 		PatternParams& p,		// read-in params
1028 		bool verbose);				// be talkative?
1029 
1030 protected:
1031 
1032 	static void free_EList_pmembers(const EList<PatternSource*>&);
1033 
1034 	/// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields
1035 	/// of this or another other shared object.
1036 	MUTEX_T mutex_m;
1037 };
1038 
1039 /**
1040  * Encapsulates a synchronized source of both paired-end reads and
1041  * unpaired reads, where the paired-end must come from parallel files.
1042  */
1043 class SoloPatternComposer : public PatternComposer {
1044 
1045 public:
1046 
SoloPatternComposer(const EList<PatternSource * > * src,const PatternParams & p)1047 	SoloPatternComposer(
1048 		const EList<PatternSource*>* src,
1049 		const PatternParams& p) :
1050 		PatternComposer(p),
1051 		cur_(0),
1052 		src_(src)
1053 	{
1054 		assert(src_ != NULL);
1055 		lock_ = p.nthreads > 1;
1056 		for(size_t i = 0; i < src_->size(); i++) {
1057 			assert((*src_)[i] != NULL);
1058 		}
1059 	}
1060 
~SoloPatternComposer()1061 	virtual ~SoloPatternComposer() {
1062 		free_EList_pmembers(*src_);
1063 		delete src_;
1064 	}
1065 
1066 	/**
1067 	 * Reset this object and all the PatternSources under it so that
1068 	 * the next call to nextBatch gets the very first read.
1069 	 */
reset()1070 	virtual void reset() {
1071 		for(size_t i = 0; i < src_->size(); i++) {
1072 			(*src_)[i]->reset();
1073 		}
1074 		cur_ = 0;
1075 	}
1076 
1077 	/**
1078 	 * Calls member functions of the individual PatternSource objects
1079 	 * to get more reads.  Since there's no need to keep two separate
1080 	 * files in sync (as there is for DualPatternComposer),
1081 	 * synchronization can be handed by the PatternSource contained
1082 	 * in the src_ field.
1083 	 */
1084 	virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt);
1085 
1086 	/**
1087 	 * Make appropriate call into the format layer to parse individual read.
1088 	 */
parse(Read & ra,Read & rb,TReadId rdid)1089 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
1090 		return (*src_)[0]->parse(ra, rb, rdid);
1091 	}
1092 
1093 protected:
1094 	volatile bool lock_;
1095 	volatile size_t cur_; // current element in parallel srca_, srcb_ vectors
1096 	const EList<PatternSource*>* src_; /// PatternSources for paired-end reads
1097 };
1098 
1099 /**
1100  * Encapsulates a synchronized source of both paired-end reads and
1101  * unpaired reads, where the paired-end must come from parallel files.
1102  */
1103 class DualPatternComposer : public PatternComposer {
1104 
1105 public:
1106 
DualPatternComposer(const EList<PatternSource * > * srca,const EList<PatternSource * > * srcb,const PatternParams & p)1107 	DualPatternComposer(
1108 		const EList<PatternSource*>* srca,
1109 		const EList<PatternSource*>* srcb,
1110 		const PatternParams& p) :
1111 		PatternComposer(p), cur_(0), srca_(srca), srcb_(srcb)
1112 	{
1113 		assert(srca_ != NULL);
1114 		assert(srcb_ != NULL);
1115 		// srca_ and srcb_ must be parallel
1116 		assert_eq(srca_->size(), srcb_->size());
1117 		lock_ = p.nthreads > 1;
1118 		for(size_t i = 0; i < srca_->size(); i++) {
1119 			// Can't have NULL first-mate sources.	Second-mate sources
1120 			// can be NULL, in the case when the corresponding first-
1121 			// mate source is unpaired.
1122 			assert((*srca_)[i] != NULL);
1123 			for(size_t j = 0; j < srcb_->size(); j++) {
1124 				assert_neq((*srca_)[i], (*srcb_)[j]);
1125 			}
1126 		}
1127 	}
1128 
~DualPatternComposer()1129 	virtual ~DualPatternComposer() {
1130 		free_EList_pmembers(*srca_);
1131 		delete srca_;
1132 		free_EList_pmembers(*srcb_);
1133 		delete srcb_;
1134 	}
1135 
1136 	/**
1137 	 * Reset this object and all the PatternSources under it so that
1138 	 * the next call to nextBatch gets the very first read.
1139 	 */
reset()1140 	virtual void reset() {
1141 		for(size_t i = 0; i < srca_->size(); i++) {
1142 			(*srca_)[i]->reset();
1143 			if((*srcb_)[i] != NULL) {
1144 				(*srcb_)[i]->reset();
1145 			}
1146 		}
1147 		cur_ = 0;
1148 	}
1149 
1150 	/**
1151 	 * Calls member functions of the individual PatternSource objects
1152 	 * to get more reads.  Since we need to keep the two separate
1153 	 * files in sync, synchronization can be handed at this level, with
1154 	 * one critical section surrounding both calls into the srca_ and
1155 	 * srcb_ member functions.
1156 	 */
1157 	virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt);
1158 
1159 	/**
1160 	 * Make appropriate call into the format layer to parse individual read.
1161 	 */
parse(Read & ra,Read & rb,TReadId rdid)1162 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) {
1163 		return (*srca_)[0]->parse(ra, rb, rdid);
1164 	}
1165 
1166 protected:
1167 
1168 	volatile bool lock_;
1169 	volatile size_t cur_; // current element in parallel srca_, srcb_ vectors
1170 	const EList<PatternSource*>* srca_; // for 1st matesunpaired
1171 	const EList<PatternSource*>* srcb_; // for 2nd mates
1172 };
1173 
1174 /**
1175  * Encapsulates a single thread's interaction with the PatternSource.
1176  * Most notably, this class holds the buffers into which the
1177  * PatterSource will write sequences.  This class is *not* threadsafe
1178  * - it doesn't need to be since there's one per thread.  PatternSource
1179  * is thread-safe.
1180  */
1181 class PatternSourcePerThread {
1182 
1183 public:
1184 
PatternSourcePerThread(PatternComposer & composer,const PatternParams & pp,int tid)1185 	PatternSourcePerThread(
1186 		PatternComposer& composer,
1187 		const PatternParams& pp, int tid) :
1188 		composer_(composer),
1189 		buf_(pp.max_buf, tid),
1190 		pp_(pp),
1191 		last_batch_(false),
1192 		last_batch_size_(0) { }
1193 
1194 	/**
1195 	 * Use objects in the PatternSource and/or PatternComposer
1196 	 * hierarchies to populate the per-thread buffers.
1197 	 */
1198 	std::pair<bool, bool> nextReadPair();
1199 
read_a()1200 	Read& read_a() { return buf_.read_a(); }
read_b()1201 	Read& read_b() { return buf_.read_b(); }
1202 
read_a()1203 	const Read& read_a() const { return buf_.read_a(); }
read_b()1204 	const Read& read_b() const { return buf_.read_b(); }
1205 
1206 private:
1207 
1208 	/**
1209 	 * When we've finished fully parsing and dishing out reads in
1210 	 * the current batch, we go get the next one by calling into
1211 	 * the composition layer.
1212 	 */
nextBatch()1213 	std::pair<bool, int> nextBatch() {
1214 		buf_.reset();
1215 		std::pair<bool, int> res = composer_.nextBatch(buf_);
1216 		buf_.init();
1217 		return res;
1218 	}
1219 
1220 	/**
1221 	 * Once name/sequence/qualities have been parsed for an
1222 	 * unpaired read, set all the other key fields of the Read
1223 	 * struct.
1224 	 */
1225 	void finalize(Read& ra);
1226 
1227 	/**
1228 	 * Once name/sequence/qualities have been parsed for a
1229 	 * paired-end read, set all the other key fields of the Read
1230 	 * structs.
1231 	 */
1232 	void finalizePair(Read& ra, Read& rb);
1233 
1234 	/**
1235 	 * Call into composition layer (which in turn calls into
1236 	 * format layer) to parse the read.
1237 	 */
parse(Read & ra,Read & rb)1238 	bool parse(Read& ra, Read& rb) {
1239 		return composer_.parse(ra, rb, buf_.rdid());
1240 	}
1241 
trim(Read & r)1242 	void trim(Read& r) {
1243 		if (pp_.trimTo.second > 0) {
1244 			switch (pp_.trimTo.first) {
1245 				case 3:
1246 					if (r.patFw.length() > pp_.trimTo.second) {
1247 						r.trimmed5 = r.patFw.length() - pp_.trimTo.second;
1248 						r.patFw.trimEnd(r.trimmed5);
1249 						r.qual.trimEnd(r.trimmed5);
1250 					}
1251 					break;
1252 				case 5:
1253 					if (r.patFw.length() > pp_.trimTo.second) {
1254 						r.trimmed3 = r.patFw.length() - pp_.trimTo.second;
1255 						r.patFw.trimBegin(r.trimmed3);
1256 						r.qual.trimBegin(r.trimmed3);
1257 					}
1258 					break;
1259 			}
1260 		}
1261 	}
1262 
1263 	PatternComposer& composer_; // pattern composer
1264 	PerThreadReadBuf buf_;		// read data buffer
1265 	const PatternParams& pp_;	// pattern-related parameters
1266 	bool last_batch_;			// true if this is final batch
1267 	int last_batch_size_;		// # reads read in previous batch
1268 };
1269 
1270 /**
1271  * Abstract parent factory for PatternSourcePerThreads.
1272  */
1273 class PatternSourcePerThreadFactory {
1274 public:
PatternSourcePerThreadFactory(PatternComposer & composer,const PatternParams & pp,int tid)1275 	PatternSourcePerThreadFactory(
1276 		PatternComposer& composer,
1277 		const PatternParams& pp, int tid) :
1278 		composer_(composer),
1279 		pp_(pp),
1280 		tid_(tid) { }
1281 
1282 	/**
1283 	 * Create a new heap-allocated PatternSourcePerThreads.
1284 	 */
create()1285 	virtual PatternSourcePerThread* create() const {
1286 		return new PatternSourcePerThread(composer_, pp_, tid_);
1287 	}
1288 
1289 	/**
1290 	 * Create a new heap-allocated vector of heap-allocated
1291 	 * PatternSourcePerThreads.
1292 	 */
create(uint32_t n)1293 	virtual EList<PatternSourcePerThread*>* create(uint32_t n) const {
1294 		EList<PatternSourcePerThread*>* v = new EList<PatternSourcePerThread*>;
1295 		for(size_t i = 0; i < n; i++) {
1296 			v->push_back(new PatternSourcePerThread(composer_, pp_, tid_));
1297 			assert(v->back() != NULL);
1298 		}
1299 		return v;
1300 	}
1301 
~PatternSourcePerThreadFactory()1302 	virtual ~PatternSourcePerThreadFactory() {}
1303 
1304 private:
1305 	/// Container for obtaining paired reads from PatternSources
1306 	PatternComposer& composer_;
1307 	const PatternParams& pp_;
1308 	int tid_;
1309 };
1310 
1311 #ifdef USE_SRA
1312 
1313 namespace ngs {
1314 	class ReadCollection;
1315 	class ReadIterator;
1316 }
1317 
1318 /**
1319  * Pattern source for reading directly from the SRA archive.
1320  */
1321 class SRAPatternSource : public PatternSource {
1322 public:
SRAPatternSource(const EList<string> & sra_accs,const PatternParams & p)1323 	SRAPatternSource(
1324 		const EList<string>& sra_accs,
1325 		const PatternParams& p) :
1326 		PatternSource(p),
1327 		sra_accs_(sra_accs),
1328 		sra_acc_cur_(0),
1329 		cur_(0),
1330 		first_(true),
1331 		sra_its_(p.nthreads),
1332 		mutex_m(),
1333 		pp_(p)
1334 	{
1335 		assert_gt(sra_accs_.size(), 0);
1336 		errs_.resize(sra_accs_.size());
1337 		errs_.fill(0, sra_accs_.size(), false);
1338 		open(); // open first file in the list
1339 		sra_acc_cur_++;
1340 	}
1341 
~SRAPatternSource()1342 	virtual ~SRAPatternSource() {
1343 		for (size_t i = 0; i < sra_its_.size(); i++) {
1344 			if(sra_its_[i] != NULL) {
1345 				delete sra_its_[i];
1346 				sra_its_[i] = NULL;
1347 			}
1348 		}
1349 	}
1350 
1351 	/**
1352 	 * Fill Read with the sequence, quality and name for the next
1353 	 * read in the list of read files.	This function gets called by
1354 	 * all the search threads, so we must handle synchronization.
1355 	 *
1356 	 * Returns pair<bool, int> where bool indicates whether we're
1357 	 * completely done, and int indicates how many reads were read.
1358 	 */
1359 	virtual std::pair<bool, int> nextBatch(
1360 		PerThreadReadBuf& pt,
1361 		bool batch_a,
1362 		bool lock);
1363 
1364 	/**
1365 	 * Finishes parsing outside the critical section
1366 	 */
1367 	virtual bool parse(Read& ra, Read& rb, TReadId rdid) const;
1368 
1369 	/**
1370 	 * Reset so that next call to nextBatch* gets the first batch.
1371 	 * Should only be called by the master thread.
1372 	 */
reset()1373 	virtual void reset() {
1374 		PatternSource::reset();
1375 		sra_acc_cur_ = 0,
1376 		open();
1377 		sra_acc_cur_++;
1378 	}
1379 
1380 protected:
1381 
1382 	std::pair<bool, int> nextBatchImpl(
1383 		PerThreadReadBuf& pt,
1384 		bool batch_a);
1385 
1386 	/**
1387 	 * Open the next file in the list of input files.
1388 	 */
1389 	void open();
1390 
1391 	EList<string> sra_accs_; // filenames for read files
1392 	EList<bool> errs_;       // whether we've already printed an error for each file
1393 	size_t sra_acc_cur_;     // index into infiles_ of next file to read
1394 	size_t cur_;             // current read id
1395 	bool first_;
1396 
1397 	std::vector<ngs::ReadIterator*> sra_its_;
1398 
1399 	/// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields
1400 	/// of this or another other shared object.
1401 	MUTEX_T mutex_m;
1402 
1403 	PatternParams pp_;
1404 };
1405 
1406 #endif
1407 
1408 #endif /*PAT_H_*/
1409