1 /*******************************************************************************
2  * thrill/api/reduce_to_index.hpp
3  *
 * DIANode for the ReduceToIndex operation: reduces all items sharing the same
 * size_t key and places the result for key K at index K of the output DIA.
5  *
6  * Part of Project Thrill - http://project-thrill.org
7  *
8  * Copyright (C) 2015 Alexander Noe <aleexnoe@gmail.com>
9  * Copyright (C) 2015 Sebastian Lamm <seba.lamm@gmail.com>
10  * Copyright (C) 2017 Tim Zeitz <dev.tim.zeitz@gmail.com>
11  *
12  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
13  ******************************************************************************/
14 
15 #pragma once
16 #ifndef THRILL_API_REDUCE_TO_INDEX_HEADER
17 #define THRILL_API_REDUCE_TO_INDEX_HEADER
18 
19 #include <thrill/api/context.hpp>
20 #include <thrill/api/dia.hpp>
21 #include <thrill/api/dop_node.hpp>
22 #include <thrill/common/functional.hpp>
23 #include <thrill/common/logger.hpp>
24 #include <thrill/common/porting.hpp>
25 #include <thrill/core/reduce_by_index_post_phase.hpp>
26 #include <thrill/core/reduce_pre_phase.hpp>
27 
28 #include <functional>
29 #include <thread>
30 #include <type_traits>
31 #include <utility>
32 #include <vector>
33 
34 namespace thrill {
35 namespace api {
36 
37 class DefaultReduceToIndexConfig : public core::DefaultReduceConfig
38 { };
39 
40 /*!
41  * A DIANode which performs a ReduceToIndex operation. ReduceToIndex groups the
42  * elements in a DIA by their key and reduces every key bucket to a single
43  * element each. The ReduceToIndexNode stores the key_extractor and the
44  * reduce_function UDFs. The chainable LOps ahead of the Reduce operation are
45  * stored in the Stack. The ReduceToIndexNode has the type ValueType, which is
46  * the result type of the reduce_function. The key type is an unsigned integer
47  * and the output DIA will have element with key K at index K.
48  *
49  * \tparam ParentType Input type of the Reduce operation
50  * \tparam ValueType Output type of the Reduce operation
51  * \tparam ParentStack Function stack, which contains the chained lambdas between the last and this DIANode.
52  * \tparam KeyExtractor Type of the key_extractor function.
53  * \tparam ReduceFunction Type of the reduce_function
54  *
55  * \ingroup api_layer
56  */
57 template <typename ValueType,
58           typename KeyExtractor, typename ReduceFunction,
59           typename ReduceConfig, bool VolatileKey, bool SkipPreReducePhase>
60 class ReduceToIndexNode final : public DOpNode<ValueType>
61 {
62     static constexpr bool debug = false;
63 
64     using Super = DOpNode<ValueType>;
65     using Super::context_;
66 
67     using Key = typename common::FunctionTraits<KeyExtractor>::result_type;
68 
69     using TableItem =
70         typename std::conditional<
71             VolatileKey, std::pair<Key, ValueType>, ValueType>::type;
72 
73     static_assert(std::is_same<Key, size_t>::value,
74                   "Key must be an unsigned integer");
75 
76     static constexpr bool use_mix_stream_ = ReduceConfig::use_mix_stream_;
77     static constexpr bool use_post_thread_ = ReduceConfig::use_post_thread_;
78 
79 private:
80     //! Emitter for PostPhase to push elements to next DIA object.
81     class Emitter
82     {
83     public:
Emitter(ReduceToIndexNode * node)84         explicit Emitter(ReduceToIndexNode* node) : node_(node) { }
operator ()(const ValueType & item) const85         void operator () (const ValueType& item) const
86         { return node_->PushItem(item); }
87 
88     private:
89         ReduceToIndexNode* node_;
90     };
91 
92 public:
93     /*!
94      * Constructor for a ReduceToIndexNode. Sets the parent, stack,
95      * key_extractor and reduce_function.
96      */
97     template <typename ParentDIA>
ReduceToIndexNode(const ParentDIA & parent,const char * label,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t result_size,const ValueType & neutral_element,const ReduceConfig & config)98     ReduceToIndexNode(const ParentDIA& parent,
99                       const char* label,
100                       const KeyExtractor& key_extractor,
101                       const ReduceFunction& reduce_function,
102                       size_t result_size,
103                       const ValueType& neutral_element,
104                       const ReduceConfig& config)
105         : Super(parent.ctx(), label, { parent.id() }, { parent.node() }),
106           mix_stream_(use_mix_stream_ ?
107                       parent.ctx().GetNewMixStream(this) : nullptr),
108           cat_stream_(use_mix_stream_ ?
109                       nullptr : parent.ctx().GetNewCatStream(this)),
110           emitters_(use_mix_stream_ ?
111                     mix_stream_->GetWriters() : cat_stream_->GetWriters()),
112           result_size_(result_size),
113           pre_phase_(
114               context_, Super::dia_id(), context_.num_workers(),
115               key_extractor, reduce_function, emitters_,
116               config, core::ReduceByIndex<Key>(0, result_size)),
117           post_phase_(
118               context_, Super::dia_id(),
119               key_extractor, reduce_function, Emitter(this),
120               config, neutral_element) {
121         // Hook PreOp: Locally hash elements of the current DIA onto buckets and
122         // reduce each bucket to a single value, afterwards send data to another
123         // worker given by the shuffle algorithm.
__anon8ffd3e5d0102(const ValueType& input) 124         auto pre_op_fn = [this](const ValueType& input) {
125                              if (SkipPreReducePhase)
126                                  pre_phase_.InsertSkip(input);
127                              else
128                                  pre_phase_.Insert(input);
129                          };
130 
131         // close the function stack with our pre op and register it at parent
132         // node for output
133         auto lop_chain = parent.stack().push(pre_op_fn).fold();
134         parent.node()->AddChild(this, lop_chain);
135     }
136 
PreOpMemUse()137     DIAMemUse PreOpMemUse() final {
138         // request maximum RAM limit, the value is calculated by StageBuilder,
139         // and set as DIABase::mem_limit_.
140         return DIAMemUse::Max();
141     }
142 
StartPreOp(size_t)143     void StartPreOp(size_t /* parent_index */) final {
144         if (!use_post_thread_) {
145             // use pre_phase without extra thread
146             if (!SkipPreReducePhase)
147                 pre_phase_.Initialize(DIABase::mem_limit_);
148             else
149                 pre_phase_.InitializeSkip();
150 
151             // re-parameterize with resulting key range on this worker - this is
152             // only known after Initialize() of the pre_phase_.
153             post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
154         }
155         else {
156             if (!SkipPreReducePhase)
157                 pre_phase_.Initialize(DIABase::mem_limit_ / 2);
158             else
159                 pre_phase_.InitializeSkip();
160 
161             // re-parameterize with resulting key range on this worker - this is
162             // only know after Initialize() of the pre_phase_.
163             post_phase_.SetRange(pre_phase_.key_range(context_.my_rank()));
164             post_phase_.Initialize(DIABase::mem_limit_ / 2);
165 
166             // start additional thread to receive from the channel
167             thread_ = common::CreateThread([this] { ProcessChannel(); });
168         }
169     }
170 
StopPreOp(size_t)171     void StopPreOp(size_t /* parent_index */) final {
172         LOG << *this << " running StopPreOp";
173         // Flush hash table before the postOp
174         if (!SkipPreReducePhase)
175             pre_phase_.FlushAll();
176         pre_phase_.CloseAll();
177         if (use_post_thread_) {
178             // waiting for the additional thread to finish the reduce
179             thread_.join();
180             // deallocate stream if already processed
181             use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
182         }
183     }
184 
Execute()185     void Execute() final { }
186 
PushDataMemUse()187     DIAMemUse PushDataMemUse() final {
188         return DIAMemUse::Max();
189     }
190 
PushData(bool consume)191     void PushData(bool consume) final {
192 
193         if (!use_post_thread_ && !reduced_) {
194             // not final reduced, and no additional thread, perform post reduce
195             post_phase_.Initialize(DIABase::mem_limit_);
196             ProcessChannel();
197 
198             // deallocate stream if already processed
199             use_mix_stream_ ? mix_stream_.reset() : cat_stream_.reset();
200 
201             reduced_ = true;
202         }
203         post_phase_.PushData(consume);
204     }
205 
206     //! process the inbound data in the post reduce phase
ProcessChannel()207     void ProcessChannel() {
208         if (use_mix_stream_)
209         {
210             auto reader = mix_stream_->GetMixReader(/* consume */ true);
211             sLOG << "reading data from" << mix_stream_->id()
212                  << "to push into post table which flushes to" << this->dia_id();
213             while (reader.HasNext()) {
214                 post_phase_.Insert(reader.template Next<TableItem>());
215             }
216         }
217         else
218         {
219             auto reader = cat_stream_->GetCatReader(/* consume */ true);
220             sLOG << "reading data from" << cat_stream_->id()
221                  << "to push into post table which flushes to" << this->dia_id();
222             while (reader.HasNext()) {
223                 post_phase_.Insert(reader.template Next<TableItem>());
224             }
225         }
226     }
227 
Dispose()228     void Dispose() final {
229         post_phase_.Dispose();
230     }
231 
232 private:
233     // pointers for both Mix and CatStream. only one is used, the other costs
234     // only a null pointer.
235     data::MixStreamPtr mix_stream_;
236     data::CatStreamPtr cat_stream_;
237 
238     data::Stream::Writers emitters_;
239 
240     size_t result_size_;
241 
242     //! handle to additional thread for post phase
243     std::thread thread_;
244 
245     core::ReducePrePhase<
246         TableItem, Key, ValueType, KeyExtractor, ReduceFunction, VolatileKey,
247         data::Stream::Writer, ReduceConfig, core::ReduceByIndex<Key>
248         > pre_phase_;
249 
250     core::ReduceByIndexPostPhase<
251         TableItem, Key, ValueType, KeyExtractor, ReduceFunction, Emitter,
252         VolatileKey, ReduceConfig> post_phase_;
253 
254     bool reduced_ = false;
255 };
256 
257 template <typename ValueType, typename Stack>
258 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const259 auto DIA<ValueType, Stack>::ReduceToIndex(
260     const KeyExtractor& key_extractor,
261     const ReduceFunction& reduce_function,
262     size_t size,
263     const ValueType& neutral_element,
264     const ReduceConfig& reduce_config) const {
265     // forward to main function
266     return ReduceToIndex(
267         NoVolatileKeyTag,
268         key_extractor, reduce_function, size, neutral_element, reduce_config);
269 }
270 
271 template <typename ValueType, typename Stack>
272 template <bool VolatileKeyValue,
273           typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const VolatileKeyFlag<VolatileKeyValue> &,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const274 auto DIA<ValueType, Stack>::ReduceToIndex(
275     const VolatileKeyFlag<VolatileKeyValue>&,
276     const KeyExtractor& key_extractor,
277     const ReduceFunction& reduce_function,
278     size_t size,
279     const ValueType& neutral_element,
280     const ReduceConfig& reduce_config) const {
281     assert(IsValid());
282 
283     using DOpResult
284         = typename common::FunctionTraits<ReduceFunction>::result_type;
285 
286     static_assert(
287         std::is_convertible<
288             ValueType,
289             typename common::FunctionTraits<ReduceFunction>::template arg<0>
290             >::value,
291         "ReduceFunction has the wrong input type");
292 
293     static_assert(
294         std::is_convertible<
295             ValueType,
296             typename common::FunctionTraits<ReduceFunction>::template arg<1>
297             >::value,
298         "ReduceFunction has the wrong input type");
299 
300     static_assert(
301         std::is_same<
302             DOpResult,
303             ValueType>::value,
304         "ReduceFunction has the wrong output type");
305 
306     static_assert(
307         std::is_same<
308             typename std::decay<typename common::FunctionTraits<KeyExtractor>::
309                                 template arg<0> >::type,
310             ValueType>::value,
311         "KeyExtractor has the wrong input type");
312 
313     static_assert(
314         std::is_same<
315             typename common::FunctionTraits<KeyExtractor>::result_type,
316             size_t>::value,
317         "The key has to be an unsigned long int (aka. size_t).");
318 
319     using ReduceNode = ReduceToIndexNode<
320         DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
321         VolatileKeyValue, /* SkipPreReducePhase */ false>;
322 
323     auto node = tlx::make_counting<ReduceNode>(
324         *this, "ReduceToIndex", key_extractor, reduce_function,
325         size, neutral_element, reduce_config);
326 
327     return DIA<DOpResult>(node);
328 }
329 
330 template <typename ValueType, typename Stack>
331 template <typename KeyExtractor, typename ReduceFunction, typename ReduceConfig>
ReduceToIndex(const struct SkipPreReducePhaseTag &,const KeyExtractor & key_extractor,const ReduceFunction & reduce_function,size_t size,const ValueType & neutral_element,const ReduceConfig & reduce_config) const332 auto DIA<ValueType, Stack>::ReduceToIndex(
333     const struct SkipPreReducePhaseTag&,
334     const KeyExtractor& key_extractor,
335     const ReduceFunction& reduce_function,
336     size_t size,
337     const ValueType& neutral_element,
338     const ReduceConfig& reduce_config) const {
339     assert(IsValid());
340 
341     using DOpResult
342         = typename common::FunctionTraits<ReduceFunction>::result_type;
343 
344     static_assert(
345         std::is_convertible<
346             ValueType,
347             typename common::FunctionTraits<ReduceFunction>::template arg<0>
348             >::value,
349         "ReduceFunction has the wrong input type");
350 
351     static_assert(
352         std::is_convertible<
353             ValueType,
354             typename common::FunctionTraits<ReduceFunction>::template arg<1>
355             >::value,
356         "ReduceFunction has the wrong input type");
357 
358     static_assert(
359         std::is_same<
360             DOpResult,
361             ValueType>::value,
362         "ReduceFunction has the wrong output type");
363 
364     static_assert(
365         std::is_same<
366             typename std::decay<typename common::FunctionTraits<KeyExtractor>::
367                                 template arg<0> >::type,
368             ValueType>::value,
369         "KeyExtractor has the wrong input type");
370 
371     static_assert(
372         std::is_same<
373             typename common::FunctionTraits<KeyExtractor>::result_type,
374             size_t>::value,
375         "The key has to be an unsigned long int (aka. size_t).");
376 
377     using ReduceNode = ReduceToIndexNode<
378         DOpResult, KeyExtractor, ReduceFunction, ReduceConfig,
379         /* VolatileKey */ false, /* SkipPreReducePhase */ true>;
380 
381     auto node = tlx::make_counting<ReduceNode>(
382         *this, "ReduceToIndex", key_extractor, reduce_function,
383         size, neutral_element, reduce_config);
384 
385     return DIA<DOpResult>(node);
386 }
387 
388 } // namespace api
389 } // namespace thrill
390 
391 #endif // !THRILL_API_REDUCE_TO_INDEX_HEADER
392 
393 /******************************************************************************/
394