1 #ifndef LOCUS_READERS_H
2 #define LOCUS_READERS_H
3 
4 #include "constants.h"
5 #include "MetaPopInfo.h"
6 #include "BamI.h"
7 #include "locus.h"
8 #include "Vcf.h"
9 
10 // BamPopInfo : class to handle BAM read groups and interface
11 // them with MetaPopInfo Samples.
12 class BamPopInfo {
13     unique_ptr<MetaPopInfo> mpopi_; // (Pointer because MetaPopInfo is immovable.)
14 
15     // For read group-based identification.
16     typedef map<string,size_t> RGMap;
17     vector<RGMap> read_groups_;
18     vector<map<const char*, RGMap::iterator, LessCStrs>> rg_to_sample_;
19 
20     // For file name-based identification.
21     vector<size_t> file_samples_;
22 
23 public:
24     BamPopInfo(const vector<Bam*>& bam_fs);
25     BamPopInfo(const vector<Bam*>& bam_fs, const vector<string>& file_samples);
26     void reassign_ids();
27 
mpopi()28     const MetaPopInfo& mpopi() const {return *mpopi_;}
29     size_t sample_of(const BamRecord& rec, size_t bam_f_i=0);
30 };
31 
32 // BamCLocReader: Class to read catalog loci from files by the de novo pipeline.
33 class BamCLocReader {
34     vector<Bam*> bam_fs_;
35 
36     BamPopInfo bpopi_;
37     vector<size_t> loci_n_components_; // ("nc" field in BAM header @RG lines.)
38 
39     int32_t loc_i_; // Index of the next locus (chromosome). Incremented by read_one_locus().
40 
41     vector<BamRecord> next_records_; // The next available record, for each file.
42 
43 public:
44     BamCLocReader(vector<Bam*>&& bam_fs, const vector<string>& samples={});
~BamCLocReader()45     ~BamCLocReader() {for (Bam* bam_f : bam_fs_) delete bam_f;}
46 
bam_fs()47     const vector<Bam*>& bam_fs() const {return bam_fs_;}
n_loci()48     size_t n_loci() const {return bam_fs_[0]->h().n_ref_chroms();}
target2id(size_t target_i)49     int target2id(size_t target_i) const {return atoi(bam_fs_[0]->h().chrom_str(target_i));}
n_catalog_components_of(size_t target_i)50     size_t n_catalog_components_of(size_t target_i) const {return loci_n_components_.at(target_i);}
tally_n_components()51     size_t tally_n_components() const {return std::accumulate(loci_n_components_.begin(), loci_n_components_.end(), size_t(0));}
mpopi()52     const MetaPopInfo& mpopi() const {return bpopi_.mpopi();}
53 
54     // Reads one locus. Returns false on EOF.
55     bool read_one_locus(CLocReadSet& readset);
56 };
57 
58 // BamCLocBuilder: Class to build catalog loci from third party alignment files.
59 class BamCLocBuilder {
60 public:
61     struct Config {
62         bool paired;
63         size_t max_insert_refsize;
64         size_t min_mapq;
65         double max_clipped;
66         size_t min_reads_per_sample;
67         bool ign_pe_reads;
68         size_t min_samples_per_locus;
69     };
70 
71     struct BamStats {
72         size_t n_records;
73         size_t n_ignored_read2_recs;
74         size_t n_primary;
75         size_t n_primary_mapq;
76         size_t n_primary_softclipped;
77         size_t n_primary_kept_read2;
78         size_t n_secondary;
79         size_t n_supplementary;
80         size_t n_unmapped;
81 
n_primary_keptBamStats82         size_t n_primary_kept() const {return n_primary - n_primary_mapq - n_primary_softclipped;}
n_primary_kept_read1BamStats83         size_t n_primary_kept_read1() const {return n_primary_kept() - n_primary_kept_read2;}
84 
85         BamStats& operator+= (const BamStats&);
86     };
87 
88     struct LocStats {
89         size_t n_loci_built;
90         size_t n_fw_reads;
91         OnlineMeanVar insert_lengths_mv;
92 
n_read_pairsLocStats93         size_t n_read_pairs() const {return insert_lengths_mv.n();}
94     };
95 
96     // Constructs the object from a list of BAM file objects. If `samples` is
97     // given, its size must be the same as that of `bam_fs`, and
98     BamCLocBuilder(vector<Bam*>&& bam_fs, const Config& cfg, const vector<string>& samples={});
~BamCLocBuilder()99     ~BamCLocBuilder() {for(Bam* bam_f : bam_fs_) delete bam_f;}
100 
101     // Reads one locus. Returns false on EOF.
102     bool build_one_locus(CLocAlnSet& readset);
103 
mpopi()104     const MetaPopInfo& mpopi() const {return bpopi_.mpopi();}
bam_fs()105     const vector<Bam*>& bam_fs() const {return bam_fs_;}
bam_stats_per_sample()106     const vector<BamStats>& bam_stats_per_sample() const {return bam_stats_;}
loc_stats()107     const LocStats& loc_stats() const {return loc_stats_;}
108 
109 private:
110     // Structure to store the reads' 5prime positions.
111     struct Pos5 {
112         int32_t chrom;
113         int32_t bp;
114         bool    strand; // Plus is true, minus is false.
115 
116         bool operator<(const Pos5& other) const {
117             if (chrom == other.chrom) {
118                 if (bp == other.bp) {
119                     if (strand == other.strand)
120                         return false; // Equal.
121                     return strand == false; // Minus strand first.
122                 }
123                 return bp < other.bp;
124             }
125             return chrom < other.chrom;
126         }
127     };
128 
129     Config cfg_;
130     vector<Bam*> bam_fs_;
131     BamPopInfo bpopi_;
132     vector<BamStats> bam_stats_;
133     LocStats loc_stats_;
134 
135     bool next_record(size_t bam_f_i);
next_record()136     bool next_record() {return next_record(0);}
137     vector<BamRecord> next_records_; // The next record past the current window, for each file.
138     vector<uchar> treat_next_records_as_fw_; // Whether to treat the above as forward reads. c.f. `cfg_.paired`/--paired.
139 
140     // Buffers to store the reads of the sliding window.
141     bool fill_window();
142     void move_next_record_to_the_window(size_t bam_f_i);
143     typedef size_t SampleIdx;
144     map<Pos5,vector<pair<BamRecord,SampleIdx>>> fw_reads_by_5prime_pos_;
145     std::multimap<Pos5,pair<BamRecord,SampleIdx>> pe_reads_by_5prime_pos_;
146     map<const char*,
147         std::multimap<Pos5,pair<BamRecord,SampleIdx>>::iterator,
148         LessCStrs> pe_reads_by_name_;
149 };
150 
151 class VcfCLocReader {
152     VcfParser vcf_f_;
153     VcfRecord next_rec_;
154     bool eof_;
155  public:
VcfCLocReader()156     VcfCLocReader(): vcf_f_(), next_rec_(), eof_(false) {};
157     VcfCLocReader(const string& vcf_path);
158 
159     int  open(string &vcf_path);
header()160     const VcfHeader& header() const {return vcf_f_.header();}
161     void set_sample_ids(MetaPopInfo& mpopi) const;
162 
163     // Reads one locus. Returns false on EOF.
164     bool read_one_locus(vector<VcfRecord>& records);
165 };
166 
167 //
168 // Inline definitions
169 // ==================
170 //
171 
172 inline
BamPopInfo(const vector<Bam * > & bam_fs)173 BamPopInfo::BamPopInfo(const vector<Bam*>& bam_fs)
174 :
175     mpopi_(new MetaPopInfo()),
176     read_groups_(bam_fs.size()),
177     rg_to_sample_(bam_fs.size())
178 {
179     if (bam_fs.empty())
180         DOES_NOT_HAPPEN;
181 
182     struct SampleData {
183         string rg;
184         string name;
185         int stacks_id;
186     };
187     vector<vector<SampleData>> readgroups_per_f  (bam_fs.size());
188 
189     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i) {
190         Bam* bam_f = bam_fs[bam_f_i];
191         size_t line = 1;
192     try {
193         // Parse the @RG header lines.
194         const char* p = bam_f->h().text();
195         string rg;
196         string name;
197         int stacks_id = -1;
198         while (true) {
199             if (strncmp(p, "@RG\t", 4) == 0) {
200                 rg.clear();
201                 name.clear();
202                 stacks_id = -1;
203                 // Parse the line.
204                 p += 4;
205                 while (*p && *p != '\n') {
206                     const char* q = p;
207                     while (*p && *p != '\n' && *p != '\t')
208                         ++p;
209 
210                     if (strncmp(q, "ID:", 3) == 0) {
211                         rg.assign(q+3, p);
212                     } else if (strncmp(q, "SM:", 3) == 0) {
213                         name.assign(q+3, p);
214                     } else if (strncmp(q, "id:", 3) == 0) {
215                         char* end;
216                         stacks_id = strtol(q+3, &end, 10);
217                         if (end != p)
218                             throw exception();
219                     }
220                     if (*p == '\t')
221                         ++p;
222                 }
223                 if (rg.empty()) {
224                     cerr << "Error: @RG line has no ID (identifier) field.\n";
225                     throw exception();
226                 } else if (name.empty()) {
227                     cerr << "Error: @RG line has no SM (sample name) field.\n";
228                     throw exception();
229                 }
230                 // Record the read group.
231                 readgroups_per_f[bam_f_i].push_back({move(rg), move(name), stacks_id});
232             }
233             p = strchr(p, '\n');
234             if (p == NULL)
235                 break;
236             ++p;
237             ++line;
238         }
239     } catch (exception&) {
240         cerr << "Error: Malformed BAM header.\n"
241              << "Error: At header line " << line << " in file '" << bam_f->path << "'.\n";
242         throw;
243     }}
244 
245     //
246     // Initialize all the members.
247     //
248 
249     // Initialize the MetaPopInfo.
250     vector<string> names_all;
251     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i)
252         for (const SampleData& d : readgroups_per_f[bam_f_i])
253             names_all.push_back(d.name);
254     std::sort(names_all.begin(), names_all.end());
255     names_all.erase(std::unique(names_all.begin(), names_all.end()), names_all.end());
256     mpopi_->init_names(names_all);
257 
258     // Set the sample IDs. We allow overwriting.
259     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i)
260         for (const SampleData& d : readgroups_per_f[bam_f_i])
261             if (d.stacks_id != -1)
262                 mpopi_->set_sample_id(mpopi_->get_sample_index(d.name), d.stacks_id);
263 
264     // Initialize `read_groups_`, `rg_to_sample_`.
265     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i) {
266         for (SampleData& d : readgroups_per_f[bam_f_i]) {
267             auto rv = read_groups_[bam_f_i].emplace(d.rg, mpopi_->get_sample_index(d.name));
268             if (rv.second)
269                 rg_to_sample_[bam_f_i].emplace(rv.first->first.c_str(), rv.first);
270         }
271     }
272 
273     //
274     // Run checks.
275     //
276 
277     // Check that all the files have read groups.
278     bool no_rg = false;
279     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i) {
280         if (readgroups_per_f[bam_f_i].empty()) {
281             no_rg = true;
282             cerr << "Error: No read group(s) in BAM file '"
283                  << bam_fs[bam_f_i]->path << "'.\n";
284         }
285     }
286     if (no_rg)
287         throw exception();
288 
289     // Check that duplicated readgroup IDs and sample names appear in a single
290     // (ID,NAME) combination.
291     map<string,pair<string,size_t>> name_to_rg;
292     map<string,pair<string,size_t>> rg_to_name;
293     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i) {
294         for (const SampleData& d : readgroups_per_f[bam_f_i]) {
295             auto rv1 = name_to_rg.insert({d.name, {d.rg, bam_f_i}});
296             auto rv2 = rg_to_name.insert({d.rg, {d.name, bam_f_i}});
297             if (!rv1.second && rv1.first->second.first != d.rg) {
298                 const pair<string,pair<string,size_t>>& n2r = *rv1.first;
299                 cerr << "Error: Incompatible read groups (ID:" << d.rg << ", SM:"
300                      << d.name << ") and (ID:" << n2r.second.first << ", SM:"
301                      << n2r.first << ").\n"
302                      << "Error: In BAM files '" << bam_fs[bam_f_i]->path
303                      << "' and '" << bam_fs[n2r.second.second]->path << ".\n";
304                 throw exception();
305             }
306             if (!rv2.second && rv2.first->second.first != d.name) {
307                 const pair<string,pair<string,size_t>>& r2n = *rv2.first;
308                 cerr << "Error: Incompatible read groups (ID:" << d.rg << ", SM:"
309                      << d.name << ") and (ID:" << r2n.first << ", SM:"
310                      << r2n.second.first << ").\n"
311                      << "Error: In BAM files '" << bam_fs[bam_f_i]->path
312                      << "' and '" << bam_fs[r2n.second.second]->path << ".\n";
313                 throw exception();
314             }
315         }
316     }
317 }
318 
319 inline
BamPopInfo(const vector<Bam * > & bam_fs,const vector<string> & file_samples)320 BamPopInfo::BamPopInfo(const vector<Bam*>& bam_fs, const vector<string>& file_samples)
321 :
322     mpopi_(new MetaPopInfo()),
323     file_samples_(bam_fs.size())
324 {
325     if (bam_fs.size() != file_samples.size())
326         DOES_NOT_HAPPEN;
327 
328     mpopi_->init_names(file_samples);
329     for (size_t bam_f_i=0; bam_f_i<bam_fs.size(); ++bam_f_i)
330         file_samples_[bam_f_i] = mpopi_->get_sample_index(file_samples[bam_f_i]);
331 }
332 
333 inline
reassign_ids()334 void BamPopInfo::reassign_ids() {
335     for (size_t i=0; i<mpopi_->samples().size(); ++i)
336         mpopi_->set_sample_id(i, i+1);
337 }
338 
339 inline
sample_of(const BamRecord & rec,size_t bam_f_i)340 size_t BamPopInfo::sample_of(const BamRecord& rec, size_t bam_f_i)
341 {
342     size_t sample;
343     if (file_samples_.empty()) {
344         const char* rg = rec.read_group();
345         if (rg == NULL) {
346             cerr << "Error: Missing read group for BAM record '" << rec.qname() << "'.\n";
347             throw exception();
348         }
349         try {
350             sample = rg_to_sample_.at(bam_f_i).at(rg)->second;
351         } catch(std::out_of_range&) {
352             cerr << "Error: BAM record '" << rec.qname() << "' has read group '" << rg
353                  << "', which was not declared in the header.\n";
354             throw exception();
355         }
356     } else {
357         sample = file_samples_.at(bam_f_i);
358     }
359     assert(sample < mpopi_->samples().size());
360     return sample;
361 }
362 
363 inline
BamCLocReader(vector<Bam * > && bam_fs,const vector<string> & samples)364 BamCLocReader::BamCLocReader(vector<Bam*>&& bam_fs, const vector<string>& samples)
365 :
366     bam_fs_(move(bam_fs)),
367     bpopi_(samples.empty() ? BamPopInfo(bam_fs_) : BamPopInfo(bam_fs_, samples)),
368     loc_i_(-1),
369     next_records_(bam_fs_.size())
370 {
371     // Check that headers are consistent.
372     size_t bam_f_i=1;
373     if (bam_fs_.empty())
374         DOES_NOT_HAPPEN;
375     try {
376         for (; bam_f_i<bam_fs_.size(); ++bam_f_i)
377             BamHeader::check_same_ref_chroms(bam_fs_[0]->h(), bam_fs_[bam_f_i]->h());
378     } catch (exception& e) {
379         cerr << "Error: Inconsistent BAM headers; in files '" << bam_fs_[0]->path
380              << "' and '" << bam_fs_[bam_f_i]->path << "'.\n";
381         throw;
382     }
383 
384     // Check that Stacks sample IDs are present.
385     for (const Sample& s : mpopi().samples())
386         if (s.id == -1)
387             cerr << "WARNING: Sample '" << s.name << "' is missing its (integer) ID; was tsv2bam run correctly?!\n";
388 
389     //
390     // Retrieve the number of components that make up each catalog locus.
391     loci_n_components_.reserve(bam_fs_[0]->h().n_ref_chroms());
392     const char* p = bam_fs_[0]->h().text();
393     size_t line = 1;
394     while (true) {
395         if (strncmp(p, "@SQ\t", 4) == 0) {
396             loci_n_components_.push_back(0);
397             p += 3;
398             while (*p && *p != '\n') {
399                 assert(*p == '\t');
400                 ++p;
401                 if (strncmp(p, "nc:", 3) == 0) {
402                     loci_n_components_.back() = atoi(p+3);
403                     break; // Skip the rest of the line.
404                 }
405                 while (*p && *p != '\n' && *p != '\t')
406                     ++p;
407             }
408             if (loci_n_components_.back() == 0) {
409                 cerr << "Error: At BAM header line " << line
410                      << ": field \"nc:\", number components of the catalog locus,"
411                      << " is missing or invalid.\n"
412                      << "Error: In file '" << bam_fs_[0]->path << "'\n";
413                 throw exception();
414             }
415         }
416         p = strchr(p, '\n');
417         if (p == NULL)
418             break;
419         ++p;
420         ++line;
421     }
422     if(loci_n_components_.size() != bam_fs_[0]->h().n_ref_chroms())
423         DOES_NOT_HAPPEN;
424 }
425 
426 inline
read_one_locus(CLocReadSet & readset)427 bool BamCLocReader::read_one_locus(CLocReadSet& readset) {
428     assert(&readset.mpopi() == &mpopi()); // Otherwise sample indexes may be misleading.
429 
430     ++loc_i_;
431     if (loc_i_ == 0) {
432         // Read the first records.
433         for (size_t bam_f_i=0; bam_f_i<bam_fs_.size(); ++bam_f_i) {
434             Bam* bam_f = bam_fs_[bam_f_i];
435             BamRecord& rec = next_records_[bam_f_i];
436             if (!bam_f->next_record_ordered(rec)) {
437                 cerr << "Error: No records in BAM file '" << bam_f->path << "'.\n";
438                 throw exception();
439             } else if (!rec.is_primary()) {
440                 cerr << "Error: BAM file '" << bam_f->path
441                      << "' unexpectedly contains non-primary records.\n";
442                 throw exception();
443             }
444         }
445     } else if (loc_i_ == int32_t(n_loci())) {
446         #ifndef NDEBUG
447         for (Bam* bam_f : bam_fs_)
448             assert(bam_f->eof());
449         #endif
450         return false;
451     }
452 
453     readset.clear();
454     readset.bam_i(loc_i_);
455     readset.id(atoi(bam_fs_[0]->h().chrom_str(loc_i_)));
456 
457     for (size_t bam_f_i=0; bam_f_i<bam_fs_.size(); ++bam_f_i) {
458         Bam* bam_f = bam_fs_[bam_f_i];
459         BamRecord& rec = next_records_[bam_f_i];
460 
461         // Read all the reads of the locus, and one more.
462         while (!bam_f->eof() && rec.chrom() == loc_i_) {
463             if (rec.is_read2())
464                 readset.add_pe(SRead(Read(rec.seq(), string(rec.qname())+"/2"), bpopi_.sample_of(rec, bam_f_i)));
465             else if (rec.is_read1())
466                 readset.add(SRead(Read(rec.seq(), string(rec.qname())+"/1"), bpopi_.sample_of(rec, bam_f_i)));
467             else
468                 // If tsv2bam wasn't given paired-end reads, no flag was set and the
469                 // read names were left unchanged, so we also don't touch them.
470                 readset.add(SRead(Read(rec.seq(), string(rec.qname())), bpopi_.sample_of(rec, bam_f_i)));
471 
472             bam_f->next_record_ordered(rec);
473         }
474     }
475 
476     if (readset.reads().empty()) {
477         // A catalog locus might not have reads, e.g. if some samples were
478         // omitted in the `samtools merge`, but this would be weird.
479         static bool warn = false;
480         if (warn) {
481             warn = false;
482             cerr << "Warning: Some catalog loci do not have any reads in the BAM file.\n";
483         }
484     }
485     return true;
486 }
487 
488 inline
BamCLocBuilder(vector<Bam * > && bam_fs,const Config & cfg,const vector<string> & samples)489 BamCLocBuilder::BamCLocBuilder(
490         vector<Bam*>&& bam_fs,
491         const Config& cfg,
492         const vector<string>& samples
493 ) :
494     cfg_(cfg),
495     bam_fs_(move(bam_fs)),
496     bpopi_(samples.empty() ? BamPopInfo(bam_fs_) : BamPopInfo(bam_fs_, samples)),
497     bam_stats_(bpopi_.mpopi().samples().size()),
498     loc_stats_(),
499     next_records_(bam_fs_.size()),
500     treat_next_records_as_fw_(bam_fs_.size())
501 {
502     if (!cfg_.paired)
503         cfg_.max_insert_refsize = 0;
504     bpopi_.reassign_ids();
505 
506     size_t bam_f_i=1;
507     if (bam_fs_.empty())
508         DOES_NOT_HAPPEN;
509     try {
510         for (; bam_f_i<bam_fs_.size(); ++bam_f_i)
511             BamHeader::check_same_ref_chroms(bam_fs_[0]->h(), bam_fs_[bam_f_i]->h());
512     } catch (exception& e) {
513         cerr << "Error: Inconsistent BAM headers; in files '" << bam_fs_[0]->path
514              << "' and '" << bam_fs_[bam_f_i]->path << "'.\n";
515         throw;
516     }
517 }
518 
519 inline
520 BamCLocBuilder::BamStats& BamCLocBuilder::BamStats::operator+= (const BamCLocBuilder::BamStats& other)
521 {
522     n_records += other.n_records;
523     n_ignored_read2_recs += other.n_ignored_read2_recs;
524     n_primary += other.n_primary;
525     n_primary_mapq += other.n_primary_mapq;
526     n_primary_softclipped += other.n_primary_softclipped;
527     n_primary_kept_read2 += other.n_primary_kept_read2;
528     n_secondary += other.n_secondary;
529     n_supplementary += other.n_supplementary;
530     n_unmapped += other.n_unmapped;
531     return *this;
532 }
533 
534 inline
next_record(size_t bam_f_i)535 bool BamCLocBuilder::next_record(size_t bam_f_i)
536 {
537     Bam* bam_f = bam_fs_[bam_f_i];
538     BamRecord& r = next_records_[bam_f_i];
539     uchar& treat_as_fw = treat_next_records_as_fw_[bam_f_i];
540 
541     while (true) {
542         if(!bam_f->next_record_ordered(r))
543             return false;
544         BamStats& bstats = bam_stats_[bpopi_.sample_of(r, bam_f_i)];
545         ++bstats.n_records;
546 
547         if (cfg_.ign_pe_reads && r.is_read2()) {
548             ++bstats.n_ignored_read2_recs;
549             continue;
550         }
551 
552         // Check if the record is primary.
553         if (r.is_unmapped()) {
554             ++bstats.n_unmapped;
555             continue;
556         } else if (r.is_secondary()) {
557             ++bstats.n_secondary;
558             continue;
559         } else if (r.is_supplementary()) {
560             ++bstats.n_supplementary;
561             continue;
562         }
563         assert(r.is_primary());
564         ++bstats.n_primary;
565 
566         // Check the MAPQ
567         if (r.mapq() < cfg_.min_mapq) {
568             ++bstats.n_primary_mapq;
569             continue;
570         }
571 
572         // Assess whether this alignment should be treated as a forward read.
573         if (r.is_read2()) {
574             treat_as_fw = cfg_.paired ? false : true;
575         } else {
576             treat_as_fw = true;
577             if ((bstats.n_primary_kept_read2 > 0 || bstats.n_ignored_read2_recs > 0)
578                     && !r.is_read1()) {
579                 static bool emitted = false;
580                 if (!emitted) {
581                     emitted = true;
582                     cerr << "WARNING: Some records (e.g. '" << r.qname()
583                          << "') are missing a READ1/READ2 flag.\n";
584                 }
585             }
586         }
587         // Check that there's a sequence.
588         if (r.hts_l_seq() <= 0) {
589             cerr << "Error: No sequence at BAM record '" << r.qname() << "'.\n";
590             throw exception();
591         }
592 
593         // Check the CIGAR.
594         if (r.hts_n_cigar() == 0) {
595             cerr << "Error: Empty CIGAR at BAM record '" << r.qname() << "'.\n";
596             throw exception();
597         }
598         if (treat_as_fw) {
599             if (r.is_rev_compl()) {
600                 if(BamRecord::cig_op_t(r.hts_cigar()[r.hts_n_cigar()-1]) != BAM_CMATCH) {
601                     // A cutsite is expected, and it is not aligned.
602                     ++bstats.n_primary_softclipped;
603                     continue;
604                 }
605             } else {
606                 if(BamRecord::cig_op_t(r.hts_cigar()[0]) != BAM_CMATCH) {
607                     ++bstats.n_primary_softclipped;
608                     continue;
609                 }
610             }
611         }
612         size_t softclipped = 0;
613         for (size_t i=0; i<r.hts_n_cigar(); ++i)
614             if (BamRecord::cig_op_t(r.hts_cigar()[i]) == BAM_CSOFT_CLIP)
615                 softclipped += BamRecord::cig_op_len(r.hts_cigar()[i]);
616         // xxx Terminal insertions should count as soft-clipping.
617         if ((double) softclipped / r.hts_l_seq() > cfg_.max_clipped) {
618             // Too much soft-clipping.
619             ++bstats.n_primary_softclipped;
620             continue;
621         }
622 
623         // Keep this record (`next_records_` has been updated).
624         if (r.is_read2())
625             ++bstats.n_primary_kept_read2;
626         break;
627     }
628     return true;
629 }
630 
631 inline
move_next_record_to_the_window(size_t bam_f_i)632 void BamCLocBuilder::move_next_record_to_the_window(size_t bam_f_i)
633 {
634     //
635     // See `fill_window()`.
636     // This inserts `next_record_` into `fw_reads_by_5prime_pos_` (if it is a
637     // forward read or treated as such) or `pe_reads_by_5prime_pos_` (if it is
638     // a paired-end read).
639     //
640 
641     BamRecord& rec = next_records_[bam_f_i];
642 
643     // Determine the 5prime position.
644     Pos5 rec_5prime_pos;
645     rec_5prime_pos.chrom = rec.chrom();
646     if (rec.is_rev_compl()) {
647         rec_5prime_pos.strand = false;
648         rec_5prime_pos.bp = rec.pos() + BamRecord::cig_ref_len(rec) - 1;
649     } else {
650         rec_5prime_pos.strand = true;
651         rec_5prime_pos.bp = rec.pos();
652     }
653 
654     // Determine the sample.
655     size_t sample = bpopi_.sample_of(rec, bam_f_i);
656 
657     // Save the record.
658     if (treat_next_records_as_fw_[bam_f_i]) {
659         auto locus_itr = fw_reads_by_5prime_pos_.insert(
660                 std::make_pair(move(rec_5prime_pos), vector<pair<BamRecord,SampleIdx>>())
661                 ).first;
662         locus_itr->second.push_back({move(rec), sample});
663     } else {
664         auto pe_read_itr = pe_reads_by_5prime_pos_.insert(
665                 std::make_pair(move(rec_5prime_pos), std::make_pair(move(rec), move(sample)))
666                 );
667         // Also add an entry in the {pe_read_name : pe_record} map.
668         pe_reads_by_name_.insert( {pe_read_itr->second.first.qname(), pe_read_itr} );
669     }
670 }
671 
672 inline
fill_window()673 bool BamCLocBuilder::fill_window()
674 {
675     /*
676      * This defines the behavior of the sliding window.
677      *
678      * This guarantees that any usable alignment of which the leftmost base
679      * is within `max_insert_size` bases of the first base of the leftmost
680      * cutsite is loaded.
681      *
682      * `fw_reads_by_5prime_pos_` is a map of {[first cutsite base pos] : BamRecord}
683      * Cutsites can be on the left or right of the read alignment depending on the
684      * strand, but this is easy to handle since `BamCLocBuilder::next_record()`
685      * ensures that the BAM record's TLEN field is properly set.
686      *
687      * `pe_reads_by_5prime_pos_` also uses the 5prime position (i.e. that has been
688      * tlen-corrected if the strand is minus) to ease insert lengths computations.
689      * Paired-end reads need to be sorted by position so that we can discard those
690      * that were not matched and have become too far behind to find one at the given
691      * `max_insert_size`. Paired-end records can also be accessed by name (for
692      * mate matching) via `pe_reads_by_name_`.
693      */
694 
695     for (size_t bam_f_i=0; bam_f_i<bam_fs_.size(); ++bam_f_i) {
696         Bam* bam_f = bam_fs_[bam_f_i];
697         BamRecord& rec = next_records_[bam_f_i];
698         try {
699             if (!bam_f->eof()) {
700                 if (rec.empty()) {
701                     if (!next_record(bam_f_i)) {
702                         if (bam_fs_[bam_f_i]->n_records_read() == 0)
703                             cerr << "Error: No BAM records.\n";
704                         else
705                             cerr << "Error: All records were discarded.\n";
706                         throw exception();
707                     }
708                     assert(!rec.empty());
709                 }
710                 while (!bam_f->eof() && fw_reads_by_5prime_pos_.empty()) {
711                     move_next_record_to_the_window(bam_f_i); // (Note: We may have read a paired-end read.)
712                     next_record(bam_f_i);
713                 }
714 
715                 while (!bam_f->eof()
716                         && (
717                             rec.chrom() < fw_reads_by_5prime_pos_.begin()->first.chrom
718                             || (rec.chrom() == fw_reads_by_5prime_pos_.begin()->first.chrom
719                                 && size_t(rec.pos()) <= fw_reads_by_5prime_pos_.begin()->first.bp + cfg_.max_insert_refsize)
720                         )
721                 ) {
722                     move_next_record_to_the_window(bam_f_i);
723                     next_record(bam_f_i);
724                 }
725             }
726         } catch (exception&) {
727             cerr << "Error: (At the " << bam_fs_[bam_f_i]->n_records_read()
728                  << "th record in file '" << bam_fs_[bam_f_i]->path << "'.)\n";
729             throw;
730         }
731     }
732 
733     if (fw_reads_by_5prime_pos_.empty()) {
734         #ifndef NDEBUG
735         for (Bam* bam_f : bam_fs_)
736             assert(bam_f->eof());
737         #endif
738         return false;
739     }
740 
741     if (cfg_.paired) {
742         // Clean up paired-end reads that are too far behind.
743         Pos5 leftmost_cutsite = fw_reads_by_5prime_pos_.begin()->first;
744         while (!pe_reads_by_5prime_pos_.empty()
745                 && (
746                     pe_reads_by_5prime_pos_.begin()->first.chrom < leftmost_cutsite.chrom
747                     || (pe_reads_by_5prime_pos_.begin()->first.chrom == leftmost_cutsite.chrom
748                         && pe_reads_by_5prime_pos_.begin()->first.bp + cfg_.max_insert_refsize < size_t(leftmost_cutsite.bp) + 1)
749                 )
750         ) {
751             pe_reads_by_name_.erase(pe_reads_by_5prime_pos_.begin()->second.first.qname());
752             pe_reads_by_5prime_pos_.erase(pe_reads_by_5prime_pos_.begin());
753         }
754         // N.B. `pe_reads_by_5prime_pos_` may contain reads for further-ranked
755         // chromosomes. This happens when:
756         // * we are reading from multiple BAM,
757         // * the window starts out empty,
758         // * the first BAM file misses loci at the end of the last chromosome,
759         //   that exist in other samples, so that the window jumps back to the
760         //   previous chromosome when we arrive to a sample that still has
761         //   alignments there.
762     } else {
763         assert(pe_reads_by_5prime_pos_.empty());
764     }
765 
766     return true;
767 }
768 
769 inline
build_one_locus(CLocAlnSet & aln_loc)770 bool BamCLocBuilder::build_one_locus(CLocAlnSet& aln_loc)
771 {
772     aln_loc.clear();
773 
774     //
775     // Find the next locus.
776     //
777 
778     vector<SampleIdx> fw_samples;
779     while(true) {
780         //
781         // Fill the window.
782         //
783         if (!fill_window())
784             return false;
785         assert(!fw_reads_by_5prime_pos_.empty());
786 
787         //
788         // Apply filters to the putative locus.
789         //
790 
791         vector<pair<BamRecord,SampleIdx>>& loc_records = fw_reads_by_5prime_pos_.begin()->second;
792 
793         // Get the samples of the records, and tally sample occurrences.
794         vector<size_t> n_reads_per_sample (mpopi().samples().size(), 0);
795         for (const pair<BamRecord,SampleIdx>& r : loc_records)
796             ++n_reads_per_sample[r.second];
797 
798         // Remove reads from samples that have less than `cfg_.min_reads_per_sample` reads.
799         for (pair<BamRecord,SampleIdx>& r : loc_records)
800             if (n_reads_per_sample[r.second] < cfg_.min_reads_per_sample)
801                 // Bad sample, mark the record for deletion.
802                 r.first.destroy();
803         loc_records.erase(std::remove_if(
804                 loc_records.begin(), loc_records.end(),
805                 [] (const pair<BamRecord,SampleIdx>& r) { return r.first.empty(); }
806                 ), loc_records.end());
807 
808         // Check that we have enough samples.
809         if (cfg_.min_samples_per_locus > 1) {
810             fw_samples.clear();
811             for (auto& r : loc_records)
812                 fw_samples.push_back(r.second);
813             std::sort(fw_samples.begin(), fw_samples.end());
814             fw_samples.erase(std::unique(fw_samples.begin(), fw_samples.end()), fw_samples.end());
815             if (fw_samples.size() < cfg_.min_samples_per_locus)
816                 // Not enough, discard the locus.
817                 loc_records.clear();
818         }
819 
820         if (loc_records.empty()) {
821             // Discard the locus; regenerate the window and retry.
822             fw_reads_by_5prime_pos_.erase(fw_reads_by_5prime_pos_.begin());
823             continue;
824         }
825         break;
826     }
827 
828     //
829     // Build the locus object.
830     //
831 
832     // Position.
833     Pos5 cutsite = fw_reads_by_5prime_pos_.begin()->first;
834     PhyLoc loc_pos (bam_fs_.at(0)->h().chrom_str(cutsite.chrom),
835                     cutsite.bp,
836                     cutsite.strand ? strand_plus : strand_minus);
837     size_t max_ref_span = 0;
838 
839     // Forward reads.
840     const vector<pair<BamRecord,SampleIdx>>& fw_records = fw_reads_by_5prime_pos_.begin()->second;
841     vector<SAlnRead> fw_reads;
842     for (const pair<BamRecord,SampleIdx>& pair : fw_records) {
843         const BamRecord& r = pair.first;
844         size_t sample = pair.second;
845 
846         string name (r.qname());
847         if (r.is_read1())
848             name += "/1";
849         else if (r.is_read2())
850             // (Happens when --paired is off.)
851             name += "/2";
852         DNASeq4 seq = r.seq();
853         Cigar cigar = r.cigar();
854         cigar_simplify_to_MDI(cigar);
855 
856         size_t ref_span;
857         if (cutsite.strand == false) {
858             assert(r.is_rev_compl());
859             std::reverse(cigar.begin(), cigar.end());
860             seq = seq.rev_compl();
861 
862             assert(r.pos() <= cutsite.bp);
863             ref_span = cutsite.bp - r.pos() + 1;
864         } else {
865             ref_span = BamRecord::cig_ref_len(r);
866         }
867         if (ref_span > max_ref_span)
868             max_ref_span = ref_span;
869 
870         fw_reads.push_back(SAlnRead(AlnRead(Read(move(seq), move(name)), move(cigar)), sample));
871     }
872     loc_stats_.n_fw_reads += fw_reads.size();
873 
874     // Paired-end reads.
875     vector<SAlnRead> pe_reads;
876     if (!pe_reads_by_5prime_pos_.empty()) {
877         for (auto& fw_pair : fw_records) {
878             const BamRecord& fw_rec = fw_pair.first;
879 
880             // Find a mate.
881             auto pe_name_itr = pe_reads_by_name_.find(fw_rec.qname());
882             if (pe_name_itr == pe_reads_by_name_.end())
883                 continue;
884             std::multimap<Pos5,pair<BamRecord,SampleIdx>>::iterator pe_rec_itr = pe_name_itr->second;
885 
886             Pos5 pe_pos5 = pe_rec_itr->first;
887             const BamRecord& pe_rec = pe_rec_itr->second.first;
888             size_t pe_sample = pe_rec_itr->second.second;
889 
890             // Check that the mate is in the window and that the reads don't
891             // extend past one another. //xxx Oct 2017 @Nick: This may actually
892             // be okay (c.f. the "clocaln::juxtapose" assert failures upon assembly
893             // of the barcode behind READ1), but should we allow it?
894             if (pe_pos5.chrom != cutsite.chrom)
895                 continue;
896             if (pe_pos5.strand == cutsite.strand)
897                 continue;
898             size_t pe_ref_len;
899             if (cutsite.strand == true) {
900                 assert(pe_pos5.bp >= pe_rec.pos()); // The pe read is reverse complemented, so its 5prime is on the right.
901                 pe_ref_len = pe_pos5.bp - pe_rec.pos() + 1;
902                 if (pe_rec.pos() < cutsite.bp
903                         || size_t(pe_pos5.bp - cutsite.bp + 1) > cfg_.max_insert_refsize)
904                     continue;
905             } else {
906                 assert(pe_pos5.bp == pe_rec.pos());
907                 pe_ref_len = BamRecord::cig_ref_len(pe_rec);
908                 if (pe_rec.pos() + pe_ref_len - 1 > size_t(cutsite.bp)
909                         || size_t(cutsite.bp - pe_pos5.bp + 1) > cfg_.max_insert_refsize)
910                     continue;
911             }
912             size_t insert_length = std::labs(pe_pos5.bp -cutsite.bp) + 1;
913             if (insert_length > max_ref_span)
914                 max_ref_span = insert_length;
915 
916             // Check that the read groups are consistent.
917             if (pe_sample != fw_pair.second) {
918                 static bool emitted = false;
919                 if (!emitted) {
920                     cerr << "Warning: Ignoring reads '" << fw_rec.qname() << "' and '"
921                          << pe_rec.qname() << "' that seem to belong to different samples ('"
922                          << mpopi().samples()[fw_pair.second].name << "' and '"
923                          << mpopi().samples()[pe_sample].name << "').\n";
924                     emitted = true;
925                 }
926                 continue;
927             }
928 
929             // Save the paired-end read.
930             // We need to put the paired-end read in the same reference as the
931             // forward reads. (1) BAM records that are part of a locus on the
932             // minus strand should be reverse complemented.
933             // (2) The first reference base in the cigar should be `cutsite.bp`,
934             // regardless of the direction of the locus.
935             string name (pe_rec.qname());
936             assert(pe_rec.is_read2());
937             name += "/2";
938             DNASeq4 seq = pe_rec.seq();
939             Cigar cigar = pe_rec.cigar();
940             cigar_simplify_to_MDI(cigar);
941             if (cutsite.strand == true) {
942                 assert(pe_rec.pos() >= cutsite.bp);
943                 cigar_extend_left(cigar, pe_rec.pos() - cutsite.bp);
944             } else {
945                 assert(size_t(cutsite.bp) >= pe_rec.pos() + pe_ref_len - 1);
946                 cigar_extend_right(cigar, cutsite.bp - (pe_rec.pos() + pe_ref_len - 1));
947                 std::reverse(cigar.begin(), cigar.end());
948                 seq = seq.rev_compl();
949             }
950             pe_reads.push_back(SAlnRead(AlnRead(Read(move(seq), move(name)), move(cigar)), pe_sample));
951 
952             loc_stats_.insert_lengths_mv.increment(insert_length);
953             pe_reads_by_name_.erase(pe_name_itr);
954             pe_reads_by_5prime_pos_.erase(pe_rec_itr);
955         }
956     }
957 
958     // Build the actual object.
959     aln_loc.reinit(++loc_stats_.n_loci_built, loc_pos, &mpopi());
960     aln_loc.ref(DNASeq4(max_ref_span));
961     for (SAlnRead& read : fw_reads) {
962         cigar_extend_right(read.cigar, max_ref_span - cigar_length_ref(read.cigar));
963         aln_loc.add(move(read));
964     }
965     for (SAlnRead& read : pe_reads) {
966         cigar_extend_right(read.cigar, max_ref_span - cigar_length_ref(read.cigar));
967         aln_loc.add(move(read));
968     }
969 
970     fw_reads_by_5prime_pos_.erase(fw_reads_by_5prime_pos_.begin());
971     return true;
972 }
973 
974 inline
VcfCLocReader(const string & vcf_path)975 VcfCLocReader::VcfCLocReader(const string& vcf_path)
976         : vcf_f_(vcf_path),
977           next_rec_(),
978           eof_(false)
979 {
980     // Read the very first record.
981     if(!vcf_f_.next_record(next_rec_))
982         eof_ = true;
983 }
984 
985 inline int
open(string & vcf_path)986 VcfCLocReader::open(string &vcf_path)
987 {
988     this->vcf_f_.open(vcf_path);
989 
990     // Read the very first record.
991     if(!this->vcf_f_.next_record(next_rec_))
992         eof_ = true;
993 
994     return 0;
995 }
996 
997 inline
set_sample_ids(MetaPopInfo & mpopi)998 void VcfCLocReader::set_sample_ids(MetaPopInfo& mpopi) const {
999     //xxx We could write & retrieve actual sample IDs using the VCF header.
1000     for (size_t i = 0; i < mpopi.samples().size(); ++i)
1001         mpopi.set_sample_id(i, i+1); //id=i+1
1002 }
1003 
1004 inline
read_one_locus(vector<VcfRecord> & records)1005 bool VcfCLocReader::read_one_locus(vector<VcfRecord>& records) {
1006     records.clear();
1007     if (eof_)
1008         return false;
1009 
1010     // Parse the locus ID.
1011     static string curr_chrom;
1012     curr_chrom.assign(next_rec_.chrom());
1013     records.push_back(move(next_rec_));
1014 
1015     // Read all the records of the locus, and one more.
1016     while (strcmp(records.back().chrom(), curr_chrom.c_str()) == 0) {
1017         records.push_back(VcfRecord());
1018         if (!vcf_f_.next_record(records.back())) {
1019             eof_ = true;
1020             break;
1021         }
1022     }
1023 
1024     // Remove the first record of the next locus (or the record for which EOF
1025     // was encountered) from the vector.
1026     next_rec_ = move(records.back());
1027     records.pop_back();
1028 
1029     return true;
1030 }
1031 
1032 #endif
1033