1 /*******************************************************************************
2 * thrill/core/buffered_multiway_merge.hpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2016 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #pragma once
12 #ifndef THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
13 #define THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
14
15 #include <tlx/container/loser_tree.hpp>
16
17 #include <algorithm>
18 #include <utility>
19 #include <vector>
20
21 namespace thrill {
22 namespace core {
23
24 template <typename ValueType, typename ReaderIterator, typename Comparator>
25 class BufferedMultiwayMergeTree
26 {
27 public:
28 using Reader = typename std::iterator_traits<ReaderIterator>::value_type;
29
30 using LoserTreeType = tlx::LoserTree<
31 /* stable */ false, ValueType, Comparator>;
32
BufferedMultiwayMergeTree(ReaderIterator readers_begin,ReaderIterator readers_end,const Comparator & comp)33 BufferedMultiwayMergeTree(ReaderIterator readers_begin, ReaderIterator readers_end,
34 const Comparator& comp)
35 : readers_(readers_begin),
36 num_inputs_(static_cast<unsigned>(readers_end - readers_begin)),
37 remaining_inputs_(num_inputs_),
38 lt_(static_cast<unsigned>(num_inputs_), comp),
39 current_(num_inputs_) {
40
41 for (unsigned t = 0; t < num_inputs_; ++t)
42 {
43 if (TLX_LIKELY(readers_[t].HasNext())) {
44 current_[t].first = true;
45 current_[t].second = readers_[t].template Next<ValueType>();
46 lt_.insert_start(¤t_[t].second, t, false);
47 }
48 else {
49 current_[t].first = false;
50 lt_.insert_start(nullptr, t, true);
51 assert(remaining_inputs_ > 0);
52 --remaining_inputs_;
53 }
54 }
55
56 lt_.init();
57 }
58
HasNext() const59 bool HasNext() const {
60 return (remaining_inputs_ != 0);
61 }
62
Top()63 const ValueType& Top() {
64 return current_[lt_.min_source()].second;
65 }
66
Update()67 bool Update() {
68 unsigned top = lt_.min_source();
69
70 if (TLX_LIKELY(readers_[top].HasNext())) {
71 current_[top].first = true;
72 current_[top].second = readers_[top].template Next<ValueType>();
73 lt_.delete_min_insert(¤t_[top].second, false);
74 return true;
75 }
76 else {
77 current_[top].first = false;
78 lt_.delete_min_insert(nullptr, true);
79 assert(remaining_inputs_ > 0);
80 --remaining_inputs_;
81 return (remaining_inputs_ > 0);
82 }
83 }
84
85 private:
86 ReaderIterator readers_;
87 unsigned num_inputs_;
88 size_t remaining_inputs_;
89
90 LoserTreeType lt_;
91 //! current values in each input (exist flag, value)
92 std::vector<std::pair<bool, ValueType> > current_;
93 };
94
95 /*!
96 * Sequential multi-way merging switch for a file writer as output
97 *
98 * The decision if based on the branching factor and runtime settings.
99 *
100 * \param seqs_begin Begin iterator of iterator pair input sequence.
101 * \param seqs_end End iterator of iterator pair input sequence.
102 * \param comp Comparator.
103 * \tparam Stable Stable merging incurs a performance penalty.
104 * \tparam Sentinels The sequences have a sentinel element.
105 * \return End iterator of output sequence.
106 */
107 template <typename ValueType, typename ReaderIterator, typename Comparator>
make_buffered_multiway_merge_tree(ReaderIterator seqs_begin,ReaderIterator seqs_end,const Comparator & comp)108 auto make_buffered_multiway_merge_tree(
109 ReaderIterator seqs_begin, ReaderIterator seqs_end,
110 const Comparator& comp) {
111
112 assert(seqs_end - seqs_begin >= 1);
113 return BufferedMultiwayMergeTree<
114 ValueType, ReaderIterator,
115 Comparator>(seqs_begin, seqs_end, comp);
116 }
117
118 } // namespace core
119 } // namespace thrill
120
121 #endif // !THRILL_CORE_BUFFERED_MULTIWAY_MERGE_HEADER
122
123 /******************************************************************************/
124