1 /*******************************************************************************
2  * thrill/data/mix_block_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #pragma once
12 #ifndef THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
13 #define THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
14 
#include <thrill/common/atomic_movable.hpp>
#include <thrill/common/concurrent_bounded_queue.hpp>
#include <thrill/common/logger.hpp>
#include <thrill/data/block.hpp>
#include <thrill/data/block_queue.hpp>
#include <thrill/data/block_reader.hpp>
#include <thrill/data/cat_block_source.hpp>
#include <thrill/data/dyn_block_reader.hpp>
#include <thrill/data/file.hpp>

#include <cassert>
#include <cstddef>
#include <stdexcept>
#include <vector>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 class MixStreamData;
34 class MixBlockQueueReader;
35 
36 using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>;
37 
38 /*!
39  * Implements reading an unordered sequence of items from multiple workers,
40  * which sends Blocks. This class is mainly used to implement MixChannel.
41  *
42  * When Blocks arrive from the net, the Multiplexer pushes (src, Blocks) pairs
43  * to MixChannel, which pushes them into a MixBlockQueue. The
44  * MixBlockQueue stores these in a ConcurrentBoundedQueue for atomic reading.
45  *
46  * When the MixChannel should be read, MixBlockQueueReader is used, which
47  * retrieves Blocks from the queue. The Reader contains one complete BlockReader
48  * for each inbound worker, and these BlockReaders are attached to BlockQueue
49  * instances inside the MixBlockQueue.
50  *
51  * To enable unordered reading from multiple workers, the only remaining thing
52  * to do is to fetch Blocks from the main mix queue and put them into the
53  * right BlockQueue for the sub-readers to consume. By taking the Blocks from
54  * the main mix queue, the Reader only blocks when no inbound Blocks are
55  * available.
56  *
57  * To enable switching between items from different workers, the
58  * MixBlockQueueReader keeps track of how many _whole_ items are available on
59  * each reader. This number is simply -1 of the number of items known to start
60  * in the received blocks. The last item _may_ span further Blocks, and cannot
61  * be fetched without infinitely blocking the sub-reader, since no thread will
62  * deliver the next Block.
63  */
64 class MixBlockQueue
65 {
66     static constexpr bool debug = false;
67 
68 public:
69     //! pair of (source worker, Block) stored in the main mix queue.
70     struct SrcBlockPair {
71         size_t src;
72         Block  block;
73     };
74 
75     using Reader = MixBlockQueueReader;
76 
77     //! Constructor from BlockPool
78     MixBlockQueue(BlockPool& block_pool, size_t num_workers,
79                   size_t local_worker_id, size_t dia_id);
80 
81     //! non-copyable: delete copy-constructor
82     MixBlockQueue(const MixBlockQueue&) = delete;
83     //! non-copyable: delete assignment operator
84     MixBlockQueue& operator = (const MixBlockQueue&) = delete;
85     //! move-constructor: default
86     MixBlockQueue(MixBlockQueue&&) = default;
87     //! move-assignment operator: default
88     MixBlockQueue& operator = (MixBlockQueue&&) = default;
89 
90     //! change dia_id after construction (needed because it may be unknown at
91     //! construction)
92     void set_dia_id(size_t dia_id);
93 
94     //! return block pool
block_pool()95     BlockPool& block_pool() { return block_pool_; }
96 
97     //! append block delivered via the network from src.
98     void AppendBlock(size_t src, const Block& block);
99 
100     //! append block delivered via the network from src.
101     void AppendBlock(size_t src, Block&& block);
102 
103     //! append closing sentinel block from src (also delivered via the network).
104     void Close(size_t src);
105 
106     //! Blocking retrieval of a (source,block) pair.
107     SrcBlockPair Pop();
108 
109     //! check if writer side Close() was called.
write_closed() const110     bool write_closed() const { return write_open_count_ == 0; }
111 
112     //! check if reader side has returned a closing sentinel block
read_closed() const113     bool read_closed() const { return read_open_ == 0; }
114 
115     //! check if inbound queue is closed
116     bool is_queue_closed(size_t src);
117 
118 private:
119     BlockPool& block_pool_;
120 
121     size_t local_worker_id_;
122 
123     //! the main mix queue, containing the block in the reception order.
124     common::ConcurrentBoundedQueue<SrcBlockPair> mix_queue_;
125 
126     //! total number of workers in system.
127     size_t num_workers_;
128 
129     //! counter on number of writers still open.
130     common::AtomicMovable<size_t> write_open_count_ { num_workers_ };
131 
132     //! flag to test for closed sources
133     std::vector<unsigned char> write_closed_;
134 
135     //! number of times Pop() has not yet returned a closing Block; hence, if we
136     //! received the close message from the writer.
137     size_t read_open_ = num_workers_;
138 
139     //! BlockQueues to deliver blocks to from mix queue.
140     std::vector<BlockQueue> queues_;
141 
142     //! for access to queues_ and other internals.
143     friend class MixBlockQueueReader;
144 };
145 
146 /*!
147  * Reader to retrieve items in unordered sequence from a MixBlockQueue. This
148  * is not a full implementation of _all_ methods available in a normal
149  * BlockReader. Mainly, this is because only retrieval of _whole_ items are
150  * possible. Due to the unordered sequence, these probably have to be all of
151  * equal type as well.
152  *
153  * The Reader supports all combinations of consuming and keeping. However, do
154  * not assume that the second round of reading delivers items in the same order
155  * as the first. This is because once items are cached inside the BlockQueues of
156  * MixBlockQueue, we use a plain CatReader to deliver them again (which is
157  * probably faster as it has a sequential access pattern).
158  *
159  * See \ref MixBlockQueue for more information on how items are read.
160  */
161 class MixBlockQueueReader
162 {
163     static constexpr bool debug = false;
164 
165 public:
166     using CatBlockSource = data::CatBlockSource<DynBlockSource>;
167     using CatBlockReader = BlockReader<CatBlockSource>;
168 
169     MixBlockQueueReader(MixBlockQueue& mix_queue,
170                         bool consume, size_t local_worker_id);
171 
172     //! non-copyable: delete copy-constructor
173     MixBlockQueueReader(const MixBlockQueueReader&) = delete;
174     //! non-copyable: delete assignment operator
175     MixBlockQueueReader& operator = (const MixBlockQueueReader&) = delete;
176     //! move-constructor: default
177     MixBlockQueueReader(MixBlockQueueReader&&) = default;
178     //! move-assignment operator: default
179     MixBlockQueueReader& operator = (MixBlockQueueReader&&) = default;
180 
181     //! Possibly consume unread blocks.
182     ~MixBlockQueueReader();
183 
184     //! HasNext() returns true if at least one more item is available.
HasNext()185     bool HasNext() {
186         if (reread_) return cat_reader_.HasNext();
187 
188         if (available_) return true;
189         if (open_ == 0) return false;
190 
191         return PullBlock();
192     }
193 
194     //! Next() reads a complete item T
195     template <typename T>
Next()196     T Next() {
197         assert(HasNext());
198 
199         if (reread_) {
200             return cat_reader_.template Next<T>();
201         }
202         else {
203             if (!available_) {
204                 if (!PullBlock()) {
205                     throw std::runtime_error(
206                               "Data underflow in MixBlockQueueReader.");
207                 }
208             }
209 
210             assert(available_ > 0);
211             assert(selected_ < readers_.size());
212 
213             --available_;
214             return readers_[selected_].template Next<T>();
215         }
216     }
217 
218 private:
219     //! reference to mix queue
220     MixBlockQueue& mix_queue_;
221 
222     //! flag whether we are rereading the mix queue by reading the files using
223     //! a cat_reader_.
224     const bool reread_;
225 
226     //! \name Attributes for Mix Reading
227     //! \{
228 
229     //! sub-readers for each block queue in mix queue
230     std::vector<BlockQueue::Reader> readers_;
231 
232     //! reader currently selected
233     size_t selected_ = size_t(-1);
234 
235     //! number of available items on the selected reader
236     size_t available_ = 0;
237 
238     //! number of additional items available at reader (excluding current
239     //! available_)
240     std::vector<size_t> available_at_;
241 
242     //! number of readers still open
243     size_t open_ = mix_queue_.num_workers_;
244 
245     //! \}
246 
247     //! for rereading the mix queue: use a cat reader on the embedded
248     //! BlockQueue's files.
249     CatBlockReader cat_reader_ { CatBlockSource() };
250 
251     bool PullBlock();
252 };
253 
254 //! \}
255 
256 } // namespace data
257 } // namespace thrill
258 
259 #endif // !THRILL_DATA_MIX_BLOCK_QUEUE_HEADER
260 
261 /******************************************************************************/
262