1// File Description
2/// \file CompositeBamReader.inl
3/// \brief Inline implementations for the composite BAM readers, for
4///        working with multiple input files.
5//
6// Author: Derek Barnett
7
8#include "pbbam/CompositeBamReader.h"
9#include <algorithm>
10#include <set>
11#include <sstream>
12#include <stdexcept>
13
14#include "pbbam/MakeUnique.h"
15
16namespace PacBio {
17namespace BAM {
18namespace internal {
19
20// -----------------------------------
21// Merging helpers
22// -----------------------------------
23
24inline CompositeMergeItem::CompositeMergeItem(std::unique_ptr<BamReader> rdr)
25    : reader{std::move(rdr)}
26{ }
27
28inline CompositeMergeItem::CompositeMergeItem(std::unique_ptr<BamReader> rdr,
29                                              BamRecord rec)
30    : reader{std::move(rdr)}
31    , record{std::move(rec)}
32{ }
33
34template<typename CompareType>
35inline bool CompositeMergeItemSorter<CompareType>::operator()(const CompositeMergeItem& lhs,
36                                                              const CompositeMergeItem& rhs)
37{
38    const auto& l = lhs.record;
39    const auto& r = rhs.record;
40    return CompareType()(l, r);
41}
42
43} // namespace internal
44
45// -----------------------------------
46// GenomicIntervalCompositeBamReader
47// -----------------------------------
48
49inline GenomicIntervalCompositeBamReader::GenomicIntervalCompositeBamReader(const GenomicInterval& interval,
50                                                                            const std::vector<BamFile>& bamFiles)
51{
52    filenames_.reserve(bamFiles.size());
53    for(const auto& bamFile : bamFiles)
54        filenames_.push_back(bamFile.Filename());
55    Interval(interval);
56}
57
58inline GenomicIntervalCompositeBamReader::GenomicIntervalCompositeBamReader(const GenomicInterval& interval,
59                                                                            const DataSet& dataset)
60    : GenomicIntervalCompositeBamReader{interval, dataset.BamFiles()}
61{ }
62
63inline bool GenomicIntervalCompositeBamReader::GetNext(BamRecord& record)
64{
65    // nothing left to read
66    if (mergeItems_.empty())
67        return false;
68
69    // non-destructive 'pop' of first item from queue
70    auto firstIter = mergeItems_.begin();
71    auto firstItem = internal::CompositeMergeItem{ std::move(firstIter->reader), std::move(firstIter->record) };
72    mergeItems_.pop_front();
73
74    // store its record in our output record
75    std::swap(record, firstItem.record);
76
77    // try fetch 'next' from first item's reader
78    // if successful, re-insert it into container & re-sort on our new values
79    // otherwise, this item will go out of scope & reader destroyed
80    if (firstItem.reader->GetNext(firstItem.record)) {
81        mergeItems_.push_front(std::move(firstItem));
82        UpdateSort();
83    }
84
85    // return success
86    return true;
87}
88
89inline const GenomicInterval& GenomicIntervalCompositeBamReader::Interval() const
90{ return interval_; }
91
92inline GenomicIntervalCompositeBamReader& GenomicIntervalCompositeBamReader::Interval(const GenomicInterval& interval)
93{
94    std::deque<internal::CompositeMergeItem> updatedMergeItems;
95    std::set<std::string> filesToCreate{filenames_.cbegin(), filenames_.cend()};
96
97    // update existing readers
98    while (!mergeItems_.empty()) {
99
100        // non-destructive 'pop' of first item from queue
101        auto firstIter = mergeItems_.begin();
102        internal::CompositeMergeItem firstItem{ std::move(firstIter->reader), std::move(firstIter->record) };
103        mergeItems_.pop_front();
104
105        // reset interval
106        auto* baiReader = dynamic_cast<BaiIndexedBamReader*>(firstItem.reader.get());
107        assert(baiReader);
108        baiReader->Interval(interval);
109
110        // try fetch 'next' from first item's reader
111        // if successful, re-insert it into container & re-sort on our new values
112        // otherwise, this item will go out of scope & reader destroyed
113        if (firstItem.reader->GetNext(firstItem.record)) {
114            updatedMergeItems.push_front(std::move(firstItem));
115            filesToCreate.erase(firstItem.reader->Filename());
116        }
117    }
118
119    // create readers for files that were not 'active' for the previous
120    std::vector<std::string> missingBai;
121    for (auto&& fn : filesToCreate) {
122        BamFile bamFile{ fn };
123        if (bamFile.StandardIndexExists()) {
124            internal::CompositeMergeItem item{ std::unique_ptr<BamReader>{ new BaiIndexedBamReader{ interval, std::move(bamFile) } } };
125            if (item.reader->GetNext(item.record))
126                updatedMergeItems.push_back(std::move(item));
127            // else not an error, simply no data matching interval
128        }
129        else {
130            // maybe handle PBI-backed interval searches if BAI missing, but for now treat as error
131            missingBai.push_back(bamFile.Filename());
132        }
133    }
134
135    // throw if any files missing BAI
136    if (!missingBai.empty()) {
137        std::ostringstream e;
138        e << "failed to open GenomicIntervalCompositeBamReader because the following files are missing a BAI file:\n";
139        for (const auto& fn : missingBai)
140            e << "  " << fn << '\n';
141        throw std::runtime_error{e.str()};
142    }
143
144    // update our actual container and return
145    mergeItems_ = std::move(updatedMergeItems);
146    UpdateSort();
147    return *this;
148}
149
150struct OrderByPosition
151{
152    static inline bool less_than(const BamRecord& lhs, const BamRecord& rhs)
153    {
154        const int32_t lhsId = lhs.ReferenceId();
155        const int32_t rhsId = rhs.ReferenceId();
156        if (lhsId == -1) return false;
157        if (rhsId == -1) return true;
158
159        if (lhsId == rhsId)
160            return lhs.ReferenceStart() < rhs.ReferenceStart();
161        else return lhsId < rhsId;
162    }
163
164    static inline bool equals(const BamRecord& lhs, const BamRecord& rhs)
165    {
166        return lhs.ReferenceId() == rhs.ReferenceId() &&
167               lhs.ReferenceStart() == rhs.ReferenceStart();
168    }
169};
170
171struct PositionSorter : std::binary_function<internal::CompositeMergeItem, internal::CompositeMergeItem, bool>
172{
173    bool operator()(const internal::CompositeMergeItem& lhs,
174                    const internal::CompositeMergeItem& rhs)
175    {
176        const BamRecord& l = lhs.record;
177        const BamRecord& r = rhs.record;
178        return OrderByPosition::less_than(l, r);
179    }
180};
181
182inline void GenomicIntervalCompositeBamReader::UpdateSort()
183{ std::sort(mergeItems_.begin(), mergeItems_.end(), PositionSorter{ }); }
184
185// ------------------------------
186// PbiRequestCompositeBamReader
187// ------------------------------
188
189template<typename OrderByType>
190inline PbiFilterCompositeBamReader<OrderByType>::PbiFilterCompositeBamReader(const PbiFilter& filter,
191                                                                             const std::vector<BamFile>& bamFiles)
192    : numReads_{0}
193{
194    filenames_.reserve(bamFiles.size());
195    for(const auto& bamFile : bamFiles)
196        filenames_.push_back(bamFile.Filename());
197    Filter(filter);
198}
199
200template<typename OrderByType>
201inline PbiFilterCompositeBamReader<OrderByType>::PbiFilterCompositeBamReader(const PbiFilter& filter,
202                                                                             const DataSet& dataset)
203    : PbiFilterCompositeBamReader{filter, dataset.BamFiles()}
204{ }
205
206template<typename OrderByType>
207inline bool PbiFilterCompositeBamReader<OrderByType>::GetNext(BamRecord& record)
208{
209    // nothing left to read
210    if (mergeQueue_.empty())
211        return false;
212
213    // non-destructive 'pop' of first item from queue
214    auto firstIter = mergeQueue_.begin();
215    value_type firstItem{ std::move(firstIter->reader), std::move(firstIter->record) };
216    mergeQueue_.pop_front();
217
218    // store its record in our output record
219    std::swap(record, firstItem.record);
220
221    // try fetch 'next' from first item's reader
222    // if successful, re-insert it into container & re-sort on our new values
223    // otherwise, this item will go out of scope & reader destroyed
224    if (firstItem.reader->GetNext(firstItem.record)) {
225        mergeQueue_.push_front(std::move(firstItem));
226        UpdateSort();
227    }
228
229    // return success
230    return true;
231}
232
233template<typename OrderByType>
234inline PbiFilterCompositeBamReader<OrderByType>&
235PbiFilterCompositeBamReader<OrderByType>::Filter(const PbiFilter& filter)
236{
237    container_type updatedMergeItems;
238    std::set<std::string> filesToCreate{ filenames_.cbegin(), filenames_.cend() };
239
240    // update existing readers
241    while (!mergeQueue_.empty()) {
242
243        // non-destructive 'pop' of first item from queue
244        auto firstIter = mergeQueue_.begin();
245        internal::CompositeMergeItem firstItem{ std::move(firstIter->reader), std::move(firstIter->record) };
246        mergeQueue_.pop_front();
247
248        // reset request
249        auto* pbiReader = dynamic_cast<PbiIndexedBamReader*>(firstItem.reader.get());
250        assert(pbiReader);
251        pbiReader->Filter(filter);
252
253        // try fetch 'next' from first item's reader
254        // if successful, re-insert it into container & re-sort on our new values
255        // otherwise, this item will go out of scope & reader destroyed
256        if (firstItem.reader->GetNext(firstItem.record)) {
257            updatedMergeItems.push_front(std::move(firstItem));
258            filesToCreate.erase(firstItem.reader->Filename());
259        }
260    }
261
262    // create readers for files that were not 'active' for the previous
263    std::vector<std::string> missingPbi;
264    for (auto&& fn : filesToCreate) {
265        const BamFile bamFile{ fn };
266        if (bamFile.PacBioIndexExists()) {
267            auto item = internal::CompositeMergeItem{ std::unique_ptr<BamReader>{ new PbiIndexedBamReader{ filter, std::move(bamFile) } } };
268            if (item.reader->GetNext(item.record))
269                updatedMergeItems.push_back(std::move(item));
270            // else not an error, simply no data matching filter
271        }
272        else
273            missingPbi.push_back(fn);
274    }
275
276    // throw if any files missing PBI
277    if (!missingPbi.empty()) {
278        std::ostringstream e;
279        e << "failed to open PbiFilterCompositeBamReader because the following files are missing a PBI file:\n";
280        for (const auto& fn : missingPbi)
281            e << "  " << fn << '\n';
282        throw std::runtime_error{e.str()};
283    }
284
285
286    // update our actual container, store num matching reads, sort & and return
287    mergeQueue_ = std::move(updatedMergeItems);
288
289    numReads_ = 0;
290    for (const auto& item : mergeQueue_)
291    {
292        auto* pbiReader = dynamic_cast<PbiIndexedBamReader*>(item.reader.get());
293        numReads_ += pbiReader->NumReads();
294    }
295
296    UpdateSort();
297    return *this;
298}
299
300template<typename OrderByType>
301inline uint32_t PbiFilterCompositeBamReader<OrderByType>::NumReads() const
302{
303    return numReads_;
304}
305
306template<typename OrderByType>
307inline void PbiFilterCompositeBamReader<OrderByType>::UpdateSort()
308{ std::stable_sort(mergeQueue_.begin(), mergeQueue_.end(), merge_sorter_type{}); }
309
310// ------------------------------
311// SequentialCompositeBamReader
312// ------------------------------
313
314inline SequentialCompositeBamReader::SequentialCompositeBamReader(std::vector<BamFile> bamFiles)
315{
316    for (auto&& bamFile : bamFiles)
317        readers_.emplace_back(std::make_unique<BamReader>(std::move(bamFile)));
318}
319
320inline SequentialCompositeBamReader::SequentialCompositeBamReader(const DataSet& dataset)
321    : SequentialCompositeBamReader{dataset.BamFiles()}
322{ }
323
324inline bool SequentialCompositeBamReader::GetNext(BamRecord& record)
325{
326    // try first reader, if successful return true
327    // else pop reader and try next, until all readers exhausted
328    while (!readers_.empty()) {
329        auto& reader = readers_.front();
330        if (reader->GetNext(record))
331            return true;
332        else
333            readers_.pop_front();
334    }
335
336    // no readers available
337    return false;
338}
339
340} // namespace BAM
341} // namespace PacBio
342