1 /*******************************************************************************
2  * thrill/core/buffered_multiway_merge.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2016 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
13 #define THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
14 
15 #include <tlx/container/loser_tree.hpp>
16 
17 #include <algorithm>
18 #include <utility>
19 #include <vector>
20 
21 namespace thrill {
22 namespace core {
23 
24 template <typename ValueType, typename ReaderIterator, typename Comparator>
25 class BufferedMultiwayMergeTree
26 {
27 public:
28     using Reader = typename std::iterator_traits<ReaderIterator>::value_type;
29 
30     using LoserTreeType = tlx::LoserTree<
31         /* stable */ false, ValueType, Comparator>;
32 
BufferedMultiwayMergeTree(ReaderIterator readers_begin,ReaderIterator readers_end,const Comparator & comp)33     BufferedMultiwayMergeTree(ReaderIterator readers_begin, ReaderIterator readers_end,
34                               const Comparator& comp)
35         : readers_(readers_begin),
36           num_inputs_(static_cast<unsigned>(readers_end - readers_begin)),
37           remaining_inputs_(num_inputs_),
38           lt_(static_cast<unsigned>(num_inputs_), comp),
39           current_(num_inputs_) {
40 
41         for (unsigned t = 0; t < num_inputs_; ++t)
42         {
43             if (TLX_LIKELY(readers_[t].HasNext())) {
44                 current_[t].first = true;
45                 current_[t].second = readers_[t].template Next<ValueType>();
46                 lt_.insert_start(&current_[t].second, t, false);
47             }
48             else {
49                 current_[t].first = false;
50                 lt_.insert_start(nullptr, t, true);
51                 assert(remaining_inputs_ > 0);
52                 --remaining_inputs_;
53             }
54         }
55 
56         lt_.init();
57     }
58 
HasNext() const59     bool HasNext() const {
60         return (remaining_inputs_ != 0);
61     }
62 
Top()63     const ValueType& Top() {
64         return current_[lt_.min_source()].second;
65     }
66 
Update()67     bool Update() {
68         unsigned top = lt_.min_source();
69 
70         if (TLX_LIKELY(readers_[top].HasNext())) {
71             current_[top].first = true;
72             current_[top].second = readers_[top].template Next<ValueType>();
73             lt_.delete_min_insert(&current_[top].second, false);
74             return true;
75         }
76         else {
77             current_[top].first = false;
78             lt_.delete_min_insert(nullptr, true);
79             assert(remaining_inputs_ > 0);
80             --remaining_inputs_;
81             return (remaining_inputs_ > 0);
82         }
83     }
84 
85 private:
86     ReaderIterator readers_;
87     unsigned num_inputs_;
88     size_t remaining_inputs_;
89 
90     LoserTreeType lt_;
91     //! current values in each input (exist flag, value)
92     std::vector<std::pair<bool, ValueType> > current_;
93 };
94 
95 /*!
96  * Sequential multi-way merging switch for a file writer as output
97  *
98  * The decision if based on the branching factor and runtime settings.
99  *
100  * \param seqs_begin Begin iterator of iterator pair input sequence.
101  * \param seqs_end End iterator of iterator pair input sequence.
102  * \param comp Comparator.
103  * \tparam Stable Stable merging incurs a performance penalty.
104  * \tparam Sentinels The sequences have a sentinel element.
105  * \return End iterator of output sequence.
106  */
107 template <typename ValueType, typename ReaderIterator, typename Comparator>
make_buffered_multiway_merge_tree(ReaderIterator seqs_begin,ReaderIterator seqs_end,const Comparator & comp)108 auto make_buffered_multiway_merge_tree(
109     ReaderIterator seqs_begin, ReaderIterator seqs_end,
110     const Comparator& comp) {
111 
112     assert(seqs_end - seqs_begin >= 1);
113     return BufferedMultiwayMergeTree<
114         ValueType, ReaderIterator,
115         Comparator>(seqs_begin, seqs_end, comp);
116 }
117 
118 } // namespace core
119 } // namespace thrill
120 
121 #endif // !THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
122 
123 /******************************************************************************/
124