1 /* 2 * Copyright 2011, Ben Langmead <langmea@cs.jhu.edu> 3 * 4 * This file is part of Bowtie 2. 5 * 6 * Bowtie 2 is free software: you can redistribute it and/or modify 7 * it under the terms of the GNU General Public License as published by 8 * the Free Software Foundation, either version 3 of the License, or 9 * (at your option) any later version. 10 * 11 * Bowtie 2 is distributed in the hope that it will be useful, 12 * but WITHOUT ANY WARRANTY; without even the implied warranty of 13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 * GNU General Public License for more details. 15 * 16 * You should have received a copy of the GNU General Public License 17 * along with Bowtie 2. If not, see <http://www.gnu.org/licenses/>. 18 */ 19 20 #ifndef PAT_H_ 21 #define PAT_H_ 22 23 #include <stdio.h> 24 #include <stdint.h> 25 #include <unistd.h> 26 #include <sys/stat.h> 27 #include <zlib.h> 28 #include <cassert> 29 #include <string> 30 #include <ctype.h> 31 #include <vector> 32 #include "alphabet.h" 33 #include "assert_helpers.h" 34 #include "random_source.h" 35 #include "threading.h" 36 #include "qual.h" 37 #include "search_globals.h" 38 #include "sstring.h" 39 #include "ds.h" 40 #include "read.h" 41 #include "util.h" 42 #ifdef WITH_ZSTD 43 #include "zstd_decompress.h" 44 #endif 45 46 #ifdef USE_SRA 47 #include <ncbi-vdb/NGS.hpp> 48 #include <ngs/ErrorMsg.hpp> 49 #include <ngs/ReadCollection.hpp> 50 #include <ngs/ReadIterator.hpp> 51 #include <ngs/Read.hpp> 52 #endif 53 54 #ifdef _WIN32 55 #define getc_unlocked _fgetc_nolock 56 #endif 57 58 /** 59 * Classes and routines for reading reads from various input sources. 60 */ 61 62 /** 63 * Parameters affecting how reads and read in. 64 */ 65 struct PatternParams { PatternParamsPatternParams66 PatternParams() { } 67 PatternParamsPatternParams68 PatternParams( 69 int format_, 70 bool interleaved_, 71 bool fileParallel_, 72 uint32_t seed_, 73 size_t max_buf_, 74 bool solexa64_, 75 bool phred64_, 76 bool intQuals_, 77 int trim5_, 78 int trim3_, 79 pair<short, size_t> trimTo_, 80 int sampleLen_, 81 int sampleFreq_, 82 size_t skip_, 83 uint64_t upto_, 84 int nthreads_, 85 bool fixName_, 86 bool preserve_tags_, 87 bool align_paired_reads_) : 88 format(format_), 89 interleaved(interleaved_), 90 fileParallel(fileParallel_), 91 seed(seed_), 92 max_buf(max_buf_), 93 solexa64(solexa64_), 94 phred64(phred64_), 95 intQuals(intQuals_), 96 trim5(trim5_), 97 trim3(trim3_), 98 trimTo(trimTo_), 99 sampleLen(sampleLen_), 100 sampleFreq(sampleFreq_), 101 skip(skip_), 102 upto(upto_), 103 nthreads(nthreads_), 104 fixName(fixName_), 105 preserve_tags(preserve_tags_), 106 align_paired_reads(align_paired_reads_) { } 107 108 int format; // file format 109 bool interleaved; // some or all of the FASTQ/FASTA reads are interleaved 110 bool fileParallel; // true -> wrap files with separate PatternComposers 111 uint32_t seed; // pseudo-random seed 112 size_t max_buf; // number of reads to buffer in one read 113 bool solexa64; // true -> qualities are on solexa64 scale 114 bool phred64; // true -> qualities are on phred64 scale 115 bool intQuals; // true -> qualities are space-separated numbers 116 int trim5; // amt to hard clip from 5' end 117 int trim3; // amt to hard clip from 3' end 118 pair<short, size_t> trimTo; 119 int sampleLen; // length of sampled reads for FastaContinuous... 120 int sampleFreq; // frequency of sampled reads for FastaContinuous... 121 size_t skip; // skip the first 'skip' patterns 122 uint64_t upto; // max number of queries to read 123 int nthreads; // number of threads for locking 124 bool fixName; // 125 bool preserve_tags; // keep existing tags when aligning BAM files 126 bool align_paired_reads; 127 }; 128 129 /** 130 * All per-thread storage for input read data. 131 */ 132 struct PerThreadReadBuf { 133 PerThreadReadBufPerThreadReadBuf134 PerThreadReadBuf(size_t max_buf, int tid) : 135 max_buf_(max_buf), 136 bufa_(max_buf), 137 bufb_(max_buf), 138 rdid_(), 139 tid_(tid) 140 { 141 bufa_.resize(max_buf); 142 bufb_.resize(max_buf); 143 reset(); 144 } 145 read_aPerThreadReadBuf146 Read& read_a() { return bufa_[cur_buf_]; } read_bPerThreadReadBuf147 Read& read_b() { return bufb_[cur_buf_]; } 148 read_aPerThreadReadBuf149 const Read& read_a() const { return bufa_[cur_buf_]; } read_bPerThreadReadBuf150 const Read& read_b() const { return bufb_[cur_buf_]; } 151 152 /** 153 * Return read id for read/pair currently in the buffer. 154 */ rdidPerThreadReadBuf155 TReadId rdid() const { 156 assert_neq(rdid_, std::numeric_limits<TReadId>::max()); 157 return rdid_ + cur_buf_; 158 } 159 160 /** 161 * Reset state as though no reads have been read. 162 */ resetPerThreadReadBuf163 void reset() { 164 cur_buf_ = bufa_.size(); 165 for(size_t i = 0; i < max_buf_; i++) { 166 bufa_[i].reset(); 167 bufb_[i].reset(); 168 } 169 rdid_ = std::numeric_limits<TReadId>::max(); 170 } 171 172 /** 173 * Advance cursor to next element 174 */ nextPerThreadReadBuf175 void next() { 176 assert_lt(cur_buf_, bufa_.size()); 177 cur_buf_++; 178 } 179 180 /** 181 * Return true when there's nothing left for next(). 182 */ exhaustedPerThreadReadBuf183 bool exhausted() { 184 assert_leq(cur_buf_, bufa_.size()); 185 return cur_buf_ >= bufa_.size()-1 || bufa_[cur_buf_+1].readOrigBuf.empty(); 186 } 187 188 /** 189 * Just after a new batch has been loaded, use init to 190 * set cur_buf_ appropriately. 191 */ initPerThreadReadBuf192 void init() { 193 cur_buf_ = 0; 194 } 195 196 /** 197 * Set read id of first read in buffer. 198 */ setReadIdPerThreadReadBuf199 void setReadId(TReadId rdid) { 200 rdid_ = rdid; 201 } 202 203 const size_t max_buf_; // max # reads to read into buffer at once 204 EList<Read> bufa_; // Read buffer for mate as 205 EList<Read> bufb_; // Read buffer for mate bs 206 size_t cur_buf_; // Read buffer currently active 207 TReadId rdid_; // index of read at offset 0 of bufa_/bufb_ 208 int tid_; 209 }; 210 211 extern void wrongQualityFormat(const BTString& read_name); 212 extern void tooFewQualities(const BTString& read_name); 213 extern void tooManyQualities(const BTString& read_name); 214 215 /** 216 * Encapsulates a synchronized source of patterns; usually a file. 217 * Optionally reverses reads and quality strings before returning them, 218 * though that is usually more efficiently done by the concrete 219 * subclass. Concrete subclasses should delimit critical sections with 220 * calls to lock() and unlock(). 221 */ 222 class PatternSource { 223 224 public: 225 PatternSource(const PatternParams & p)226 PatternSource(const PatternParams& p) : 227 pp_(p), 228 readCnt_(0), 229 mutex() { } 230 ~PatternSource()231 virtual ~PatternSource() { } 232 233 /** 234 * Implementation to be provided by concrete subclasses. An 235 * implementation for this member is only relevant for formats 236 * where individual input sources look like single-end-read 237 * sources, e.g., formats where paired-end reads are specified in 238 * parallel read files. 239 */ 240 virtual std::pair<bool, int> nextBatch( 241 PerThreadReadBuf& pt, 242 bool batch_a, 243 bool lock = true) = 0; 244 245 /** 246 * Finishes parsing a given read. Happens outside the critical section. 247 */ 248 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const = 0; 249 250 /** 251 * Reset so that next call to nextBatch* gets the first batch. 252 */ reset()253 virtual void reset() { readCnt_ = 0; } 254 255 /** 256 * Return a new dynamically allocated PatternSource for the given 257 * format, using the given list of strings as the filenames to read 258 * from or as the sequences themselves (i.e. if -c was used). 259 */ 260 static PatternSource* patsrcFromStrings( 261 const PatternParams& p, 262 const EList<std::string>& qs); 263 264 /** 265 * Return number of reads light-parsed by this stream so far. 266 */ readCount()267 TReadId readCount() const { return readCnt_; } 268 269 protected: 270 271 272 // Reference to global input-parsing parameters 273 const PatternParams& pp_; 274 275 // The number of reads read by this PatternSource 276 volatile TReadId readCnt_; 277 278 // Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields 279 // of this or another other shared object. 280 MUTEX_T mutex; 281 }; 282 283 /** 284 * Encapsulates a source of patterns which is an in-memory vector. 285 */ 286 class VectorPatternSource : public PatternSource { 287 288 public: 289 290 /** 291 * Populate member lists, v_, quals_, names_, etc, with information parsed 292 * from the given list of strings. 293 */ 294 VectorPatternSource( 295 const EList<std::string>& v, 296 const PatternParams& p); 297 ~VectorPatternSource()298 virtual ~VectorPatternSource() { } 299 300 /** 301 * Read next batch. However, batch concept is not very applicable for this 302 * PatternSource where all the info has already been parsed into the fields 303 * in the contsructor. This essentially modifies the pt as though we read 304 * in some number of patterns. 305 */ 306 virtual std::pair<bool, int> nextBatch( 307 PerThreadReadBuf& pt, 308 bool batch_a, 309 bool lock = true); 310 311 /** 312 * Reset so that next call to nextBatch* gets the first batch. 313 */ reset()314 virtual void reset() { 315 PatternSource::reset(); 316 cur_ = skip_; 317 paired_ = false; 318 } 319 320 /** 321 * Finishes parsing outside the critical section 322 */ 323 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 324 325 private: 326 327 pair<bool, int> nextBatchImpl( 328 PerThreadReadBuf& pt, 329 bool batch_a); 330 331 size_t cur_; // index for first read of next batch 332 size_t skip_; // # reads to skip 333 bool paired_; // whether reads are paired 334 EList<string> tokbuf_; // buffer for storing parsed tokens 335 EList<Read::TBuf> bufs_; // per-read buffers 336 char nametmp_[20]; // temp buffer for constructing name 337 }; 338 339 /** 340 * Parent class for PatternSources that read from a file. 341 * Uses unlocked C I/O, on the assumption that all reading 342 * from the file will take place in an otherwise-protected 343 * critical section. 344 */ 345 class CFilePatternSource : public PatternSource { 346 enum CompressionType { 347 NONE, 348 GZIP, 349 #ifdef WITH_ZSTD 350 ZSTD, 351 #endif 352 }; 353 public: CFilePatternSource(const EList<std::string> & infiles,const PatternParams & p)354 CFilePatternSource( 355 const EList<std::string>& infiles, 356 const PatternParams& p) : 357 PatternSource(p), 358 infiles_(infiles), 359 filecur_(0), 360 fp_(NULL), 361 zfp_(NULL), 362 #ifdef WITH_ZSTD 363 zstdfp_(NULL), 364 #endif 365 is_open_(false), 366 skip_(p.skip), 367 first_(true), 368 compressionType_(CompressionType::NONE) 369 { 370 assert_gt(infiles.size(), 0); 371 errs_.resize(infiles_.size()); 372 errs_.fill(0, infiles_.size(), false); 373 open(); // open first file in the list 374 filecur_++; 375 } 376 377 /** 378 * Close open file. 379 */ ~CFilePatternSource()380 virtual ~CFilePatternSource() { 381 if(is_open_) { 382 switch (compressionType_) { 383 case CompressionType::NONE: 384 assert(fp_ != NULL); 385 fclose(fp_); 386 break; 387 case CompressionType::GZIP: 388 assert(zfp_ != NULL); 389 gzclose(zfp_); 390 break; 391 #ifdef WITH_ZSTD 392 case CompressionType::ZSTD: 393 assert(zstdfp_ != NULL); 394 zstdClose(zstdfp_); 395 break; 396 #endif 397 } 398 } 399 } 400 401 /** 402 * Fill Read with the sequence, quality and name for the next 403 * read in the list of read files. This function gets called by 404 * all the search threads, so we must handle synchronization. 405 * 406 * Returns pair<bool, int> where bool indicates whether we're 407 * completely done, and int indicates how many reads were read. 408 */ 409 virtual std::pair<bool, int> nextBatch( 410 PerThreadReadBuf& pt, 411 bool batch_a, 412 bool lock = true); 413 414 /** 415 * Reset so that next call to nextBatch* gets the first batch. 416 * Should only be called by the master thread. 417 */ reset()418 virtual void reset() { 419 PatternSource::reset(); 420 filecur_ = 0, 421 open(); 422 filecur_++; 423 } 424 425 protected: 426 427 /** 428 * Light-parse a batch of unpaired reads from current file into the given 429 * buffer. Called from CFilePatternSource.nextBatch(). 430 */ 431 virtual std::pair<bool, int> nextBatchFromFile( 432 PerThreadReadBuf& pt, 433 bool batch_a, 434 unsigned read_idx) = 0; 435 436 /** 437 * Reset state to handle a fresh file 438 */ resetForNextFile()439 virtual void resetForNextFile() { } 440 441 /** 442 * Open the next file in the list of input files. 443 */ 444 void open(); 445 getc_wrapper()446 int getc_wrapper() { 447 int c; 448 449 do { 450 if (compressionType_ == CompressionType::GZIP) 451 c = gzgetc(zfp_); 452 #ifdef WITH_ZSTD 453 else if (compressionType_ == CompressionType::ZSTD) 454 c = zstdGetc(zstdfp_); 455 #endif 456 else 457 c = getc_unlocked(fp_); 458 } while (c != EOF && c != '\t' && c != '\r' && c != '\n' && !isprint(c)); 459 460 return c; 461 } 462 ungetc_wrapper(int c)463 int ungetc_wrapper(int c) { 464 if (compressionType_ == CompressionType::GZIP) 465 return gzungetc(c, zfp_); 466 #ifdef WITH_ZSTD 467 else if (compressionType_ == CompressionType::ZSTD) 468 return zstdUngetc(c, zstdfp_); 469 #endif 470 else 471 return ungetc(c, fp_); 472 } 473 zread(voidp buf,unsigned len)474 int zread(voidp buf, unsigned len) { 475 int r = gzread(zfp_, buf, len); 476 if (r < 0) { 477 const char *err = gzerror(zfp_, NULL); 478 if (err != NULL) { 479 std::cerr << err << std::endl; 480 } 481 } 482 return r; 483 } 484 is_gzipped_file(int fd)485 bool is_gzipped_file(int fd) { 486 if (fd == -1) { 487 return false; 488 } 489 490 uint8_t byte1, byte2; 491 492 ssize_t r1 = read(fd, &byte1, sizeof(uint8_t)); 493 ssize_t r2 = read(fd, &byte2, sizeof(uint8_t)); 494 495 lseek(fd, 0, SEEK_SET); 496 if (r1 == 0 || r2 == 0) { 497 std::cerr << "Unable to read file magic number" << std::endl; 498 return false; 499 } 500 501 if (byte1 == 0x1f && byte2 == 0x8b) { 502 return true; 503 } 504 505 return false; 506 } 507 508 #ifdef WITH_ZSTD is_zstd_file(int fd)509 bool is_zstd_file(int fd) { 510 if (fd == -1) 511 return false; 512 513 int magic; 514 515 if (read(fd, &magic, sizeof(int)) != sizeof(int)) { 516 std::cerr << "is_zstd_file: unable to read magic number" << std::endl; 517 return false; 518 } 519 lseek(fd, 0, SEEK_SET); 520 521 return magic == 0xfd2fb528; 522 } 523 #endif 524 525 EList<std::string> infiles_; // filenames for read files 526 EList<bool> errs_; // whether we've already printed an error for each file 527 size_t filecur_; // index into infiles_ of next file to read 528 FILE *fp_; // read file currently being read from 529 gzFile zfp_; // compressed version of fp_ 530 #ifdef WITH_ZSTD 531 zstdStrm *zstdfp_; // zstd compressed file 532 #endif 533 bool is_open_; // whether fp_ is currently open 534 TReadId skip_; // number of reads to skip 535 bool first_; // parsing first record in first file? 536 char buf_[64*1024]; // file buffer 537 CompressionType compressionType_; 538 539 private: 540 541 pair<bool, int> nextBatchImpl( 542 PerThreadReadBuf& pt, 543 bool batch_a); 544 }; 545 546 /** 547 * Synchronized concrete pattern source for a list of FASTA files. 548 */ 549 class FastaPatternSource : public CFilePatternSource { 550 551 public: 552 FastaPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool interleaved)553 FastaPatternSource( 554 const EList<std::string>& infiles, 555 const PatternParams& p, bool interleaved) : 556 CFilePatternSource(infiles, p), 557 first_(true), 558 interleaved_(interleaved) { } 559 560 /** 561 * Reset so that next call to nextBatch* gets the first batch. 562 * Should only be called by the master thread. 563 */ reset()564 virtual void reset() { 565 first_ = true; 566 CFilePatternSource::reset(); 567 } 568 569 /** 570 * Finalize FASTA parsing outside critical section. 571 */ 572 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 573 574 protected: 575 576 /** 577 * Light-parse a FASTA batch into the given buffer. 578 */ 579 virtual std::pair<bool, int> nextBatchFromFile( 580 PerThreadReadBuf& pt, 581 bool batch_a, 582 unsigned read_idx); 583 584 /** 585 * Scan to the next FASTA record (starting with >) and return the first 586 * character of the record (which will always be >). 587 */ skipToNextFastaRecord(FileBuf & in)588 static int skipToNextFastaRecord(FileBuf& in) { 589 int c; 590 while((c = in.get()) != '>') { 591 if(in.eof()) return -1; 592 } 593 return c; 594 } 595 596 /** 597 * Reset state to handle a fresh file 598 */ resetForNextFile()599 virtual void resetForNextFile() { 600 first_ = true; 601 } 602 603 bool first_; 604 bool interleaved_; 605 }; 606 607 /** 608 * Synchronized concrete pattern source for a list of files with tab- 609 * delimited name, seq, qual fields (or, for paired-end reads, 610 * basename, seq1, qual1, seq2, qual2). 611 */ 612 class TabbedPatternSource : public CFilePatternSource { 613 614 public: 615 TabbedPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool secondName)616 TabbedPatternSource( 617 const EList<std::string>& infiles, 618 const PatternParams& p, 619 bool secondName) : // whether it's --12/--tab5 or --tab6 620 CFilePatternSource(infiles, p), 621 secondName_(secondName) { } 622 623 /** 624 * Finalize tabbed parsing outside critical section. 625 */ 626 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 627 628 protected: 629 630 /** 631 * Light-parse a batch of tabbed-format reads into given buffer. 632 */ 633 virtual std::pair<bool, int> nextBatchFromFile( 634 PerThreadReadBuf& pt, 635 bool batch_a, 636 unsigned read_idx); 637 638 bool secondName_; // true if --tab6, false if --tab5 639 }; 640 641 /** 642 * Synchronized concrete pattern source for Illumina Qseq files. In 643 * Qseq files, each read appears on a separate line and the tab- 644 * delimited fields are: 645 * 646 * 1. Machine name 647 * 2. Run number 648 * 3. Lane number 649 * 4. Tile number 650 * 5. X coordinate of spot 651 * 6. Y coordinate of spot 652 * 7. Index: "Index sequence or 0. For no indexing, or for a file that 653 * has not been demultiplexed yet, this field should have a value of 654 * 0." 655 * 8. Read number: 1 for unpaired, 1 or 2 for paired 656 * 9. Sequence 657 * 10. Quality 658 * 11. Filter: 1 = passed, 0 = didn't 659 */ 660 class QseqPatternSource : public CFilePatternSource { 661 662 public: 663 QseqPatternSource(const EList<std::string> & infiles,const PatternParams & p)664 QseqPatternSource( 665 const EList<std::string>& infiles, 666 const PatternParams& p) : 667 CFilePatternSource(infiles, p) { } 668 669 /** 670 * Finalize qseq parsing outside critical section. 671 */ 672 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 673 674 protected: 675 676 /** 677 * Light-parse a batch into the given buffer. 678 */ 679 virtual std::pair<bool, int> nextBatchFromFile( 680 PerThreadReadBuf& pt, 681 bool batch_a, 682 unsigned read_idx); 683 684 EList<std::string> qualToks_; 685 }; 686 687 /** 688 * Synchronized concrete pattern source for a list of FASTA files where 689 * reads need to be extracted from long continuous sequences. 690 */ 691 class FastaContinuousPatternSource : public CFilePatternSource { 692 public: FastaContinuousPatternSource(const EList<std::string> & infiles,const PatternParams & p)693 FastaContinuousPatternSource( 694 const EList<std::string>& infiles, 695 const PatternParams& p) : 696 CFilePatternSource(infiles, p), 697 length_(p.sampleLen), 698 freq_(p.sampleFreq), 699 eat_(length_-1), 700 beginning_(true), 701 bufCur_(0), 702 cur_(0llu), 703 last_(0llu) 704 { 705 assert_gt(freq_, 0); 706 resetForNextFile(); 707 assert_leq(length_, 1024); 708 } 709 reset()710 virtual void reset() { 711 CFilePatternSource::reset(); 712 resetForNextFile(); 713 } 714 715 /** 716 * Finalize FASTA parsing outside critical section. 717 */ 718 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 719 720 protected: 721 722 /** 723 * Light-parse a batch into the given buffer. 724 */ 725 virtual std::pair<bool, int> nextBatchFromFile( 726 PerThreadReadBuf& pt, 727 bool batch_a, 728 unsigned read_idx); 729 730 /** 731 * Reset state to be read for the next file. 732 */ resetForNextFile()733 virtual void resetForNextFile() { 734 eat_ = length_-1; 735 name_prefix_buf_.clear(); 736 beginning_ = true; 737 bufCur_ = 0; 738 last_ = cur_; 739 } 740 741 private: 742 const size_t length_; /// length of reads to generate 743 const size_t freq_; /// frequency to sample reads 744 size_t eat_; /// number of characters we need to skip before 745 /// we have flushed all of the ambiguous or 746 /// non-existent characters out of our read 747 /// window 748 bool beginning_; /// skipping over the first read length? 749 char buf_[1024]; /// FASTA sequence buffer 750 Read::TBuf name_prefix_buf_; /// FASTA sequence name buffer 751 char name_int_buf_[20]; /// for composing offsets for names 752 size_t bufCur_; /// buffer cursor; points to where we should 753 /// insert the next character 754 uint64_t cur_; 755 uint64_t last_; /// number to subtract from readCnt_ to get 756 /// the pat id to output (so it resets to 0 for 757 /// each new sequence) 758 }; 759 760 /** 761 * Read a FASTQ-format file. 762 * See: http://maq.sourceforge.net/fastq.shtml 763 */ 764 class FastqPatternSource : public CFilePatternSource { 765 766 public: 767 FastqPatternSource(const EList<std::string> & infiles,const PatternParams & p,bool interleaved)768 FastqPatternSource( 769 const EList<std::string>& infiles, 770 const PatternParams& p, bool interleaved) : 771 CFilePatternSource(infiles, p), 772 first_(true), 773 interleaved_(interleaved) { } 774 reset()775 virtual void reset() { 776 first_ = true; 777 CFilePatternSource::reset(); 778 } 779 780 /** 781 * Finalize FASTQ parsing outside critical section. 782 */ 783 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 784 785 protected: 786 787 /** 788 * Light-parse a batch into the given buffer. 789 */ 790 virtual std::pair<bool, int> nextBatchFromFile( 791 PerThreadReadBuf& pt, 792 bool batch_a, 793 unsigned read_idx); 794 795 /** 796 * Reset state to be ready for the next file. 797 */ resetForNextFile()798 virtual void resetForNextFile() { 799 first_ = true; 800 } 801 802 bool first_; // parsing first read in file 803 bool interleaved_; // fastq reads are interleaved 804 }; 805 806 class BAMPatternSource : public CFilePatternSource { 807 struct hdr_t { 808 uint8_t id1; 809 uint8_t id2; 810 uint8_t cm; 811 uint8_t flg; 812 uint32_t mtime; 813 uint8_t xfl; 814 uint8_t os; 815 uint16_t xlen; 816 }; 817 818 struct ftr_t { 819 uint32_t crc32; 820 uint32_t isize; 821 }; 822 823 struct BGZF { 824 hdr_t hdr; 825 uint8_t cdata[1 << 16]; 826 ftr_t ftr; 827 }; 828 829 struct orphan_mate_t { orphan_mate_torphan_mate_t830 orphan_mate_t() : 831 data(NULL), 832 size(0), 833 cap(0), 834 hash(0) {} 835 resetorphan_mate_t836 void reset() { 837 size = 0; 838 hash = 0; 839 } 840 emptyorphan_mate_t841 bool empty() const { 842 return size == 0; 843 } 844 845 uint8_t* data; 846 uint16_t size; 847 uint16_t cap; 848 uint32_t hash; 849 }; 850 851 struct BAMField { 852 enum aln_rec_field_name { 853 refID, 854 pos, 855 l_read_name, 856 mapq, 857 bin, 858 n_cigar_op, 859 flag, 860 l_seq, 861 next_refID, 862 next_pos, 863 tlen, 864 read_name, 865 }; 866 }; 867 868 public: 869 BAMPatternSource(const EList<std::string> & infiles,const PatternParams & p)870 BAMPatternSource( 871 const EList<std::string>& infiles, 872 const PatternParams& p) : 873 CFilePatternSource(infiles, p), 874 first_(true), 875 bam_batches_(p.nthreads), 876 bam_batch_indexes_(p.nthreads), 877 orphan_mates(p.nthreads * 2), 878 orphan_mates_mutex_(), 879 pp_(p) { 880 // uncompressed size of BGZF block is limited to 2**16 bytes 881 for (size_t i = 0; i < bam_batches_.size(); ++i) { 882 bam_batches_[i].reserve(1 << 16); 883 } 884 } 885 reset()886 virtual void reset() { 887 first_ = true; 888 CFilePatternSource::reset(); 889 } 890 891 /** 892 * Finalize BAM parsing outside critical section. 893 */ 894 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 895 ~BAMPatternSource()896 ~BAMPatternSource() { 897 for (size_t i = 0; i < orphan_mates.size(); i++) { 898 if (orphan_mates[i].data != NULL) { 899 delete[] orphan_mates[i].data; 900 } 901 } 902 } 903 904 905 protected: 906 907 virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt, bool batch_a, bool lock = true); 908 909 uint16_t nextBGZFBlockFromFile(BGZF& block); 910 911 /** 912 * Reset state to be ready for the next file. 913 */ resetForNextFile()914 virtual void resetForNextFile() { 915 first_ = true; 916 } 917 918 bool first_; // parsing first read in file 919 920 private: nextBatchFromFile(PerThreadReadBuf &,bool,unsigned)921 virtual std::pair<bool, int> nextBatchFromFile(PerThreadReadBuf&, bool, unsigned) { 922 return make_pair(true, 0); 923 } 924 925 int decompress_bgzf_block(uint8_t *dst, size_t dst_len, uint8_t *src, size_t src_len); 926 std::pair<bool, int> get_alignments(PerThreadReadBuf& pt, bool batch_a, unsigned& readi, bool lock); 927 void store_orphan_mate(const uint8_t* read, size_t read_len); 928 void get_orphaned_pairs(EList<Read>& buf_a, EList<Read>& buf_b, const size_t max_buf, unsigned& readi); 929 void get_or_store_orhaned_mate(EList<Read>& buf_a, EList<Read>& buf_b, unsigned& readi, const uint8_t *mate, size_t mate_len); 930 size_t get_matching_read(const uint8_t* rec); 931 932 static const int offset[]; 933 static const uint8_t EOF_MARKER[]; 934 935 std::vector<std::vector<uint8_t> > bam_batches_; 936 std::vector<size_t> bam_batch_indexes_; 937 std::vector<orphan_mate_t> orphan_mates; 938 MUTEX_T orphan_mates_mutex_; 939 940 PatternParams pp_; 941 }; 942 943 /** 944 * Read a Raw-format file (one sequence per line). No quality strings 945 * allowed. All qualities are assumed to be 'I' (40 on the Phred-33 946 * scale). 947 */ 948 class RawPatternSource : public CFilePatternSource { 949 950 public: 951 RawPatternSource(const EList<std::string> & infiles,const PatternParams & p)952 RawPatternSource( 953 const EList<std::string>& infiles, 954 const PatternParams& p) : 955 CFilePatternSource(infiles, p), first_(true) { } 956 reset()957 virtual void reset() { 958 first_ = true; 959 CFilePatternSource::reset(); 960 } 961 962 /** 963 * Finalize raw parsing outside critical section. 964 */ 965 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 966 967 protected: 968 969 /** 970 * Light-parse a batch into the given buffer. 971 */ 972 virtual std::pair<bool, int> nextBatchFromFile( 973 PerThreadReadBuf& pt, 974 bool batch_a, 975 unsigned read_idx); 976 977 /** 978 * Reset state to be ready for the next file. 979 */ resetForNextFile()980 virtual void resetForNextFile() { 981 first_ = true; 982 } 983 984 private: 985 986 bool first_; 987 }; 988 989 /** 990 * Abstract parent class for synhconized sources of paired-end reads 991 * (and possibly also single-end reads). 992 */ 993 class PatternComposer { 994 public: PatternComposer(const PatternParams & p)995 PatternComposer(const PatternParams& p) : mutex_m() { } 996 ~PatternComposer()997 virtual ~PatternComposer() { } 998 999 virtual void reset() = 0; 1000 1001 /** 1002 * Member function override by concrete, format-specific classes. 1003 */ 1004 virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt) = 0; 1005 1006 /** 1007 * Make appropriate call into the format layer to parse individual read. 1008 */ 1009 virtual bool parse(Read& ra, Read& rb, TReadId rdid) = 0; 1010 1011 /** 1012 * Given the values for all of the various arguments used to specify 1013 * the read and quality input, create a list of pattern sources to 1014 * dispense them. 1015 */ 1016 static PatternComposer* setupPatternComposer( 1017 const EList<std::string>& si, // singles, from argv 1018 const EList<std::string>& m1, // mate1's, from -1 arg 1019 const EList<std::string>& m2, // mate2's, from -2 arg 1020 const EList<std::string>& m12, // both mates on each line, from --12 1021 const EList<std::string>& q, // qualities associated with singles 1022 const EList<std::string>& q1, // qualities associated with m1 1023 const EList<std::string>& q2, // qualities associated with m2 1024 #ifdef USE_SRA 1025 const EList<string>& sra_accs, // SRA accessions 1026 #endif 1027 PatternParams& p, // read-in params 1028 bool verbose); // be talkative? 1029 1030 protected: 1031 1032 static void free_EList_pmembers(const EList<PatternSource*>&); 1033 1034 /// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields 1035 /// of this or another other shared object. 1036 MUTEX_T mutex_m; 1037 }; 1038 1039 /** 1040 * Encapsulates a synchronized source of both paired-end reads and 1041 * unpaired reads, where the paired-end must come from parallel files. 1042 */ 1043 class SoloPatternComposer : public PatternComposer { 1044 1045 public: 1046 SoloPatternComposer(const EList<PatternSource * > * src,const PatternParams & p)1047 SoloPatternComposer( 1048 const EList<PatternSource*>* src, 1049 const PatternParams& p) : 1050 PatternComposer(p), 1051 cur_(0), 1052 src_(src) 1053 { 1054 assert(src_ != NULL); 1055 lock_ = p.nthreads > 1; 1056 for(size_t i = 0; i < src_->size(); i++) { 1057 assert((*src_)[i] != NULL); 1058 } 1059 } 1060 ~SoloPatternComposer()1061 virtual ~SoloPatternComposer() { 1062 free_EList_pmembers(*src_); 1063 delete src_; 1064 } 1065 1066 /** 1067 * Reset this object and all the PatternSources under it so that 1068 * the next call to nextBatch gets the very first read. 1069 */ reset()1070 virtual void reset() { 1071 for(size_t i = 0; i < src_->size(); i++) { 1072 (*src_)[i]->reset(); 1073 } 1074 cur_ = 0; 1075 } 1076 1077 /** 1078 * Calls member functions of the individual PatternSource objects 1079 * to get more reads. Since there's no need to keep two separate 1080 * files in sync (as there is for DualPatternComposer), 1081 * synchronization can be handed by the PatternSource contained 1082 * in the src_ field. 1083 */ 1084 virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt); 1085 1086 /** 1087 * Make appropriate call into the format layer to parse individual read. 1088 */ parse(Read & ra,Read & rb,TReadId rdid)1089 virtual bool parse(Read& ra, Read& rb, TReadId rdid) { 1090 return (*src_)[0]->parse(ra, rb, rdid); 1091 } 1092 1093 protected: 1094 volatile bool lock_; 1095 volatile size_t cur_; // current element in parallel srca_, srcb_ vectors 1096 const EList<PatternSource*>* src_; /// PatternSources for paired-end reads 1097 }; 1098 1099 /** 1100 * Encapsulates a synchronized source of both paired-end reads and 1101 * unpaired reads, where the paired-end must come from parallel files. 1102 */ 1103 class DualPatternComposer : public PatternComposer { 1104 1105 public: 1106 DualPatternComposer(const EList<PatternSource * > * srca,const EList<PatternSource * > * srcb,const PatternParams & p)1107 DualPatternComposer( 1108 const EList<PatternSource*>* srca, 1109 const EList<PatternSource*>* srcb, 1110 const PatternParams& p) : 1111 PatternComposer(p), cur_(0), srca_(srca), srcb_(srcb) 1112 { 1113 assert(srca_ != NULL); 1114 assert(srcb_ != NULL); 1115 // srca_ and srcb_ must be parallel 1116 assert_eq(srca_->size(), srcb_->size()); 1117 lock_ = p.nthreads > 1; 1118 for(size_t i = 0; i < srca_->size(); i++) { 1119 // Can't have NULL first-mate sources. Second-mate sources 1120 // can be NULL, in the case when the corresponding first- 1121 // mate source is unpaired. 1122 assert((*srca_)[i] != NULL); 1123 for(size_t j = 0; j < srcb_->size(); j++) { 1124 assert_neq((*srca_)[i], (*srcb_)[j]); 1125 } 1126 } 1127 } 1128 ~DualPatternComposer()1129 virtual ~DualPatternComposer() { 1130 free_EList_pmembers(*srca_); 1131 delete srca_; 1132 free_EList_pmembers(*srcb_); 1133 delete srcb_; 1134 } 1135 1136 /** 1137 * Reset this object and all the PatternSources under it so that 1138 * the next call to nextBatch gets the very first read. 1139 */ reset()1140 virtual void reset() { 1141 for(size_t i = 0; i < srca_->size(); i++) { 1142 (*srca_)[i]->reset(); 1143 if((*srcb_)[i] != NULL) { 1144 (*srcb_)[i]->reset(); 1145 } 1146 } 1147 cur_ = 0; 1148 } 1149 1150 /** 1151 * Calls member functions of the individual PatternSource objects 1152 * to get more reads. Since we need to keep the two separate 1153 * files in sync, synchronization can be handed at this level, with 1154 * one critical section surrounding both calls into the srca_ and 1155 * srcb_ member functions. 1156 */ 1157 virtual std::pair<bool, int> nextBatch(PerThreadReadBuf& pt); 1158 1159 /** 1160 * Make appropriate call into the format layer to parse individual read. 1161 */ parse(Read & ra,Read & rb,TReadId rdid)1162 virtual bool parse(Read& ra, Read& rb, TReadId rdid) { 1163 return (*srca_)[0]->parse(ra, rb, rdid); 1164 } 1165 1166 protected: 1167 1168 volatile bool lock_; 1169 volatile size_t cur_; // current element in parallel srca_, srcb_ vectors 1170 const EList<PatternSource*>* srca_; // for 1st matesunpaired 1171 const EList<PatternSource*>* srcb_; // for 2nd mates 1172 }; 1173 1174 /** 1175 * Encapsulates a single thread's interaction with the PatternSource. 1176 * Most notably, this class holds the buffers into which the 1177 * PatterSource will write sequences. This class is *not* threadsafe 1178 * - it doesn't need to be since there's one per thread. PatternSource 1179 * is thread-safe. 1180 */ 1181 class PatternSourcePerThread { 1182 1183 public: 1184 PatternSourcePerThread(PatternComposer & composer,const PatternParams & pp,int tid)1185 PatternSourcePerThread( 1186 PatternComposer& composer, 1187 const PatternParams& pp, int tid) : 1188 composer_(composer), 1189 buf_(pp.max_buf, tid), 1190 pp_(pp), 1191 last_batch_(false), 1192 last_batch_size_(0) { } 1193 1194 /** 1195 * Use objects in the PatternSource and/or PatternComposer 1196 * hierarchies to populate the per-thread buffers. 1197 */ 1198 std::pair<bool, bool> nextReadPair(); 1199 read_a()1200 Read& read_a() { return buf_.read_a(); } read_b()1201 Read& read_b() { return buf_.read_b(); } 1202 read_a()1203 const Read& read_a() const { return buf_.read_a(); } read_b()1204 const Read& read_b() const { return buf_.read_b(); } 1205 1206 private: 1207 1208 /** 1209 * When we've finished fully parsing and dishing out reads in 1210 * the current batch, we go get the next one by calling into 1211 * the composition layer. 1212 */ nextBatch()1213 std::pair<bool, int> nextBatch() { 1214 buf_.reset(); 1215 std::pair<bool, int> res = composer_.nextBatch(buf_); 1216 buf_.init(); 1217 return res; 1218 } 1219 1220 /** 1221 * Once name/sequence/qualities have been parsed for an 1222 * unpaired read, set all the other key fields of the Read 1223 * struct. 1224 */ 1225 void finalize(Read& ra); 1226 1227 /** 1228 * Once name/sequence/qualities have been parsed for a 1229 * paired-end read, set all the other key fields of the Read 1230 * structs. 1231 */ 1232 void finalizePair(Read& ra, Read& rb); 1233 1234 /** 1235 * Call into composition layer (which in turn calls into 1236 * format layer) to parse the read. 1237 */ parse(Read & ra,Read & rb)1238 bool parse(Read& ra, Read& rb) { 1239 return composer_.parse(ra, rb, buf_.rdid()); 1240 } 1241 trim(Read & r)1242 void trim(Read& r) { 1243 if (pp_.trimTo.second > 0) { 1244 switch (pp_.trimTo.first) { 1245 case 3: 1246 if (r.patFw.length() > pp_.trimTo.second) { 1247 r.trimmed5 = r.patFw.length() - pp_.trimTo.second; 1248 r.patFw.trimEnd(r.trimmed5); 1249 r.qual.trimEnd(r.trimmed5); 1250 } 1251 break; 1252 case 5: 1253 if (r.patFw.length() > pp_.trimTo.second) { 1254 r.trimmed3 = r.patFw.length() - pp_.trimTo.second; 1255 r.patFw.trimBegin(r.trimmed3); 1256 r.qual.trimBegin(r.trimmed3); 1257 } 1258 break; 1259 } 1260 } 1261 } 1262 1263 PatternComposer& composer_; // pattern composer 1264 PerThreadReadBuf buf_; // read data buffer 1265 const PatternParams& pp_; // pattern-related parameters 1266 bool last_batch_; // true if this is final batch 1267 int last_batch_size_; // # reads read in previous batch 1268 }; 1269 1270 /** 1271 * Abstract parent factory for PatternSourcePerThreads. 1272 */ 1273 class PatternSourcePerThreadFactory { 1274 public: PatternSourcePerThreadFactory(PatternComposer & composer,const PatternParams & pp,int tid)1275 PatternSourcePerThreadFactory( 1276 PatternComposer& composer, 1277 const PatternParams& pp, int tid) : 1278 composer_(composer), 1279 pp_(pp), 1280 tid_(tid) { } 1281 1282 /** 1283 * Create a new heap-allocated PatternSourcePerThreads. 1284 */ create()1285 virtual PatternSourcePerThread* create() const { 1286 return new PatternSourcePerThread(composer_, pp_, tid_); 1287 } 1288 1289 /** 1290 * Create a new heap-allocated vector of heap-allocated 1291 * PatternSourcePerThreads. 1292 */ create(uint32_t n)1293 virtual EList<PatternSourcePerThread*>* create(uint32_t n) const { 1294 EList<PatternSourcePerThread*>* v = new EList<PatternSourcePerThread*>; 1295 for(size_t i = 0; i < n; i++) { 1296 v->push_back(new PatternSourcePerThread(composer_, pp_, tid_)); 1297 assert(v->back() != NULL); 1298 } 1299 return v; 1300 } 1301 ~PatternSourcePerThreadFactory()1302 virtual ~PatternSourcePerThreadFactory() {} 1303 1304 private: 1305 /// Container for obtaining paired reads from PatternSources 1306 PatternComposer& composer_; 1307 const PatternParams& pp_; 1308 int tid_; 1309 }; 1310 1311 #ifdef USE_SRA 1312 1313 namespace ngs { 1314 class ReadCollection; 1315 class ReadIterator; 1316 } 1317 1318 /** 1319 * Pattern source for reading directly from the SRA archive. 1320 */ 1321 class SRAPatternSource : public PatternSource { 1322 public: SRAPatternSource(const EList<string> & sra_accs,const PatternParams & p)1323 SRAPatternSource( 1324 const EList<string>& sra_accs, 1325 const PatternParams& p) : 1326 PatternSource(p), 1327 sra_accs_(sra_accs), 1328 sra_acc_cur_(0), 1329 cur_(0), 1330 first_(true), 1331 sra_its_(p.nthreads), 1332 mutex_m(), 1333 pp_(p) 1334 { 1335 assert_gt(sra_accs_.size(), 0); 1336 errs_.resize(sra_accs_.size()); 1337 errs_.fill(0, sra_accs_.size(), false); 1338 open(); // open first file in the list 1339 sra_acc_cur_++; 1340 } 1341 ~SRAPatternSource()1342 virtual ~SRAPatternSource() { 1343 for (size_t i = 0; i < sra_its_.size(); i++) { 1344 if(sra_its_[i] != NULL) { 1345 delete sra_its_[i]; 1346 sra_its_[i] = NULL; 1347 } 1348 } 1349 } 1350 1351 /** 1352 * Fill Read with the sequence, quality and name for the next 1353 * read in the list of read files. This function gets called by 1354 * all the search threads, so we must handle synchronization. 1355 * 1356 * Returns pair<bool, int> where bool indicates whether we're 1357 * completely done, and int indicates how many reads were read. 1358 */ 1359 virtual std::pair<bool, int> nextBatch( 1360 PerThreadReadBuf& pt, 1361 bool batch_a, 1362 bool lock); 1363 1364 /** 1365 * Finishes parsing outside the critical section 1366 */ 1367 virtual bool parse(Read& ra, Read& rb, TReadId rdid) const; 1368 1369 /** 1370 * Reset so that next call to nextBatch* gets the first batch. 1371 * Should only be called by the master thread. 1372 */ reset()1373 virtual void reset() { 1374 PatternSource::reset(); 1375 sra_acc_cur_ = 0, 1376 open(); 1377 sra_acc_cur_++; 1378 } 1379 1380 protected: 1381 1382 std::pair<bool, int> nextBatchImpl( 1383 PerThreadReadBuf& pt, 1384 bool batch_a); 1385 1386 /** 1387 * Open the next file in the list of input files. 1388 */ 1389 void open(); 1390 1391 EList<string> sra_accs_; // filenames for read files 1392 EList<bool> errs_; // whether we've already printed an error for each file 1393 size_t sra_acc_cur_; // index into infiles_ of next file to read 1394 size_t cur_; // current read id 1395 bool first_; 1396 1397 std::vector<ngs::ReadIterator*> sra_its_; 1398 1399 /// Lock enforcing mutual exclusion for (a) file I/O, (b) writing fields 1400 /// of this or another other shared object. 1401 MUTEX_T mutex_m; 1402 1403 PatternParams pp_; 1404 }; 1405 1406 #endif 1407 1408 #endif /*PAT_H_*/ 1409