1 /******************************************************************************* 2 * thrill/core/reduce_pre_phase.hpp 3 * 4 * Hash table with support for reduce and partitions. 5 * 6 * Part of Project Thrill - http://project-thrill.org 7 * 8 * Copyright (C) 2015 Matthias Stumpp <mstumpp@gmail.com> 9 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com> 10 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 11 * 12 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 13 ******************************************************************************/ 14 15 #pragma once 16 #ifndef THRILL_CORE_REDUCE_PRE_PHASE_HEADER 17 #define THRILL_CORE_REDUCE_PRE_PHASE_HEADER 18 19 #include <thrill/common/defines.hpp> 20 #include <thrill/common/logger.hpp> 21 #include <thrill/common/math.hpp> 22 #include <thrill/core/duplicate_detection.hpp> 23 #include <thrill/core/reduce_bucket_hash_table.hpp> 24 #include <thrill/core/reduce_functional.hpp> 25 #include <thrill/core/reduce_old_probing_hash_table.hpp> 26 #include <thrill/core/reduce_probing_hash_table.hpp> 27 #include <thrill/data/block_reader.hpp> 28 #include <thrill/data/block_writer.hpp> 29 #include <thrill/data/file.hpp> 30 31 #include <algorithm> 32 #include <cassert> 33 #include <cmath> 34 #include <functional> 35 #include <string> 36 #include <utility> 37 #include <vector> 38 39 namespace thrill { 40 namespace core { 41 42 //! Emitter implementation to plug into a reduce hash table for 43 //! collecting/flushing items while reducing. Items flushed in the pre-phase are 44 //! transmitted via a network Channel. 45 template <typename TableItem, bool VolatileKey, typename BlockWriter> 46 class ReducePrePhaseEmitter 47 { 48 static constexpr bool debug = false; 49 50 public: ReducePrePhaseEmitter(std::vector<BlockWriter> & writer)51 explicit ReducePrePhaseEmitter(std::vector<BlockWriter>& writer) 52 : writer_(writer), 53 stats_(writer.size(), 0) { } 54 55 //! output an element into a partition, template specialized for robust and 56 //! non-robust keys Emit(const size_t & partition_id,const TableItem & p)57 void Emit(const size_t& partition_id, const TableItem& p) { 58 assert(partition_id < writer_.size()); 59 stats_[partition_id]++; 60 writer_[partition_id].Put(p); 61 } 62 Flush(size_t partition_id)63 void Flush(size_t partition_id) { 64 assert(partition_id < writer_.size()); 65 writer_[partition_id].Flush(); 66 } 67 CloseAll()68 void CloseAll() { 69 sLOG << "emit stats:"; 70 size_t i = 0; 71 for (BlockWriter& e : writer_) { 72 e.Close(); 73 sLOG << "emitter" << i << "pushed" << stats_[i++]; 74 } 75 } 76 77 public: 78 //! Set of emitters, one per partition. 79 std::vector<BlockWriter>& writer_; 80 81 //! Emitter stats. 82 std::vector<size_t> stats_; 83 }; 84 85 template <typename TableItem, typename Key, typename Value, 86 typename KeyExtractor, typename ReduceFunction, 87 const bool VolatileKey, 88 typename BlockWriter, 89 typename ReduceConfig_ = DefaultReduceConfig, 90 typename IndexFunction = ReduceByHash<Key>, 91 typename KeyEqualFunction = std::equal_to<Key>, 92 typename HashFunction = std::hash<Key>, 93 bool UseDuplicateDetection = false> 94 class ReducePrePhase; 95 96 template <typename TableItem, typename Key, typename Value, 97 typename KeyExtractor, typename ReduceFunction, 98 const bool VolatileKey, typename BlockWriter, 99 typename ReduceConfig_, 100 typename IndexFunction, 101 typename KeyEqualFunction, 102 typename HashFunction> 103 class ReducePrePhase<TableItem, Key, Value, 104 KeyExtractor, ReduceFunction, 105 VolatileKey, BlockWriter, 106 ReduceConfig_, 107 IndexFunction, 108 KeyEqualFunction, 109 HashFunction, 110 false> 111 { 112 static constexpr bool debug = false; 113 114 public: 115 using ReduceConfig = ReduceConfig_; 116 using Emitter = ReducePrePhaseEmitter<TableItem, VolatileKey, BlockWriter>; 117 using MakeTableItem = ReduceMakeTableItem<Value, TableItem, VolatileKey>; 118 119 using Table = typename ReduceTableSelect< 120 ReduceConfig::table_impl_, 121 TableItem, Key, Value, 122 KeyExtractor, ReduceFunction, Emitter, 123 VolatileKey, ReduceConfig, IndexFunction, KeyEqualFunction>::type; 124 125 /*! 126 * A data structure which takes an arbitrary value and extracts a key using 127 * a key extractor function from that value. Afterwards, the value is hashed 128 * based on the key into some slot. 129 */ ReducePrePhase(Context & ctx,size_t dia_id,size_t num_partitions,KeyExtractor key_extractor,ReduceFunction reduce_function,std::vector<BlockWriter> & emit,const ReduceConfig & config=ReduceConfig (),const IndexFunction & index_function=IndexFunction (),const KeyEqualFunction & key_equal_function=KeyEqualFunction (),const HashFunction hash_function=HashFunction (),bool duplicates=false)130 ReducePrePhase(Context& ctx, size_t dia_id, 131 size_t num_partitions, 132 KeyExtractor key_extractor, 133 ReduceFunction reduce_function, 134 std::vector<BlockWriter>& emit, 135 const ReduceConfig& config = ReduceConfig(), 136 const IndexFunction& index_function = IndexFunction(), 137 const KeyEqualFunction& key_equal_function = KeyEqualFunction(), 138 const HashFunction hash_function = HashFunction(), 139 bool duplicates = false) 140 : emit_(emit), 141 key_extractor_(key_extractor), 142 table_(ctx, dia_id, 143 key_extractor, reduce_function, emit_, 144 num_partitions, config, !duplicates, 145 index_function, key_equal_function) { 146 147 tlx::unused(hash_function); 148 149 sLOG << "creating ReducePrePhase with" << emit.size() << "output emitters"; 150 151 assert(num_partitions == emit.size()); 152 } 153 154 //! non-copyable: delete copy-constructor 155 ReducePrePhase(const ReducePrePhase&) = delete; 156 //! non-copyable: delete assignment operator 157 ReducePrePhase& operator = (const ReducePrePhase&) = delete; 158 Initialize(size_t limit_memory_bytes)159 void Initialize(size_t limit_memory_bytes) { 160 table_.Initialize(limit_memory_bytes); 161 } 162 InitializeSkip()163 void InitializeSkip() { 164 table_.InitializeSkip(); 165 } 166 Insert(const Value & v)167 bool Insert(const Value& v) { 168 // for VolatileKey this makes std::pair and extracts the key 169 return table_.Insert(MakeTableItem::Make(v, table_.key_extractor())); 170 } 171 InsertSkip(const Value & v)172 void InsertSkip(const Value& v) { 173 TableItem t = MakeTableItem::Make(v, table_.key_extractor()); 174 typename IndexFunction::Result h = table_.calculate_index(t); 175 emit_.Emit(h.partition_id, t); 176 } 177 178 //! Flush all partitions FlushAll()179 void FlushAll() { 180 for (size_t id = 0; id < table_.num_partitions(); ++id) { 181 FlushPartition(id, /* consume */ true, /* grow */ false); 182 } 183 } 184 185 //! Flushes a partition FlushPartition(size_t partition_id,bool consume,bool grow)186 void FlushPartition(size_t partition_id, bool consume, bool grow) { 187 table_.FlushPartition(partition_id, consume, grow); 188 // data is flushed immediately, there is no spilled data 189 } 190 191 //! Closes all emitter CloseAll()192 void CloseAll() { 193 emit_.CloseAll(); 194 table_.Dispose(); 195 } 196 197 //! \name Accessors 198 //! \{ 199 200 //! Returns the total num of items in the table. num_items() const201 size_t num_items() const { return table_.num_items(); } 202 203 //! calculate key range for the given output partition key_range(size_t partition_id)204 common::Range key_range(size_t partition_id) 205 { return table_.key_range(partition_id); } 206 207 //! \} 208 209 protected: 210 //! Emitters used to parameterize hash table for output to network. 211 Emitter emit_; 212 213 //! extractor function which maps a value to it's key 214 KeyExtractor key_extractor_; 215 216 //! the first-level hash table implementation 217 Table table_; 218 }; 219 220 template <typename TableItem, typename Key, typename Value, 221 typename KeyExtractor, typename ReduceFunction, 222 const bool VolatileKey, typename BlockWriter, 223 typename ReduceConfig, 224 typename IndexFunction, 225 typename EqualToFunction, 226 typename HashFunction> 227 class ReducePrePhase<TableItem, Key, Value, 228 KeyExtractor, 229 ReduceFunction, 230 VolatileKey, 231 BlockWriter, 232 ReduceConfig, 233 IndexFunction, 234 EqualToFunction, 235 HashFunction, 236 true> 237 : public ReducePrePhase<TableItem, Key, Value, 238 KeyExtractor, 239 ReduceFunction, 240 VolatileKey, 241 BlockWriter, 242 ReduceConfig, 243 IndexFunction, 244 EqualToFunction, 245 HashFunction, 246 false> 247 { 248 249 public: 250 using Super = ReducePrePhase<TableItem, Key, Value, KeyExtractor, 251 ReduceFunction, VolatileKey, BlockWriter, 252 ReduceConfig, 253 IndexFunction, EqualToFunction, HashFunction, 254 false>; 255 using KeyValuePair = std::pair<Key, Value>; 256 ReducePrePhase(Context & ctx,size_t dia_id,size_t num_partitions,KeyExtractor key_extractor,ReduceFunction reduce_function,std::vector<BlockWriter> & emit,const ReduceConfig & config=ReduceConfig (),const IndexFunction & index_function=IndexFunction (),const EqualToFunction & equal_to_function=EqualToFunction (),const HashFunction hash_function=HashFunction ())257 ReducePrePhase(Context& ctx, size_t dia_id, 258 size_t num_partitions, 259 KeyExtractor key_extractor, 260 ReduceFunction reduce_function, 261 std::vector<BlockWriter>& emit, 262 const ReduceConfig& config = ReduceConfig(), 263 const IndexFunction& index_function = IndexFunction(), 264 const EqualToFunction& equal_to_function = EqualToFunction(), 265 const HashFunction hash_function = HashFunction()) 266 : Super(ctx, dia_id, num_partitions, key_extractor, reduce_function, 267 emit, config, index_function, equal_to_function, hash_function, 268 /*duplicates*/ true), 269 hash_function_(hash_function) { } 270 Insert(const Value & v)271 void Insert(const Value& v) { 272 if (Super::table_.Insert( 273 Super::MakeTableItem::Make(v, Super::table_.key_extractor()))) { 274 hashes_.push_back(hash_function_(Super::key_extractor_(v))); 275 } 276 } 277 278 //! Flush all partitions FlushAll()279 void FlushAll() { 280 DuplicateDetection dup_detect; 281 max_hash_ = dup_detect.FindNonDuplicates(non_duplicates_, 282 hashes_, 283 Super::table_.ctx(), 284 Super::table_.dia_id()); 285 286 for (size_t id = 0; id < Super::table_.num_partitions(); ++id) { 287 FlushPartition(id, /* consume */ true, /* grow */ false); 288 } 289 } 290 FlushPartition(size_t partition_id,bool consume,bool grow)291 void FlushPartition(size_t partition_id, bool consume, bool grow) { 292 Super::table_.FlushPartitionEmit( 293 partition_id, consume, grow, 294 [this](const size_t& partition_id, const TableItem& ti) { 295 Key key = Super::MakeTableItem::GetKey( 296 ti, Super::table_.key_extractor()); 297 if (!non_duplicates_[hash_function_(key) % max_hash_]) { 298 299 duplicated_elements_++; 300 Super::emit_.Emit(partition_id, ti); 301 } 302 else { 303 non_duplicate_elements_++; 304 Super::emit_.Emit(Super::table_.ctx().my_rank(), ti); 305 } 306 }); 307 308 if (Super::table_.has_spilled_data_on_partition(partition_id)) { 309 data::File::Reader reader = 310 Super::table_.partition_files()[partition_id].GetReader(true); 311 while (reader.HasNext()) { 312 TableItem ti = reader.Next<TableItem>(); 313 Key key = Super::MakeTableItem::GetKey( 314 ti, Super::table_.key_extractor()); 315 if (!non_duplicates_[hash_function_(key) % max_hash_]) { 316 317 duplicated_elements_++; 318 Super::emit_.Emit(partition_id, ti); 319 } 320 else { 321 non_duplicate_elements_++; 322 Super::emit_.Emit(Super::table_.ctx().my_rank(), ti); 323 } 324 } 325 } 326 327 // flush elements pushed into emitter 328 Super::emit_.Flush(partition_id); 329 Super::emit_.Flush(Super::table_.ctx().my_rank()); 330 } 331 332 //! \name Duplicate Detection 333 //! \{ 334 335 HashFunction hash_function_; 336 //! Hashes of all keys. 337 std::vector<size_t> hashes_; 338 //! All elements occuring on more than one worker. (Elements not appearing here 339 //! can be reduced locally) 340 std::vector<bool> non_duplicates_; 341 //! Modulo for all hashes in duplicate detection to reduce hash space. 342 size_t max_hash_; 343 344 size_t duplicated_elements_ = 0; 345 size_t non_duplicate_elements_ = 0; 346 347 //! \} 348 }; 349 350 } // namespace core 351 } // namespace thrill 352 353 #endif // !THRILL_CORE_REDUCE_PRE_PHASE_HEADER 354 355 /******************************************************************************/ 356