1// File Description 2/// \file CompositeBamReader.inl 3/// \brief Inline implementations for the composite BAM readers, for 4/// working with multiple input files. 5// 6// Author: Derek Barnett 7 8#include "pbbam/CompositeBamReader.h" 9#include <algorithm> 10#include <set> 11#include <sstream> 12#include <stdexcept> 13 14#include "pbbam/MakeUnique.h" 15 16namespace PacBio { 17namespace BAM { 18namespace internal { 19 20// ----------------------------------- 21// Merging helpers 22// ----------------------------------- 23 24inline CompositeMergeItem::CompositeMergeItem(std::unique_ptr<BamReader> rdr) 25 : reader{std::move(rdr)} 26{ } 27 28inline CompositeMergeItem::CompositeMergeItem(std::unique_ptr<BamReader> rdr, 29 BamRecord rec) 30 : reader{std::move(rdr)} 31 , record{std::move(rec)} 32{ } 33 34template<typename CompareType> 35inline bool CompositeMergeItemSorter<CompareType>::operator()(const CompositeMergeItem& lhs, 36 const CompositeMergeItem& rhs) 37{ 38 const auto& l = lhs.record; 39 const auto& r = rhs.record; 40 return CompareType()(l, r); 41} 42 43} // namespace internal 44 45// ----------------------------------- 46// GenomicIntervalCompositeBamReader 47// ----------------------------------- 48 49inline GenomicIntervalCompositeBamReader::GenomicIntervalCompositeBamReader(const GenomicInterval& interval, 50 const std::vector<BamFile>& bamFiles) 51{ 52 filenames_.reserve(bamFiles.size()); 53 for(const auto& bamFile : bamFiles) 54 filenames_.push_back(bamFile.Filename()); 55 Interval(interval); 56} 57 58inline GenomicIntervalCompositeBamReader::GenomicIntervalCompositeBamReader(const GenomicInterval& interval, 59 const DataSet& dataset) 60 : GenomicIntervalCompositeBamReader{interval, dataset.BamFiles()} 61{ } 62 63inline bool GenomicIntervalCompositeBamReader::GetNext(BamRecord& record) 64{ 65 // nothing left to read 66 if (mergeItems_.empty()) 67 return false; 68 69 // non-destructive 'pop' of first item from queue 70 auto firstIter = mergeItems_.begin(); 71 auto firstItem = internal::CompositeMergeItem{ std::move(firstIter->reader), std::move(firstIter->record) }; 72 mergeItems_.pop_front(); 73 74 // store its record in our output record 75 std::swap(record, firstItem.record); 76 77 // try fetch 'next' from first item's reader 78 // if successful, re-insert it into container & re-sort on our new values 79 // otherwise, this item will go out of scope & reader destroyed 80 if (firstItem.reader->GetNext(firstItem.record)) { 81 mergeItems_.push_front(std::move(firstItem)); 82 UpdateSort(); 83 } 84 85 // return success 86 return true; 87} 88 89inline const GenomicInterval& GenomicIntervalCompositeBamReader::Interval() const 90{ return interval_; } 91 92inline GenomicIntervalCompositeBamReader& GenomicIntervalCompositeBamReader::Interval(const GenomicInterval& interval) 93{ 94 std::deque<internal::CompositeMergeItem> updatedMergeItems; 95 std::set<std::string> filesToCreate{filenames_.cbegin(), filenames_.cend()}; 96 97 // update existing readers 98 while (!mergeItems_.empty()) { 99 100 // non-destructive 'pop' of first item from queue 101 auto firstIter = mergeItems_.begin(); 102 internal::CompositeMergeItem firstItem{ std::move(firstIter->reader), std::move(firstIter->record) }; 103 mergeItems_.pop_front(); 104 105 // reset interval 106 auto* baiReader = dynamic_cast<BaiIndexedBamReader*>(firstItem.reader.get()); 107 assert(baiReader); 108 baiReader->Interval(interval); 109 110 // try fetch 'next' from first item's reader 111 // if successful, re-insert it into container & re-sort on our new values 112 // otherwise, this item will go out of scope & reader destroyed 113 if (firstItem.reader->GetNext(firstItem.record)) { 114 updatedMergeItems.push_front(std::move(firstItem)); 115 filesToCreate.erase(firstItem.reader->Filename()); 116 } 117 } 118 119 // create readers for files that were not 'active' for the previous 120 std::vector<std::string> missingBai; 121 for (auto&& fn : filesToCreate) { 122 BamFile bamFile{ fn }; 123 if (bamFile.StandardIndexExists()) { 124 internal::CompositeMergeItem item{ std::unique_ptr<BamReader>{ new BaiIndexedBamReader{ interval, std::move(bamFile) } } }; 125 if (item.reader->GetNext(item.record)) 126 updatedMergeItems.push_back(std::move(item)); 127 // else not an error, simply no data matching interval 128 } 129 else { 130 // maybe handle PBI-backed interval searches if BAI missing, but for now treat as error 131 missingBai.push_back(bamFile.Filename()); 132 } 133 } 134 135 // throw if any files missing BAI 136 if (!missingBai.empty()) { 137 std::ostringstream e; 138 e << "failed to open GenomicIntervalCompositeBamReader because the following files are missing a BAI file:\n"; 139 for (const auto& fn : missingBai) 140 e << " " << fn << '\n'; 141 throw std::runtime_error{e.str()}; 142 } 143 144 // update our actual container and return 145 mergeItems_ = std::move(updatedMergeItems); 146 UpdateSort(); 147 return *this; 148} 149 150struct OrderByPosition 151{ 152 static inline bool less_than(const BamRecord& lhs, const BamRecord& rhs) 153 { 154 const int32_t lhsId = lhs.ReferenceId(); 155 const int32_t rhsId = rhs.ReferenceId(); 156 if (lhsId == -1) return false; 157 if (rhsId == -1) return true; 158 159 if (lhsId == rhsId) 160 return lhs.ReferenceStart() < rhs.ReferenceStart(); 161 else return lhsId < rhsId; 162 } 163 164 static inline bool equals(const BamRecord& lhs, const BamRecord& rhs) 165 { 166 return lhs.ReferenceId() == rhs.ReferenceId() && 167 lhs.ReferenceStart() == rhs.ReferenceStart(); 168 } 169}; 170 171struct PositionSorter : std::binary_function<internal::CompositeMergeItem, internal::CompositeMergeItem, bool> 172{ 173 bool operator()(const internal::CompositeMergeItem& lhs, 174 const internal::CompositeMergeItem& rhs) 175 { 176 const BamRecord& l = lhs.record; 177 const BamRecord& r = rhs.record; 178 return OrderByPosition::less_than(l, r); 179 } 180}; 181 182inline void GenomicIntervalCompositeBamReader::UpdateSort() 183{ std::sort(mergeItems_.begin(), mergeItems_.end(), PositionSorter{ }); } 184 185// ------------------------------ 186// PbiRequestCompositeBamReader 187// ------------------------------ 188 189template<typename OrderByType> 190inline PbiFilterCompositeBamReader<OrderByType>::PbiFilterCompositeBamReader(const PbiFilter& filter, 191 const std::vector<BamFile>& bamFiles) 192 : numReads_{0} 193{ 194 filenames_.reserve(bamFiles.size()); 195 for(const auto& bamFile : bamFiles) 196 filenames_.push_back(bamFile.Filename()); 197 Filter(filter); 198} 199 200template<typename OrderByType> 201inline PbiFilterCompositeBamReader<OrderByType>::PbiFilterCompositeBamReader(const PbiFilter& filter, 202 const DataSet& dataset) 203 : PbiFilterCompositeBamReader{filter, dataset.BamFiles()} 204{ } 205 206template<typename OrderByType> 207inline bool PbiFilterCompositeBamReader<OrderByType>::GetNext(BamRecord& record) 208{ 209 // nothing left to read 210 if (mergeQueue_.empty()) 211 return false; 212 213 // non-destructive 'pop' of first item from queue 214 auto firstIter = mergeQueue_.begin(); 215 value_type firstItem{ std::move(firstIter->reader), std::move(firstIter->record) }; 216 mergeQueue_.pop_front(); 217 218 // store its record in our output record 219 std::swap(record, firstItem.record); 220 221 // try fetch 'next' from first item's reader 222 // if successful, re-insert it into container & re-sort on our new values 223 // otherwise, this item will go out of scope & reader destroyed 224 if (firstItem.reader->GetNext(firstItem.record)) { 225 mergeQueue_.push_front(std::move(firstItem)); 226 UpdateSort(); 227 } 228 229 // return success 230 return true; 231} 232 233template<typename OrderByType> 234inline PbiFilterCompositeBamReader<OrderByType>& 235PbiFilterCompositeBamReader<OrderByType>::Filter(const PbiFilter& filter) 236{ 237 container_type updatedMergeItems; 238 std::set<std::string> filesToCreate{ filenames_.cbegin(), filenames_.cend() }; 239 240 // update existing readers 241 while (!mergeQueue_.empty()) { 242 243 // non-destructive 'pop' of first item from queue 244 auto firstIter = mergeQueue_.begin(); 245 internal::CompositeMergeItem firstItem{ std::move(firstIter->reader), std::move(firstIter->record) }; 246 mergeQueue_.pop_front(); 247 248 // reset request 249 auto* pbiReader = dynamic_cast<PbiIndexedBamReader*>(firstItem.reader.get()); 250 assert(pbiReader); 251 pbiReader->Filter(filter); 252 253 // try fetch 'next' from first item's reader 254 // if successful, re-insert it into container & re-sort on our new values 255 // otherwise, this item will go out of scope & reader destroyed 256 if (firstItem.reader->GetNext(firstItem.record)) { 257 updatedMergeItems.push_front(std::move(firstItem)); 258 filesToCreate.erase(firstItem.reader->Filename()); 259 } 260 } 261 262 // create readers for files that were not 'active' for the previous 263 std::vector<std::string> missingPbi; 264 for (auto&& fn : filesToCreate) { 265 const BamFile bamFile{ fn }; 266 if (bamFile.PacBioIndexExists()) { 267 auto item = internal::CompositeMergeItem{ std::unique_ptr<BamReader>{ new PbiIndexedBamReader{ filter, std::move(bamFile) } } }; 268 if (item.reader->GetNext(item.record)) 269 updatedMergeItems.push_back(std::move(item)); 270 // else not an error, simply no data matching filter 271 } 272 else 273 missingPbi.push_back(fn); 274 } 275 276 // throw if any files missing PBI 277 if (!missingPbi.empty()) { 278 std::ostringstream e; 279 e << "failed to open PbiFilterCompositeBamReader because the following files are missing a PBI file:\n"; 280 for (const auto& fn : missingPbi) 281 e << " " << fn << '\n'; 282 throw std::runtime_error{e.str()}; 283 } 284 285 286 // update our actual container, store num matching reads, sort & and return 287 mergeQueue_ = std::move(updatedMergeItems); 288 289 numReads_ = 0; 290 for (const auto& item : mergeQueue_) 291 { 292 auto* pbiReader = dynamic_cast<PbiIndexedBamReader*>(item.reader.get()); 293 numReads_ += pbiReader->NumReads(); 294 } 295 296 UpdateSort(); 297 return *this; 298} 299 300template<typename OrderByType> 301inline uint32_t PbiFilterCompositeBamReader<OrderByType>::NumReads() const 302{ 303 return numReads_; 304} 305 306template<typename OrderByType> 307inline void PbiFilterCompositeBamReader<OrderByType>::UpdateSort() 308{ std::stable_sort(mergeQueue_.begin(), mergeQueue_.end(), merge_sorter_type{}); } 309 310// ------------------------------ 311// SequentialCompositeBamReader 312// ------------------------------ 313 314inline SequentialCompositeBamReader::SequentialCompositeBamReader(std::vector<BamFile> bamFiles) 315{ 316 for (auto&& bamFile : bamFiles) 317 readers_.emplace_back(std::make_unique<BamReader>(std::move(bamFile))); 318} 319 320inline SequentialCompositeBamReader::SequentialCompositeBamReader(const DataSet& dataset) 321 : SequentialCompositeBamReader{dataset.BamFiles()} 322{ } 323 324inline bool SequentialCompositeBamReader::GetNext(BamRecord& record) 325{ 326 // try first reader, if successful return true 327 // else pop reader and try next, until all readers exhausted 328 while (!readers_.empty()) { 329 auto& reader = readers_.front(); 330 if (reader->GetNext(record)) 331 return true; 332 else 333 readers_.pop_front(); 334 } 335 336 // no readers available 337 return false; 338} 339 340} // namespace BAM 341} // namespace PacBio 342