1 /*******************************************************************************
2  * thrill/core/reduce_pre_phase.hpp
3  *
4  * Hash table with support for reduce and partitions.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Matthias Stumpp <mstumpp@gmail.com>
9  * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
10  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_CORE_REDUCE_PRE_PHASE_HEADER
17 #define THRILL_CORE_REDUCE_PRE_PHASE_HEADER
18 
19 #include <thrill/common/defines.hpp>
20 #include <thrill/common/logger.hpp>
21 #include <thrill/common/math.hpp>
22 #include <thrill/core/duplicate_detection.hpp>
23 #include <thrill/core/reduce_bucket_hash_table.hpp>
24 #include <thrill/core/reduce_functional.hpp>
25 #include <thrill/core/reduce_old_probing_hash_table.hpp>
26 #include <thrill/core/reduce_probing_hash_table.hpp>
27 #include <thrill/data/block_reader.hpp>
28 #include <thrill/data/block_writer.hpp>
29 #include <thrill/data/file.hpp>
30 
31 #include <algorithm>
32 #include <cassert>
33 #include <cmath>
34 #include <functional>
35 #include <string>
36 #include <utility>
37 #include <vector>
38 
39 namespace thrill {
40 namespace core {
41 
42 //! Emitter implementation to plug into a reduce hash table for
43 //! collecting/flushing items while reducing. Items flushed in the pre-phase are
44 //! transmitted via a network Channel.
45 template <typename TableItem, bool VolatileKey, typename BlockWriter>
46 class ReducePrePhaseEmitter
47 {
48     static constexpr bool debug = false;
49 
50 public:
ReducePrePhaseEmitter(std::vector<BlockWriter> & writer)51     explicit ReducePrePhaseEmitter(std::vector<BlockWriter>& writer)
52         : writer_(writer),
53           stats_(writer.size(), 0) { }
54 
55     //! output an element into a partition, template specialized for robust and
56     //! non-robust keys
Emit(const size_t & partition_id,const TableItem & p)57     void Emit(const size_t& partition_id, const TableItem& p) {
58         assert(partition_id < writer_.size());
59         stats_[partition_id]++;
60         writer_[partition_id].Put(p);
61     }
62 
Flush(size_t partition_id)63     void Flush(size_t partition_id) {
64         assert(partition_id < writer_.size());
65         writer_[partition_id].Flush();
66     }
67 
CloseAll()68     void CloseAll() {
69         sLOG << "emit stats:";
70         size_t i = 0;
71         for (BlockWriter& e : writer_) {
72             e.Close();
73             sLOG << "emitter" << i << "pushed" << stats_[i++];
74         }
75     }
76 
77 public:
78     //! Set of emitters, one per partition.
79     std::vector<BlockWriter>& writer_;
80 
81     //! Emitter stats.
82     std::vector<size_t> stats_;
83 };
84 
85 template <typename TableItem, typename Key, typename Value,
86           typename KeyExtractor, typename ReduceFunction,
87           const bool VolatileKey,
88           typename BlockWriter,
89           typename ReduceConfig_ = DefaultReduceConfig,
90           typename IndexFunction = ReduceByHash<Key>,
91           typename KeyEqualFunction = std::equal_to<Key>,
92           typename HashFunction = std::hash<Key>,
93           bool UseDuplicateDetection = false>
94 class ReducePrePhase;
95 
96 template <typename TableItem, typename Key, typename Value,
97           typename KeyExtractor, typename ReduceFunction,
98           const bool VolatileKey, typename BlockWriter,
99           typename ReduceConfig_,
100           typename IndexFunction,
101           typename KeyEqualFunction,
102           typename HashFunction>
103 class ReducePrePhase<TableItem, Key, Value,
104                      KeyExtractor, ReduceFunction,
105                      VolatileKey, BlockWriter,
106                      ReduceConfig_,
107                      IndexFunction,
108                      KeyEqualFunction,
109                      HashFunction,
110                      false>
111 {
112     static constexpr bool debug = false;
113 
114 public:
115     using ReduceConfig = ReduceConfig_;
116     using Emitter = ReducePrePhaseEmitter<TableItem, VolatileKey, BlockWriter>;
117     using MakeTableItem = ReduceMakeTableItem<Value, TableItem, VolatileKey>;
118 
119     using Table = typename ReduceTableSelect<
120         ReduceConfig::table_impl_,
121         TableItem, Key, Value,
122         KeyExtractor, ReduceFunction, Emitter,
123         VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>::type;
124 
125     /*!
126      * A data structure which takes an arbitrary value and extracts a key using
127      * a key extractor function from that value. Afterwards, the value is hashed
128      * based on the key into some slot.
129      */
ReducePrePhase(Context & ctx,size_t dia_id,size_t num_partitions,KeyExtractor key_extractor,ReduceFunction reduce_function,std::vector<BlockWriter> & emit,const ReduceConfig & config=ReduceConfig (),const IndexFunction & index_function=IndexFunction (),const KeyEqualFunction & key_equal_function=KeyEqualFunction (),const HashFunction hash_function=HashFunction (),bool duplicates=false)130     ReducePrePhase(Context& ctx, size_t dia_id,
131                    size_t num_partitions,
132                    KeyExtractor key_extractor,
133                    ReduceFunction reduce_function,
134                    std::vector<BlockWriter>& emit,
135                    const ReduceConfig& config = ReduceConfig(),
136                    const IndexFunction& index_function = IndexFunction(),
137                    const KeyEqualFunction& key_equal_function = KeyEqualFunction(),
138                    const HashFunction hash_function = HashFunction(),
139                    bool duplicates = false)
140         : emit_(emit),
141           key_extractor_(key_extractor),
142           table_(ctx, dia_id,
143                  key_extractor, reduce_function, emit_,
144                  num_partitions, config, !duplicates,
145                  index_function, key_equal_function) {
146 
147         tlx::unused(hash_function);
148 
149         sLOG << "creating ReducePrePhase with" << emit.size() << "output emitters";
150 
151         assert(num_partitions == emit.size());
152     }
153 
154     //! non-copyable: delete copy-constructor
155     ReducePrePhase(const ReducePrePhase&) = delete;
156     //! non-copyable: delete assignment operator
157     ReducePrePhase& operator = (const ReducePrePhase&) = delete;
158 
Initialize(size_t limit_memory_bytes)159     void Initialize(size_t limit_memory_bytes) {
160         table_.Initialize(limit_memory_bytes);
161     }
162 
InitializeSkip()163     void InitializeSkip() {
164         table_.InitializeSkip();
165     }
166 
Insert(const Value & v)167     bool Insert(const Value& v) {
168         // for VolatileKey this makes std::pair and extracts the key
169         return table_.Insert(MakeTableItem::Make(v, table_.key_extractor()));
170     }
171 
InsertSkip(const Value & v)172     void InsertSkip(const Value& v) {
173         TableItem t = MakeTableItem::Make(v, table_.key_extractor());
174         typename IndexFunction::Result h = table_.calculate_index(t);
175         emit_.Emit(h.partition_id, t);
176     }
177 
178     //! Flush all partitions
FlushAll()179     void FlushAll() {
180         for (size_t id = 0; id < table_.num_partitions(); ++id) {
181             FlushPartition(id, /* consume */ true, /* grow */ false);
182         }
183     }
184 
185     //! Flushes a partition
FlushPartition(size_t partition_id,bool consume,bool grow)186     void FlushPartition(size_t partition_id, bool consume, bool grow) {
187         table_.FlushPartition(partition_id, consume, grow);
188         // data is flushed immediately, there is no spilled data
189     }
190 
191     //! Closes all emitter
CloseAll()192     void CloseAll() {
193         emit_.CloseAll();
194         table_.Dispose();
195     }
196 
197     //! \name Accessors
198     //! \{
199 
200     //! Returns the total num of items in the table.
num_items() const201     size_t num_items() const { return table_.num_items(); }
202 
203     //! calculate key range for the given output partition
key_range(size_t partition_id)204     common::Range key_range(size_t partition_id)
205     { return table_.key_range(partition_id); }
206 
207     //! \}
208 
209 protected:
210     //! Emitters used to parameterize hash table for output to network.
211     Emitter emit_;
212 
213     //! extractor function which maps a value to it's key
214     KeyExtractor key_extractor_;
215 
216     //! the first-level hash table implementation
217     Table table_;
218 };
219 
220 template <typename TableItem, typename Key, typename Value,
221           typename KeyExtractor, typename ReduceFunction,
222           const bool VolatileKey, typename BlockWriter,
223           typename ReduceConfig,
224           typename IndexFunction,
225           typename EqualToFunction,
226           typename HashFunction>
227 class ReducePrePhase<TableItem, Key, Value,
228                      KeyExtractor,
229                      ReduceFunction,
230                      VolatileKey,
231                      BlockWriter,
232                      ReduceConfig,
233                      IndexFunction,
234                      EqualToFunction,
235                      HashFunction,
236                      true>
237     : public ReducePrePhase<TableItem, Key, Value,
238                             KeyExtractor,
239                             ReduceFunction,
240                             VolatileKey,
241                             BlockWriter,
242                             ReduceConfig,
243                             IndexFunction,
244                             EqualToFunction,
245                             HashFunction,
246                             false>
247 {
248 
249 public:
250     using Super = ReducePrePhase<TableItem, Key, Value, KeyExtractor,
251                                  ReduceFunction, VolatileKey, BlockWriter,
252                                  ReduceConfig,
253                                  IndexFunction, EqualToFunction, HashFunction,
254                                  false>;
255     using KeyValuePair = std::pair<Key, Value>;
256 
ReducePrePhase(Context & ctx,size_t dia_id,size_t num_partitions,KeyExtractor key_extractor,ReduceFunction reduce_function,std::vector<BlockWriter> & emit,const ReduceConfig & config=ReduceConfig (),const IndexFunction & index_function=IndexFunction (),const EqualToFunction & equal_to_function=EqualToFunction (),const HashFunction hash_function=HashFunction ())257     ReducePrePhase(Context& ctx, size_t dia_id,
258                    size_t num_partitions,
259                    KeyExtractor key_extractor,
260                    ReduceFunction reduce_function,
261                    std::vector<BlockWriter>& emit,
262                    const ReduceConfig& config = ReduceConfig(),
263                    const IndexFunction& index_function = IndexFunction(),
264                    const EqualToFunction& equal_to_function = EqualToFunction(),
265                    const HashFunction hash_function = HashFunction())
266         : Super(ctx, dia_id, num_partitions, key_extractor, reduce_function,
267                 emit, config, index_function, equal_to_function, hash_function,
268                 /*duplicates*/ true),
269           hash_function_(hash_function) { }
270 
Insert(const Value & v)271     void Insert(const Value& v) {
272         if (Super::table_.Insert(
273                 Super::MakeTableItem::Make(v, Super::table_.key_extractor()))) {
274             hashes_.push_back(hash_function_(Super::key_extractor_(v)));
275         }
276     }
277 
278     //! Flush all partitions
FlushAll()279     void FlushAll() {
280         DuplicateDetection dup_detect;
281         max_hash_ = dup_detect.FindNonDuplicates(non_duplicates_,
282                                                  hashes_,
283                                                  Super::table_.ctx(),
284                                                  Super::table_.dia_id());
285 
286         for (size_t id = 0; id < Super::table_.num_partitions(); ++id) {
287             FlushPartition(id, /* consume */ true, /* grow */ false);
288         }
289     }
290 
FlushPartition(size_t partition_id,bool consume,bool grow)291     void FlushPartition(size_t partition_id, bool consume, bool grow) {
292         Super::table_.FlushPartitionEmit(
293             partition_id, consume, grow,
294             [this](const size_t& partition_id, const TableItem& ti) {
295                 Key key = Super::MakeTableItem::GetKey(
296                     ti, Super::table_.key_extractor());
297                 if (!non_duplicates_[hash_function_(key) % max_hash_]) {
298 
299                     duplicated_elements_++;
300                     Super::emit_.Emit(partition_id, ti);
301                 }
302                 else {
303                     non_duplicate_elements_++;
304                     Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
305                 }
306             });
307 
308         if (Super::table_.has_spilled_data_on_partition(partition_id)) {
309             data::File::Reader reader =
310                 Super::table_.partition_files()[partition_id].GetReader(true);
311             while (reader.HasNext()) {
312                 TableItem ti = reader.Next<TableItem>();
313                 Key key = Super::MakeTableItem::GetKey(
314                     ti, Super::table_.key_extractor());
315                 if (!non_duplicates_[hash_function_(key) % max_hash_]) {
316 
317                     duplicated_elements_++;
318                     Super::emit_.Emit(partition_id, ti);
319                 }
320                 else {
321                     non_duplicate_elements_++;
322                     Super::emit_.Emit(Super::table_.ctx().my_rank(), ti);
323                 }
324             }
325         }
326 
327         // flush elements pushed into emitter
328         Super::emit_.Flush(partition_id);
329         Super::emit_.Flush(Super::table_.ctx().my_rank());
330     }
331 
332     //! \name Duplicate Detection
333     //! \{
334 
335     HashFunction hash_function_;
336     //! Hashes of all keys.
337     std::vector<size_t> hashes_;
338     //! All elements occuring on more than one worker. (Elements not appearing here
339     //! can be reduced locally)
340     std::vector<bool> non_duplicates_;
341     //! Modulo for all hashes in duplicate detection to reduce hash space.
342     size_t max_hash_;
343 
344     size_t duplicated_elements_ = 0;
345     size_t non_duplicate_elements_ = 0;
346 
347     //! \}
348 };
349 
350 } // namespace core
351 } // namespace thrill
352 
353 #endif // !THRILL_CORE_REDUCE_PRE_PHASE_HEADER
354 
355 /******************************************************************************/
356