1 /******************************************************************************* 2 * thrill/core/reduce_probing_hash_table.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Matthias Stumpp <mstumpp@gmail.com> 7 * Copyright (C) 2016 Timo Bingmann <tb@panthema.net> 8 * Copyright (C) 2017 Tim Zeitz <dev.tim.zeitz@gmail.com> 9 * 10 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 11 ******************************************************************************/ 12 13 #pragma once 14 #ifndef THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER 15 #define THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER 16 17 #include <thrill/core/reduce_functional.hpp> 18 #include <thrill/core/reduce_table.hpp> 19 20 #include <algorithm> 21 #include <functional> 22 #include <limits> 23 #include <utility> 24 #include <vector> 25 26 namespace thrill { 27 namespace core { 28 29 /*! 30 * A data structure which takes an arbitrary value and extracts a key using a 31 * key extractor function from that value. A key may also be provided initially 32 * as part of a key/value pair, not requiring to extract a key. 33 * 34 * Afterwards, the key is hashed and the hash is used to assign that key/value 35 * pair to some slot. 36 * 37 * In case a slot already has a key/value pair and the key of that value and the 38 * key of the value to be inserted are them same, the values are reduced 39 * according to some reduce function. No key/value is added to the data 40 * structure. 41 * 42 * If the keys are different, the next slot (moving to the right) is considered. 43 * If the slot is occupied, the same procedure happens again (know as linear 44 * probing.) 45 * 46 * Finally, the key/value pair to be inserted may either: 47 * 48 * 1.) Be reduced with some other key/value pair, sharing the same key. 49 * 2.) Inserted at a free slot. 50 * 3.) Trigger a resize of the data structure in case there are no more free 51 * slots in the data structure. 52 * 53 * The following illustrations shows the general structure of the data 54 * structure. The set of slots is divided into 1..n partitions. Each key is 55 * hashed into exactly one partition. 56 * 57 * 58 * Partition 0 Partition 1 Partition 2 Partition 3 Partition 4 59 * P00 P01 P02 P10 P11 P12 P20 P21 P22 P30 P31 P32 P40 P41 P42 60 * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 61 * || | | || | | || | | || | | || | | || 62 * +---+---+---+---+---+---+---+---+---+---+---+---+---+---+---+ 63 * <- LI -> 64 * LI..Local Index 65 * <- GI -> 66 * GI..Global Index 67 * PI 0 PI 1 PI 2 PI 3 PI 4 68 * PI..Partition ID 69 * 70 */ 71 template <typename TableItem, typename Key, typename Value, 72 typename KeyExtractor, typename ReduceFunction, typename Emitter, 73 const bool VolatileKey, 74 typename ReduceConfig_, 75 typename IndexFunction, 76 typename KeyEqualFunction = std::equal_to<Key> > 77 class ReduceProbingHashTable 78 : public ReduceTable<TableItem, Key, Value, 79 KeyExtractor, ReduceFunction, Emitter, 80 VolatileKey, ReduceConfig_, 81 IndexFunction, KeyEqualFunction> 82 { 83 using Super = ReduceTable<TableItem, Key, Value, 84 KeyExtractor, ReduceFunction, Emitter, 85 VolatileKey, ReduceConfig_, IndexFunction, 86 KeyEqualFunction>; 87 using Super::debug; 88 static constexpr bool debug_items = false; 89 90 public: 91 using ReduceConfig = ReduceConfig_; 92 ReduceProbingHashTable(Context & ctx,size_t dia_id,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,Emitter & emitter,size_t num_partitions,const ReduceConfig & config=ReduceConfig (),bool immediate_flush=false,const IndexFunction & index_function=IndexFunction (),const KeyEqualFunction & key_equal_function=KeyEqualFunction ())93 ReduceProbingHashTable( 94 Context& ctx, size_t dia_id, 95 const KeyExtractor& key_extractor, 96 const ReduceFunction& reduce_function, 97 Emitter& emitter, 98 size_t num_partitions, 99 const ReduceConfig& config = ReduceConfig(), 100 bool immediate_flush = false, 101 const IndexFunction& index_function = IndexFunction(), 102 const KeyEqualFunction& key_equal_function = KeyEqualFunction()) 103 : Super(ctx, dia_id, 104 key_extractor, reduce_function, emitter, 105 num_partitions, config, immediate_flush, 106 index_function, key_equal_function) 107 { assert(num_partitions > 0); } 108 109 //! Construct the hash table itself. fill it with sentinels. have one extra 110 //! cell beyond the end for reducing the sentinel itself. Initialize(size_t limit_memory_bytes)111 void Initialize(size_t limit_memory_bytes) { 112 assert(!items_); 113 114 limit_memory_bytes_ = limit_memory_bytes; 115 116 // calculate num_buckets_per_partition_ from the memory limit and the 117 // number of partitions required, initialize partition_size_ array. 118 119 assert(limit_memory_bytes_ >= 0 && 120 "limit_memory_bytes must be greater than or equal to 0. " 121 "A byte size of zero results in exactly one item per partition"); 122 123 num_buckets_per_partition_ = std::max<size_t>( 124 1, 125 (size_t)(static_cast<double>(limit_memory_bytes_) 126 / static_cast<double>(sizeof(TableItem)) 127 / static_cast<double>(num_partitions_))); 128 129 num_buckets_ = num_buckets_per_partition_ * num_partitions_; 130 131 assert(num_buckets_per_partition_ > 0); 132 assert(num_buckets_ > 0); 133 134 partition_size_.resize( 135 num_partitions_, 136 std::min(size_t(config_.initial_items_per_partition_), 137 num_buckets_per_partition_)); 138 139 // calculate limit on the number of items in a partition before these 140 // are spilled to disk or flushed to network. 141 142 double limit_fill_rate = config_.limit_partition_fill_rate(); 143 144 assert(limit_fill_rate >= 0.0 && limit_fill_rate <= 1.0 145 && "limit_partition_fill_rate must be between 0.0 and 1.0. " 146 "with a fill rate of 0.0, items are immediately flushed."); 147 148 limit_items_per_partition_.resize( 149 num_partitions_, 150 static_cast<size_t>( 151 static_cast<double>(partition_size_[0]) * limit_fill_rate)); 152 153 assert(limit_items_per_partition_[0] >= 0); 154 155 // actually allocate the table and initialize the valid ranges, the + 1 156 // is for the sentinel's slot. 157 158 items_ = static_cast<TableItem*>( 159 operator new ((num_buckets_ + 1) * sizeof(TableItem))); 160 161 for (size_t id = 0; id < num_partitions_; ++id) { 162 TableItem* iter = items_ + id * num_buckets_per_partition_; 163 TableItem* pend = iter + partition_size_[id]; 164 165 for ( ; iter != pend; ++iter) 166 new (iter)TableItem(); 167 } 168 } 169 ~ReduceProbingHashTable()170 ~ReduceProbingHashTable() { 171 if (items_) Dispose(); 172 } 173 174 /*! 175 * Inserts a value into the table, potentially reducing it in case both the 176 * key of the value already in the table and the key of the value to be 177 * inserted are the same. 178 * 179 * An insert may trigger a partial flush of the partition with the most 180 * items if the maximal number of items in the table (max_num_items_table) 181 * is reached. 182 * 183 * Alternatively, it may trigger a resize of the table in case the maximal 184 * fill ratio per partition is reached. 185 * 186 * \param kv Value to be inserted into the table. 187 * 188 * \return true if a new key was inserted to the table 189 */ Insert(const TableItem & kv)190 bool Insert(const TableItem& kv) { 191 192 typename IndexFunction::Result h = calculate_index(kv); 193 assert(h.partition_id < num_partitions_); 194 195 if (TLX_UNLIKELY(key_equal_function_(key(kv), Key()))) { 196 // handle pairs with sentinel key specially by reducing into last 197 // element of items. 198 TableItem& sentinel = items_[num_buckets_]; 199 if (sentinel_partition_ == invalid_partition_) { 200 // first occurrence of sentinel key 201 new (&sentinel)TableItem(kv); 202 sentinel_partition_ = h.partition_id; 203 } 204 else { 205 sentinel = reduce(sentinel, kv); 206 return false; 207 } 208 ++items_per_partition_[h.partition_id]; 209 ++num_items_; 210 211 while (TLX_UNLIKELY( 212 items_per_partition_[h.partition_id] > 213 limit_items_per_partition_[h.partition_id])) { 214 GrowAndRehash(h.partition_id); 215 } 216 217 return true; 218 } 219 220 // calculate local index depending on the current subtable's size 221 size_t local_index = h.local_index(partition_size_[h.partition_id]); 222 223 TableItem* pbegin = items_ + h.partition_id * num_buckets_per_partition_; 224 TableItem* pend = pbegin + partition_size_[h.partition_id]; 225 226 TableItem* begin_iter = pbegin + local_index; 227 TableItem* iter = begin_iter; 228 229 while (!key_equal_function_(key(*iter), Key())) 230 { 231 if (key_equal_function_(key(*iter), key(kv))) 232 { 233 *iter = reduce(*iter, kv); 234 return false; 235 } 236 237 ++iter; 238 239 // wrap around if beyond the current partition 240 if (TLX_UNLIKELY(iter == pend)) 241 iter = pbegin; 242 243 // flush partition and retry, if all slots are reserved 244 if (TLX_UNLIKELY(iter == begin_iter)) { 245 GrowAndRehash(h.partition_id); 246 return Insert(kv); 247 } 248 } 249 250 // insert new pair 251 *iter = kv; 252 253 // increase counter for partition 254 ++items_per_partition_[h.partition_id]; 255 ++num_items_; 256 257 while (TLX_UNLIKELY( 258 items_per_partition_[h.partition_id] >= 259 limit_items_per_partition_[h.partition_id])) { 260 LOG << "Grow due to " 261 << items_per_partition_[h.partition_id] << " >= " 262 << limit_items_per_partition_[h.partition_id] 263 << " among " << partition_size_[h.partition_id]; 264 GrowAndRehash(h.partition_id); 265 } 266 267 return true; 268 } 269 270 //! Deallocate items and memory Dispose()271 void Dispose() { 272 if (!items_) return; 273 274 // dispose the items by destructor 275 276 for (size_t id = 0; id < num_partitions_; ++id) { 277 TableItem* iter = items_ + id * num_buckets_per_partition_; 278 TableItem* pend = iter + partition_size_[id]; 279 280 for ( ; iter != pend; ++iter) 281 iter->~TableItem(); 282 } 283 284 if (sentinel_partition_ != invalid_partition_) 285 items_[num_buckets_].~TableItem(); 286 287 operator delete (items_); 288 items_ = nullptr; 289 290 Super::Dispose(); 291 } 292 GrowAndRehash(size_t partition_id)293 void GrowAndRehash(size_t partition_id) { 294 295 size_t old_size = partition_size_[partition_id]; 296 GrowPartition(partition_id); 297 if (partition_size_[partition_id] == old_size) { 298 SpillPartition(partition_id); 299 return; 300 } 301 302 if (partition_size_[partition_id] % old_size != 0) { 303 // in place rehashing won't work properly so we spill rather than 304 // potentially blasting memory limits by using an extra vector for 305 // temporary item storage 306 SpillPartition(partition_id); 307 return; 308 } 309 310 // initialize pointers to old range - the second half is still empty 311 TableItem* pbegin = 312 items_ + partition_id * num_buckets_per_partition_; 313 TableItem* iter = pbegin; 314 TableItem* pend = pbegin + old_size; 315 316 bool passed_first_half = false; 317 bool found_hole = false; 318 while (!passed_first_half || !found_hole) { 319 Key item_key = key(*iter); 320 bool is_empty = key_equal_function_(item_key, Key()); 321 if (!is_empty) { 322 --items_per_partition_[partition_id]; 323 --num_items_; 324 TableItem item = std::move(*iter); 325 new (iter)TableItem(); 326 Insert(item); 327 } 328 329 iter++; 330 found_hole = passed_first_half && is_empty; 331 passed_first_half = passed_first_half || iter == pend; 332 } 333 } 334 335 //! Grow a partition after a spill or flush (if possible) GrowPartition(size_t partition_id)336 void GrowPartition(size_t partition_id) { 337 338 if (TLX_UNLIKELY(mem::memory_exceeded)) { 339 SpillPartition(partition_id); 340 return; 341 } 342 343 if (partition_size_[partition_id] == num_buckets_per_partition_) 344 return; 345 346 size_t new_size = std::min( 347 num_buckets_per_partition_, 2 * partition_size_[partition_id]); 348 349 sLOG << "Growing partition" << partition_id 350 << "from" << partition_size_[partition_id] << "to" << new_size 351 << "limit_items" << new_size * config_.limit_partition_fill_rate(); 352 353 // initialize new items 354 355 TableItem* pbegin = 356 items_ + partition_id * num_buckets_per_partition_; 357 TableItem* iter = pbegin + partition_size_[partition_id]; 358 TableItem* pend = pbegin + new_size; 359 360 for ( ; iter != pend; ++iter) 361 new (iter)TableItem(); 362 363 partition_size_[partition_id] = new_size; 364 limit_items_per_partition_[partition_id] 365 = new_size * config_.limit_partition_fill_rate(); 366 } 367 368 //! \name Spilling Mechanisms to External Memory Files 369 //! \{ 370 371 //! Spill all items of a partition into an external memory File. SpillPartition(size_t partition_id)372 void SpillPartition(size_t partition_id) { 373 374 if (immediate_flush_) { 375 return FlushPartition( 376 partition_id, /* consume */ true, /* grow */ !mem::memory_exceeded); 377 } 378 379 LOG << "Spilling " << items_per_partition_[partition_id] 380 << " items of partition with id: " << partition_id; 381 382 if (items_per_partition_[partition_id] == 0) 383 return; 384 385 data::File::Writer writer = partition_files_[partition_id].GetWriter(); 386 387 if (sentinel_partition_ == partition_id) { 388 writer.Put(items_[num_buckets_]); 389 items_[num_buckets_].~TableItem(); 390 sentinel_partition_ = invalid_partition_; 391 } 392 393 TableItem* iter = items_ + partition_id * num_buckets_per_partition_; 394 TableItem* pend = iter + partition_size_[partition_id]; 395 396 for ( ; iter != pend; ++iter) { 397 if (!key_equal_function_(key(*iter), Key())) { 398 writer.Put(*iter); 399 *iter = TableItem(); 400 } 401 } 402 403 // reset partition specific counter 404 num_items_ -= items_per_partition_[partition_id]; 405 items_per_partition_[partition_id] = 0; 406 assert(num_items_ == this->num_items_calc()); 407 408 LOG << "Spilled items of partition with id: " << partition_id; 409 } 410 411 //! Spill all items of an arbitrary partition into an external memory File. SpillAnyPartition()412 void SpillAnyPartition() { 413 // maybe make a policy later -tb 414 return SpillLargestPartition(); 415 } 416 417 //! Spill all items of the largest partition into an external memory File. SpillLargestPartition()418 void SpillLargestPartition() { 419 // get partition with max size 420 size_t size_max = 0, index = 0; 421 422 for (size_t i = 0; i < num_partitions_; ++i) 423 { 424 if (items_per_partition_[i] > size_max) 425 { 426 size_max = items_per_partition_[i]; 427 index = i; 428 } 429 } 430 431 if (size_max == 0) { 432 return; 433 } 434 435 return SpillPartition(index); 436 } 437 438 //! \} 439 440 //! \name Flushing Mechanisms to Next Stage or Phase 441 //! \{ 442 443 template <typename Emit> FlushPartitionEmit(size_t partition_id,bool consume,bool grow,Emit emit)444 void FlushPartitionEmit( 445 size_t partition_id, bool consume, bool grow, Emit emit) { 446 447 LOG << "Flushing " << items_per_partition_[partition_id] 448 << " items of partition: " << partition_id; 449 450 if (sentinel_partition_ == partition_id) { 451 emit(partition_id, items_[num_buckets_]); 452 if (consume) { 453 items_[num_buckets_].~TableItem(); 454 sentinel_partition_ = invalid_partition_; 455 } 456 } 457 458 TableItem* iter = items_ + partition_id * num_buckets_per_partition_; 459 TableItem* pend = iter + partition_size_[partition_id]; 460 461 for ( ; iter != pend; ++iter) 462 { 463 if (!key_equal_function_(key(*iter), Key())) { 464 emit(partition_id, *iter); 465 466 if (consume) 467 *iter = TableItem(); 468 } 469 } 470 471 if (consume) { 472 // reset partition specific counter 473 num_items_ -= items_per_partition_[partition_id]; 474 items_per_partition_[partition_id] = 0; 475 assert(num_items_ == this->num_items_calc()); 476 } 477 478 LOG << "Done flushed items of partition: " << partition_id; 479 480 if (grow) 481 GrowPartition(partition_id); 482 } 483 FlushPartition(size_t partition_id,bool consume,bool grow)484 void FlushPartition(size_t partition_id, bool consume, bool grow) { 485 FlushPartitionEmit( 486 partition_id, consume, grow, 487 [this](const size_t& partition_id, const TableItem& p) { 488 this->emitter_.Emit(partition_id, p); 489 }); 490 } 491 FlushAll()492 void FlushAll() { 493 for (size_t i = 0; i < num_partitions_; ++i) { 494 FlushPartition(i, /* consume */ true, /* grow */ false); 495 } 496 } 497 498 //! \} 499 500 public: 501 using Super::calculate_index; 502 503 private: 504 using Super::config_; 505 using Super::immediate_flush_; 506 using Super::index_function_; 507 using Super::items_per_partition_; 508 using Super::key; 509 using Super::key_equal_function_; 510 using Super::limit_memory_bytes_; 511 using Super::num_buckets_; 512 using Super::num_buckets_per_partition_; 513 using Super::num_items_; 514 using Super::num_partitions_; 515 using Super::partition_files_; 516 using Super::reduce; 517 518 //! Storing the actual hash table. 519 TableItem* items_ = nullptr; 520 521 //! Current sizes of the partitions because the valid allocated areas grow 522 std::vector<size_t> partition_size_; 523 524 //! Current limits on the number of items in a partitions, different for 525 //! different partitions, because the valid allocated areas grow. 526 std::vector<size_t> limit_items_per_partition_; 527 528 //! sentinel for invalid partition or no sentinel. 529 static constexpr size_t invalid_partition_ = size_t(-1); 530 531 //! store the partition id of the sentinel key. implicitly this also stored 532 //! whether the sentinel key was found and reduced into 533 //! items_[num_buckets_]. 534 size_t sentinel_partition_ = invalid_partition_; 535 }; 536 537 template <typename TableItem, typename Key, typename Value, 538 typename KeyExtractor, typename ReduceFunction, 539 typename Emitter, const bool VolatileKey, 540 typename ReduceConfig, typename IndexFunction, 541 typename KeyEqualFunction> 542 class ReduceTableSelect< 543 ReduceTableImpl::PROBING, 544 TableItem, Key, Value, KeyExtractor, ReduceFunction, 545 Emitter, VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction> 546 { 547 public: 548 using type = ReduceProbingHashTable< 549 TableItem, Key, Value, KeyExtractor, ReduceFunction, 550 Emitter, VolatileKey, ReduceConfig, 551 IndexFunction, KeyEqualFunction>; 552 }; 553 554 } // namespace core 555 } // namespace thrill 556 557 #endif // !THRILL_CORE_REDUCE_PROBING_HASH_TABLE_HEADER 558 559 /******************************************************************************/ 560