1 /*******************************************************************************
2  * thrill/core/reduce_probing_hash_table.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Matthias Stumpp <mstumpp@gmail.com>
7  * Copyright (C) 2016 Timo Bingmann <tb@panthema.net>
8  * Copyright (C) 2017 Tim Zeitz <dev.tim.zeitz@gmail.com>
9  *
10  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
11  ******************************************************************************/
12 
13 #pragma once
14 #ifndef THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
15 #define THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
16 
17 #include <thrill/core/reduce_functional.hpp>
18 #include <thrill/core/reduce_table.hpp>
19 
20 #include <algorithm>
21 #include <functional>
22 #include <limits>
23 #include <utility>
24 #include <vector>
25 
26 namespace thrill {
27 namespace core {
28 
29 /*!
30  * A data structure which takes an arbitrary value and extracts a key using a
31  * key extractor function from that value. A key may also be provided initially
32  * as part of a key/value pair, not requiring to extract a key.
33  *
34  * Afterwards, the key is hashed and the hash is used to assign that key/value
35  * pair to some slot.
36  *
 * In case a slot already has a key/value pair and the key of that value and the
 * key of the value to be inserted are the same, the values are reduced
 * according to some reduce function. No key/value is added to the data
 * structure.
41  *
 * If the keys are different, the next slot (moving to the right) is considered.
 * If the slot is occupied, the same procedure happens again (known as linear
 * probing).
45  *
46  * Finally, the key/value pair to be inserted may either:
47  *
48  * 1.) Be reduced with some other key/value pair, sharing the same key.
49  * 2.) Inserted at a free slot.
50  * 3.) Trigger a resize of the data structure in case there are no more free
51  *     slots in the data structure.
52  *
 * The following illustration shows the general structure of the data
 * structure. The set of slots is divided into 1..n partitions. Each key is
 * hashed into exactly one partition.
56  *
57  *
58  *     Partition 0 Partition 1 Partition 2 Partition 3 Partition 4
59  *     P00 P01 P02 P10 P11 P12 P20 P21 P22 P30 P31 P32 P40 P41 P42
60  *    +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
61  *    ||  |   |   ||  |   |   ||  |   |   ||  |   |   ||  |   |  ||
62  *    +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+
63  *                <-   LI  ->
64  *                     LI..Local Index
65  *    <-        GI         ->
66  *              GI..Global Index
67  *         PI 0        PI 1        PI 2        PI 3        PI 4
68  *         PI..Partition ID
69  *
70  */
71 template <typename TableItem, typename Key, typename Value,
72           typename KeyExtractor, typename ReduceFunction, typename Emitter,
73           const bool VolatileKey,
74           typename ReduceConfig_,
75           typename IndexFunction,
76           typename KeyEqualFunction = std::equal_to<Key> >
77 class ReduceProbingHashTable
78     : public ReduceTable<TableItem, Key, Value,
79                          KeyExtractor, ReduceFunction, Emitter,
80                          VolatileKey, ReduceConfig_,
81                          IndexFunction, KeyEqualFunction>
82 {
83     using Super = ReduceTable<TableItem, Key, Value,
84                               KeyExtractor, ReduceFunction, Emitter,
85                               VolatileKey, ReduceConfig_, IndexFunction,
86                               KeyEqualFunction>;
87     using Super::debug;
88     static constexpr bool debug_items = false;
89 
90 public:
91     using ReduceConfig = ReduceConfig_;
92 
ReduceProbingHashTable(Context & ctx,size_t dia_id,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,Emitter & emitter,size_t num_partitions,const ReduceConfig & config=ReduceConfig (),bool immediate_flush=false,const IndexFunction & index_function=IndexFunction (),const KeyEqualFunction & key_equal_function=KeyEqualFunction ())93     ReduceProbingHashTable(
94         Context& ctx, size_t dia_id,
95         const KeyExtractor& key_extractor,
96         const ReduceFunction& reduce_function,
97         Emitter& emitter,
98         size_t num_partitions,
99         const ReduceConfig& config = ReduceConfig(),
100         bool immediate_flush = false,
101         const IndexFunction& index_function = IndexFunction(),
102         const KeyEqualFunction& key_equal_function = KeyEqualFunction())
103         : Super(ctx, dia_id,
104                 key_extractor, reduce_function, emitter,
105                 num_partitions, config, immediate_flush,
106                 index_function, key_equal_function)
107     { assert(num_partitions > 0); }
108 
109     //! Construct the hash table itself. fill it with sentinels. have one extra
110     //! cell beyond the end for reducing the sentinel itself.
Initialize(size_t limit_memory_bytes)111     void Initialize(size_t limit_memory_bytes) {
112         assert(!items_);
113 
114         limit_memory_bytes_ = limit_memory_bytes;
115 
116         // calculate num_buckets_per_partition_ from the memory limit and the
117         // number of partitions required, initialize partition_size_ array.
118 
119         assert(limit_memory_bytes_ >= 0 &&
120                "limit_memory_bytes must be greater than or equal to 0. "
121                "A byte size of zero results in exactly one item per partition");
122 
123         num_buckets_per_partition_ = std::max<size_t>(
124             1,
125             (size_t)(static_cast<double>(limit_memory_bytes_)
126                      / static_cast<double>(sizeof(TableItem))
127                      / static_cast<double>(num_partitions_)));
128 
129         num_buckets_ = num_buckets_per_partition_ * num_partitions_;
130 
131         assert(num_buckets_per_partition_ > 0);
132         assert(num_buckets_ > 0);
133 
134         partition_size_.resize(
135             num_partitions_,
136             std::min(size_t(config_.initial_items_per_partition_),
137                      num_buckets_per_partition_));
138 
139         // calculate limit on the number of items in a partition before these
140         // are spilled to disk or flushed to network.
141 
142         double limit_fill_rate = config_.limit_partition_fill_rate();
143 
144         assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0
145                && "limit_partition_fill_rate must be between 0.0 and 1.0. "
146                "with a fill rate of 0.0, items are immediately flushed.");
147 
148         limit_items_per_partition_.resize(
149             num_partitions_,
150             static_cast<size_t>(
151                 static_cast<double>(partition_size_[0]) * limit_fill_rate));
152 
153         assert(limit_items_per_partition_[0] >= 0);
154 
155         // actually allocate the table and initialize the valid ranges, the + 1
156         // is for the sentinel's slot.
157 
158         items_ = static_cast<TableItem*>(
159             operator new ((num_buckets_ + 1) * sizeof(TableItem)));
160 
161         for (size_t id = 0; id < num_partitions_; ++id) {
162             TableItem* iter = items_ + id * num_buckets_per_partition_;
163             TableItem* pend = iter + partition_size_[id];
164 
165             for ( ; iter != pend; ++iter)
166                 new (iter)TableItem();
167         }
168     }
169 
~ReduceProbingHashTable()170     ~ReduceProbingHashTable() {
171         if (items_) Dispose();
172     }
173 
174     /*!
175      * Inserts a value into the table, potentially reducing it in case both the
176      * key of the value already in the table and the key of the value to be
177      * inserted are the same.
178      *
179      * An insert may trigger a partial flush of the partition with the most
180      * items if the maximal number of items in the table (max_num_items_table)
181      * is reached.
182      *
183      * Alternatively, it may trigger a resize of the table in case the maximal
184      * fill ratio per partition is reached.
185      *
186      * \param kv Value to be inserted into the table.
187      *
188      * \return true if a new key was inserted to the table
189      */
Insert(const TableItem & kv)190     bool Insert(const TableItem& kv) {
191 
192         typename IndexFunction::Result h = calculate_index(kv);
193         assert(h.partition_id < num_partitions_);
194 
195         if (TLX_UNLIKELY(key_equal_function_(key(kv), Key()))) {
196             // handle pairs with sentinel key specially by reducing into last
197             // element of items.
198             TableItem& sentinel = items_[num_buckets_];
199             if (sentinel_partition_ == invalid_partition_) {
200                 // first occurrence of sentinel key
201                 new (&sentinel)TableItem(kv);
202                 sentinel_partition_ = h.partition_id;
203             }
204             else {
205                 sentinel = reduce(sentinel, kv);
206                 return false;
207             }
208             ++items_per_partition_[h.partition_id];
209             ++num_items_;
210 
211             while (TLX_UNLIKELY(
212                        items_per_partition_[h.partition_id] >
213                        limit_items_per_partition_[h.partition_id])) {
214                 GrowAndRehash(h.partition_id);
215             }
216 
217             return true;
218         }
219 
220         // calculate local index depending on the current subtable's size
221         size_t local_index = h.local_index(partition_size_[h.partition_id]);
222 
223         TableItem* pbegin = items_ + h.partition_id * num_buckets_per_partition_;
224         TableItem* pend = pbegin + partition_size_[h.partition_id];
225 
226         TableItem* begin_iter = pbegin + local_index;
227         TableItem* iter = begin_iter;
228 
229         while (!key_equal_function_(key(*iter), Key()))
230         {
231             if (key_equal_function_(key(*iter), key(kv)))
232             {
233                 *iter = reduce(*iter, kv);
234                 return false;
235             }
236 
237             ++iter;
238 
239             // wrap around if beyond the current partition
240             if (TLX_UNLIKELY(iter == pend))
241                 iter = pbegin;
242 
243             // flush partition and retry, if all slots are reserved
244             if (TLX_UNLIKELY(iter == begin_iter)) {
245                 GrowAndRehash(h.partition_id);
246                 return Insert(kv);
247             }
248         }
249 
250         // insert new pair
251         *iter = kv;
252 
253         // increase counter for partition
254         ++items_per_partition_[h.partition_id];
255         ++num_items_;
256 
257         while (TLX_UNLIKELY(
258                    items_per_partition_[h.partition_id] >=
259                    limit_items_per_partition_[h.partition_id])) {
260             LOG << "Grow due to "
261                 << items_per_partition_[h.partition_id] << " >= "
262                 << limit_items_per_partition_[h.partition_id]
263                 << " among " << partition_size_[h.partition_id];
264             GrowAndRehash(h.partition_id);
265         }
266 
267         return true;
268     }
269 
270     //! Deallocate items and memory
Dispose()271     void Dispose() {
272         if (!items_) return;
273 
274         // dispose the items by destructor
275 
276         for (size_t id = 0; id < num_partitions_; ++id) {
277             TableItem* iter = items_ + id * num_buckets_per_partition_;
278             TableItem* pend = iter + partition_size_[id];
279 
280             for ( ; iter != pend; ++iter)
281                 iter->~TableItem();
282         }
283 
284         if (sentinel_partition_ != invalid_partition_)
285             items_[num_buckets_].~TableItem();
286 
287         operator delete (items_);
288         items_ = nullptr;
289 
290         Super::Dispose();
291     }
292 
GrowAndRehash(size_t partition_id)293     void GrowAndRehash(size_t partition_id) {
294 
295         size_t old_size = partition_size_[partition_id];
296         GrowPartition(partition_id);
297         if (partition_size_[partition_id] == old_size) {
298             SpillPartition(partition_id);
299             return;
300         }
301 
302         if (partition_size_[partition_id] % old_size != 0) {
303             // in place rehashing won't work properly so we spill rather than
304             // potentially blasting memory limits by using an extra vector for
305             // temporary item storage
306             SpillPartition(partition_id);
307             return;
308         }
309 
310         // initialize pointers to old range - the second half is still empty
311         TableItem* pbegin =
312             items_ + partition_id * num_buckets_per_partition_;
313         TableItem* iter = pbegin;
314         TableItem* pend = pbegin + old_size;
315 
316         bool passed_first_half = false;
317         bool found_hole = false;
318         while (!passed_first_half || !found_hole) {
319             Key item_key = key(*iter);
320             bool is_empty = key_equal_function_(item_key, Key());
321             if (!is_empty) {
322                 --items_per_partition_[partition_id];
323                 --num_items_;
324                 TableItem item = std::move(*iter);
325                 new (iter)TableItem();
326                 Insert(item);
327             }
328 
329             iter++;
330             found_hole = passed_first_half && is_empty;
331             passed_first_half = passed_first_half || iter == pend;
332         }
333     }
334 
335     //! Grow a partition after a spill or flush (if possible)
GrowPartition(size_t partition_id)336     void GrowPartition(size_t partition_id) {
337 
338         if (TLX_UNLIKELY(mem::memory_exceeded)) {
339             SpillPartition(partition_id);
340             return;
341         }
342 
343         if (partition_size_[partition_id] == num_buckets_per_partition_)
344             return;
345 
346         size_t new_size = std::min(
347             num_buckets_per_partition_, 2 * partition_size_[partition_id]);
348 
349         sLOG << "Growing partition" << partition_id
350              << "from" << partition_size_[partition_id] << "to" << new_size
351              << "limit_items" << new_size * config_.limit_partition_fill_rate();
352 
353         // initialize new items
354 
355         TableItem* pbegin =
356             items_ + partition_id * num_buckets_per_partition_;
357         TableItem* iter = pbegin + partition_size_[partition_id];
358         TableItem* pend = pbegin + new_size;
359 
360         for ( ; iter != pend; ++iter)
361             new (iter)TableItem();
362 
363         partition_size_[partition_id] = new_size;
364         limit_items_per_partition_[partition_id]
365             = new_size * config_.limit_partition_fill_rate();
366     }
367 
368     //! \name Spilling Mechanisms to External Memory Files
369     //! \{
370 
371     //! Spill all items of a partition into an external memory File.
SpillPartition(size_t partition_id)372     void SpillPartition(size_t partition_id) {
373 
374         if (immediate_flush_) {
375             return FlushPartition(
376                 partition_id, /* consume */ true, /* grow */ !mem::memory_exceeded);
377         }
378 
379         LOG << "Spilling " << items_per_partition_[partition_id]
380             << " items of partition with id: " << partition_id;
381 
382         if (items_per_partition_[partition_id] == 0)
383             return;
384 
385         data::File::Writer writer = partition_files_[partition_id].GetWriter();
386 
387         if (sentinel_partition_ == partition_id) {
388             writer.Put(items_[num_buckets_]);
389             items_[num_buckets_].~TableItem();
390             sentinel_partition_ = invalid_partition_;
391         }
392 
393         TableItem* iter = items_ + partition_id * num_buckets_per_partition_;
394         TableItem* pend = iter + partition_size_[partition_id];
395 
396         for ( ; iter != pend; ++iter) {
397             if (!key_equal_function_(key(*iter), Key())) {
398                 writer.Put(*iter);
399                 *iter = TableItem();
400             }
401         }
402 
403         // reset partition specific counter
404         num_items_ -= items_per_partition_[partition_id];
405         items_per_partition_[partition_id] = 0;
406         assert(num_items_ == this->num_items_calc());
407 
408         LOG << "Spilled items of partition with id: " << partition_id;
409     }
410 
411     //! Spill all items of an arbitrary partition into an external memory File.
SpillAnyPartition()412     void SpillAnyPartition() {
413         // maybe make a policy later -tb
414         return SpillLargestPartition();
415     }
416 
417     //! Spill all items of the largest partition into an external memory File.
SpillLargestPartition()418     void SpillLargestPartition() {
419         // get partition with max size
420         size_t size_max = 0, index = 0;
421 
422         for (size_t i = 0; i < num_partitions_; ++i)
423         {
424             if (items_per_partition_[i] > size_max)
425             {
426                 size_max = items_per_partition_[i];
427                 index = i;
428             }
429         }
430 
431         if (size_max == 0) {
432             return;
433         }
434 
435         return SpillPartition(index);
436     }
437 
438     //! \}
439 
440     //! \name Flushing Mechanisms to Next Stage or Phase
441     //! \{
442 
443     template <typename Emit>
FlushPartitionEmit(size_t partition_id,bool consume,bool grow,Emit emit)444     void FlushPartitionEmit(
445         size_t partition_id, bool consume, bool grow, Emit emit) {
446 
447         LOG << "Flushing " << items_per_partition_[partition_id]
448             << " items of partition: " << partition_id;
449 
450         if (sentinel_partition_ == partition_id) {
451             emit(partition_id, items_[num_buckets_]);
452             if (consume) {
453                 items_[num_buckets_].~TableItem();
454                 sentinel_partition_ = invalid_partition_;
455             }
456         }
457 
458         TableItem* iter = items_ + partition_id * num_buckets_per_partition_;
459         TableItem* pend = iter + partition_size_[partition_id];
460 
461         for ( ; iter != pend; ++iter)
462         {
463             if (!key_equal_function_(key(*iter), Key())) {
464                 emit(partition_id, *iter);
465 
466                 if (consume)
467                     *iter = TableItem();
468             }
469         }
470 
471         if (consume) {
472             // reset partition specific counter
473             num_items_ -= items_per_partition_[partition_id];
474             items_per_partition_[partition_id] = 0;
475             assert(num_items_ == this->num_items_calc());
476         }
477 
478         LOG << "Done flushed items of partition: " << partition_id;
479 
480         if (grow)
481             GrowPartition(partition_id);
482     }
483 
FlushPartition(size_t partition_id,bool consume,bool grow)484     void FlushPartition(size_t partition_id, bool consume, bool grow) {
485         FlushPartitionEmit(
486             partition_id, consume, grow,
487             [this](const size_t& partition_id, const TableItem& p) {
488                 this->emitter_.Emit(partition_id, p);
489             });
490     }
491 
FlushAll()492     void FlushAll() {
493         for (size_t i = 0; i < num_partitions_; ++i) {
494             FlushPartition(i, /* consume */ true, /* grow */ false);
495         }
496     }
497 
498     //! \}
499 
500 public:
501     using Super::calculate_index;
502 
503 private:
504     using Super::config_;
505     using Super::immediate_flush_;
506     using Super::index_function_;
507     using Super::items_per_partition_;
508     using Super::key;
509     using Super::key_equal_function_;
510     using Super::limit_memory_bytes_;
511     using Super::num_buckets_;
512     using Super::num_buckets_per_partition_;
513     using Super::num_items_;
514     using Super::num_partitions_;
515     using Super::partition_files_;
516     using Super::reduce;
517 
518     //! Storing the actual hash table.
519     TableItem* items_ = nullptr;
520 
521     //! Current sizes of the partitions because the valid allocated areas grow
522     std::vector<size_t> partition_size_;
523 
524     //! Current limits on the number of items in a partitions, different for
525     //! different partitions, because the valid allocated areas grow.
526     std::vector<size_t> limit_items_per_partition_;
527 
528     //! sentinel for invalid partition or no sentinel.
529     static constexpr size_t invalid_partition_ = size_t(-1);
530 
531     //! store the partition id of the sentinel key. implicitly this also stored
532     //! whether the sentinel key was found and reduced into
533     //! items_[num_buckets_].
534     size_t sentinel_partition_ = invalid_partition_;
535 };
536 
//! Traits specialization that selects ReduceProbingHashTable as the table
//! implementation when ReduceTableImpl::PROBING is requested.
template <typename TableItem, typename Key, typename Value,
          typename KeyExtractor, typename ReduceFunction,
          typename Emitter, const bool VolatileKey,
          typename ReduceConfig, typename IndexFunction,
          typename KeyEqualFunction>
class ReduceTableSelect<
        ReduceTableImpl::PROBING,
        TableItem, Key, Value, KeyExtractor, ReduceFunction,
        Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>
{
public:
    //! the selected hash table implementation type
    using type = ReduceProbingHashTable<
        TableItem, Key, Value, KeyExtractor, ReduceFunction,
        Emitter, VolatileKey, ReduceConfig,
        IndexFunction, KeyEqualFunction>;
};
553 
554 } // namespace core
555 } // namespace thrill
556 
557 #endif // !THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER
558 
559 /******************************************************************************/
560