1 /*******************************************************************************
2 * thrill/api/reduce_to_index.hpp
3 *
4 * DIANode for a reduce operation. Performs the actual reduce operation
5 *
6 * Part of Project Thrill - http://project-thrill.org
7 *
8 * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
9 * Copyright (C) 2015 Sebastian Lamm <seba.lamm@gmail.com>
10 * Copyright (C) 2017 Tim Zeitz <dev.tim.zeitz@gmail.com>
11 *
12 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13 ******************************************************************************/
14
15 #pragma once
16 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER
17 #define THRILL_API_REDUCE_TO_INDEX_HEADER
18
19 #include <thrill/api/context.hpp>
20 #include <thrill/api/dia.hpp>
21 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/functional.hpp>
23 #include <thrill/common/logger.hpp>
24 #include <thrill/common/porting.hpp>
25 #include <thrill/core/reduce_by_index_post_phase.hpp>
26 #include <thrill/core/reduce_pre_phase.hpp>
27
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33
34 namespace thrill {
35 namespace api {
36
37 class DefaultReduceToIndexConfig : public core::DefaultReduceConfig
38 { };
39
40 /*!
41 * A DIANode which performs a ReduceToIndex operation. ReduceToIndex groups the
42 * elements in a DIA by their key and reduces every key bucket to a single
43 * element each. The ReduceToIndexNode stores the key_extractor and the
44 * reduce_function UDFs. The chainable LOps ahead of the Reduce operation are
45 * stored in the Stack. The ReduceToIndexNode has the type ValueType, which is
46 * the result type of the reduce_function. The key type is an unsigned integer
47 * and the output DIA will have element with key K at index K.
48 *
49 * \tparam ParentType Input type of the Reduce operation
50 * \tparam ValueType Output type of the Reduce operation
51 * \tparam ParentStack Function stack, which contains the chained lambdas between the last and this DIANode.
52 * \tparam KeyExtractor Type of the key_extractor function.
53 * \tparam ReduceFunction Type of the reduce_function
54 *
55 * \ingroup api_layer
56 */
57 template <typename ValueType,
58 typename KeyExtractor, typename ReduceFunction,
59 typename ReduceConfig, bool VolatileKey, bool SkipPreReducePhase>
60 class ReduceToIndexNode final : public DOpNode<ValueType>
61 {
62 static constexpr bool debug = false;
63
64 using Super = DOpNode<ValueType>;
65 using Super::context_;
66
67 using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
68
69 using TableItem =
70 typename std::conditional<
71 VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
72
73 static_assert(std::is_same<Key, size_t>::value,
74 "Key must be an unsigned integer");
75
76 static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
77 static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
78
79 private:
80 //! Emitter for PostPhase to push elements to next DIA object.
81 class Emitter
82 {
83 public:
Emitter(ReduceToIndexNode * node)84 explicit Emitter(ReduceToIndexNode* node) : node_(node) { }
operator ()(const ValueType & item) const85 void operator () (const ValueType& item) const
86 { return node_->PushItem(item); }
87
88 private:
89 ReduceToIndexNode* node_;
90 };
91
92 public:
93 /*!
94 * Constructor for a ReduceToIndexNode. Sets the parent, stack,
95 * key_extractor and reduce_function.
96 */
97 template <typename ParentDIA>
ReduceToIndexNode(const ParentDIA & parent,const char * label,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t result_size,const ValueType & neutral_element,const ReduceConfig & config)98 ReduceToIndexNode(const ParentDIA& parent,
99 const char* label,
100 const KeyExtractor& key_extractor,
101 const ReduceFunction& reduce_function,
102 size_t result_size,
103 const ValueType& neutral_element,
104 const ReduceConfig& config)
105 : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
106 mix_stream_(use_mix_stream_ ?
107 parent.ctx().GetNewMixStream(this) : nullptr),
108 cat_stream_(use_mix_stream_ ?
109 nullptr : parent.ctx().GetNewCatStream(this)),
110 emitters_(use_mix_stream_ ?
111 mix_stream_->GetWriters() : cat_stream_->GetWriters()),
112 result_size_(result_size),
113 pre_phase_(
114 context_, Super::dia_id(), context_.num_workers(),
115 key_extractor, reduce_function, emitters_,
116 config, core::ReduceByIndex<Key>(0, result_size)),
117 post_phase_(
118 context_, Super::dia_id(),
119 key_extractor, reduce_function, Emitter(this),
120 config, neutral_element) {
121 // Hook PreOp: Locally hash elements of the current DIA onto buckets and
122 // reduce each bucket to a single value, afterwards send data to another
123 // worker given by the shuffle algorithm.
__anon8ffd3e5d0102(const ValueType& input) 124 auto pre_op_fn = [this](const ValueType& input) {
125 if (SkipPreReducePhase)
126 pre_phase_.InsertSkip(input);
127 else
128 pre_phase_.Insert(input);
129 };
130
131 // close the function stack with our pre op and register it at parent
132 // node for output
133 auto lop_chain = parent.stack().push(pre_op_fn).fold();
134 parent.node()->AddChild(this, lop_chain);
135 }
136
PreOpMemUse()137 DIAMemUse PreOpMemUse() final {
138 // request maximum RAM limit, the value is calculated by StageBuilder,
139 // and set as DIABase::mem_limit_.
140 return DIAMemUse::Max();
141 }
142
StartPreOp(size_t)143 void StartPreOp(size_t /* parent_index */) final {
144 if (!use_post_thread_) {
145 // use pre_phase without extra thread
146 if (!SkipPreReducePhase)
147 pre_phase_.Initialize(DIABase::mem_limit_);
148 else
149 pre_phase_.InitializeSkip();
150
151 // re-parameterize with resulting key range on this worker - this is
152 // only known after Initialize() of the pre_phase_.
153 post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
154 }
155 else {
156 if (!SkipPreReducePhase)
157 pre_phase_.Initialize(DIABase::mem_limit_ / 2);
158 else
159 pre_phase_.InitializeSkip();
160
161 // re-parameterize with resulting key range on this worker - this is
162 // only know after Initialize() of the pre_phase_.
163 post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
164 post_phase_.Initialize(DIABase::mem_limit_ / 2);
165
166 // start additional thread to receive from the channel
167 thread_ = common::CreateThread([this] { ProcessChannel(); });
168 }
169 }
170
StopPreOp(size_t)171 void StopPreOp(size_t /* parent_index */) final {
172 LOG << *this << " running StopPreOp";
173 // Flush hash table before the postOp
174 if (!SkipPreReducePhase)
175 pre_phase_.FlushAll();
176 pre_phase_.CloseAll();
177 if (use_post_thread_) {
178 // waiting for the additional thread to finish the reduce
179 thread_.join();
180 // deallocate stream if already processed
181 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
182 }
183 }
184
Execute()185 void Execute() final { }
186
PushDataMemUse()187 DIAMemUse PushDataMemUse() final {
188 return DIAMemUse::Max();
189 }
190
PushData(bool consume)191 void PushData(bool consume) final {
192
193 if (!use_post_thread_ && !reduced_) {
194 // not final reduced, and no additional thread, perform post reduce
195 post_phase_.Initialize(DIABase::mem_limit_);
196 ProcessChannel();
197
198 // deallocate stream if already processed
199 use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
200
201 reduced_ = true;
202 }
203 post_phase_.PushData(consume);
204 }
205
206 //! process the inbound data in the post reduce phase
ProcessChannel()207 void ProcessChannel() {
208 if (use_mix_stream_)
209 {
210 auto reader = mix_stream_->GetMixReader(/* consume */ true);
211 sLOG << "reading data from" << mix_stream_->id()
212 << "to push into post table which flushes to" << this->dia_id();
213 while (reader.HasNext()) {
214 post_phase_.Insert(reader.template Next<TableItem>());
215 }
216 }
217 else
218 {
219 auto reader = cat_stream_->GetCatReader(/* consume */ true);
220 sLOG << "reading data from" << cat_stream_->id()
221 << "to push into post table which flushes to" << this->dia_id();
222 while (reader.HasNext()) {
223 post_phase_.Insert(reader.template Next<TableItem>());
224 }
225 }
226 }
227
Dispose()228 void Dispose() final {
229 post_phase_.Dispose();
230 }
231
232 private:
233 // pointers for both Mix and CatStream. only one is used, the other costs
234 // only a null pointer.
235 data::MixStreamPtr mix_stream_;
236 data::CatStreamPtr cat_stream_;
237
238 data::Stream::Writers emitters_;
239
240 size_t result_size_;
241
242 //! handle to additional thread for post phase
243 std::thread thread_;
244
245 core::ReducePrePhase<
246 TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
247 data::Stream::Writer, ReduceConfig, core::ReduceByIndex<Key>
248 > pre_phase_;
249
250 core::ReduceByIndexPostPhase<
251 TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
252 VolatileKey, ReduceConfig> post_phase_;
253
254 bool reduced_ = false;
255 };
256
257 template <typename ValueType, typename Stack>
258 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const259 auto DIA<ValueType, Stack>::ReduceToIndex(
260 const KeyExtractor& key_extractor,
261 const ReduceFunction& reduce_function,
262 size_t size,
263 const ValueType& neutral_element,
264 const ReduceConfig& reduce_config) const {
265 // forward to main function
266 return ReduceToIndex(
267 NoVolatileKeyTag,
268 key_extractor, reduce_function, size, neutral_element, reduce_config);
269 }
270
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273 typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const VolatileKeyFlag<VolatileKeyValue> &,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const274 auto DIA<ValueType, Stack>::ReduceToIndex(
275 const VolatileKeyFlag<VolatileKeyValue>&,
276 const KeyExtractor& key_extractor,
277 const ReduceFunction& reduce_function,
278 size_t size,
279 const ValueType& neutral_element,
280 const ReduceConfig& reduce_config) const {
281 assert(IsValid());
282
283 using DOpResult
284 = typename common::FunctionTraits<ReduceFunction>::result_type;
285
286 static_assert(
287 std::is_convertible<
288 ValueType,
289 typename common::FunctionTraits<ReduceFunction>::template arg<0>
290 >::value,
291 "ReduceFunction has the wrong input type");
292
293 static_assert(
294 std::is_convertible<
295 ValueType,
296 typename common::FunctionTraits<ReduceFunction>::template arg<1>
297 >::value,
298 "ReduceFunction has the wrong input type");
299
300 static_assert(
301 std::is_same<
302 DOpResult,
303 ValueType>::value,
304 "ReduceFunction has the wrong output type");
305
306 static_assert(
307 std::is_same<
308 typename std::decay<typename common::FunctionTraits<KeyExtractor>::
309 template arg<0> >::type,
310 ValueType>::value,
311 "KeyExtractor has the wrong input type");
312
313 static_assert(
314 std::is_same<
315 typename common::FunctionTraits<KeyExtractor>::result_type,
316 size_t>::value,
317 "The key has to be an unsigned long int (aka. size_t).");
318
319 using ReduceNode = ReduceToIndexNode<
320 DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
321 VolatileKeyValue, /* SkipPreReducePhase */ false>;
322
323 auto node = tlx::make_counting<ReduceNode>(
324 *this, "ReduceToIndex", key_extractor, reduce_function,
325 size, neutral_element, reduce_config);
326
327 return DIA<DOpResult>(node);
328 }
329
330 template <typename ValueType, typename Stack>
331 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const struct SkipPreReducePhaseTag &,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const332 auto DIA<ValueType, Stack>::ReduceToIndex(
333 const struct SkipPreReducePhaseTag&,
334 const KeyExtractor& key_extractor,
335 const ReduceFunction& reduce_function,
336 size_t size,
337 const ValueType& neutral_element,
338 const ReduceConfig& reduce_config) const {
339 assert(IsValid());
340
341 using DOpResult
342 = typename common::FunctionTraits<ReduceFunction>::result_type;
343
344 static_assert(
345 std::is_convertible<
346 ValueType,
347 typename common::FunctionTraits<ReduceFunction>::template arg<0>
348 >::value,
349 "ReduceFunction has the wrong input type");
350
351 static_assert(
352 std::is_convertible<
353 ValueType,
354 typename common::FunctionTraits<ReduceFunction>::template arg<1>
355 >::value,
356 "ReduceFunction has the wrong input type");
357
358 static_assert(
359 std::is_same<
360 DOpResult,
361 ValueType>::value,
362 "ReduceFunction has the wrong output type");
363
364 static_assert(
365 std::is_same<
366 typename std::decay<typename common::FunctionTraits<KeyExtractor>::
367 template arg<0> >::type,
368 ValueType>::value,
369 "KeyExtractor has the wrong input type");
370
371 static_assert(
372 std::is_same<
373 typename common::FunctionTraits<KeyExtractor>::result_type,
374 size_t>::value,
375 "The key has to be an unsigned long int (aka. size_t).");
376
377 using ReduceNode = ReduceToIndexNode<
378 DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
379 /* VolatileKey */ false, /* SkipPreReducePhase */ true>;
380
381 auto node = tlx::make_counting<ReduceNode>(
382 *this, "ReduceToIndex", key_extractor, reduce_function,
383 size, neutral_element, reduce_config);
384
385 return DIA<DOpResult>(node);
386 }
387
388 } // namespace api
389 } // namespace thrill
390
391 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER
392
393 /******************************************************************************/
394