1 /*******************************************************************************
2  * thrill/data/block_queue.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
7  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_BLOCK_QUEUE_HEADER
14 #define THRILL_DATA_BLOCK_QUEUE_HEADER
15 
16 #include <thrill/common/atomic_movable.hpp>
17 #include <thrill/common/concurrent_bounded_queue.hpp>
18 #include <thrill/common/stats_timer.hpp>
19 #include <thrill/data/block.hpp>
20 #include <thrill/data/block_reader.hpp>
21 #include <thrill/data/block_writer.hpp>
22 #include <thrill/data/dyn_block_reader.hpp>
23 #include <thrill/data/file.hpp>
24 
25 #include <atomic>
26 
27 namespace thrill {
28 namespace data {
29 
30 //! \addtogroup data_layer
31 //! \{
32 
33 class BlockQueueSink;
34 class ConsumeBlockQueueSource;
35 
/*!
 * A BlockQueue is a thread-safe queue used to hand-over Block objects between
 * threads. It is currently used by the Multiplexer to queue received Blocks and
 * deliver them (later) to their destination.
 *
 * The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to
 * it). To read items from the queue, one needs to use a BlockReader
 * instantiated with a ConsumeBlockQueueSource or a CacheBlockQueueSource. A
 * writer and a reader are easily available via GetWriter() and GetReader().
 * Each block can be taken from the queue only *once*; a CacheBlockQueueSource
 * additionally saves the blocks it reads into a File so they can be re-read.
 */
47 class BlockQueue final : public BlockSink
48 {
49 public:
50     static constexpr bool debug = false;
51 
52     using Writer = BlockWriter<BlockQueueSink>;
53     using Reader = DynBlockReader;
54     using ConsumeReader = BlockReader<ConsumeBlockQueueSource>;
55 
56     using CloseCallback = tlx::delegate<void (BlockQueue&)>;
57 
58     //! Constructor from BlockPool
59     BlockQueue(BlockPool& block_pool, size_t local_worker_id,
60                size_t dia_id,
61                const CloseCallback& close_callback = CloseCallback());
62 
63     //! non-copyable: delete copy-constructor
64     BlockQueue(const BlockQueue&) = delete;
65     //! non-copyable: delete assignment operator
66     BlockQueue& operator = (const BlockQueue&) = delete;
67     //! move-constructor: default
68     BlockQueue(BlockQueue&&) = default;
69     //! move-assignment operator: default
70     BlockQueue& operator = (BlockQueue&&) = default;
71 
AppendBlock(const Block & b,bool)72     void AppendBlock(const Block& b, bool /* is_last_block */) final {
73         LOG << "BlockQueue::AppendBlock() " << b;
74         item_counter_ += b.num_items();
75         byte_counter_ += b.size();
76         block_counter_++;
77         queue_.emplace(b);
78     }
AppendBlock(Block && b,bool)79     void AppendBlock(Block&& b, bool /* is_last_block */) final {
80         LOG << "BlockQueue::AppendBlock() move " << b;
81         item_counter_ += b.num_items();
82         byte_counter_ += b.size();
83         block_counter_++;
84         queue_.emplace(std::move(b));
85     }
86 
87     //! Close called by BlockWriter.
88     void Close() final;
89 
90     static constexpr bool allocate_can_fail_ = false;
91 
Pop()92     Block Pop() {
93         if (read_closed_) return Block();
94         Block b;
95         queue_.pop(b);
96         read_closed_ = !b.IsValid();
97         return b;
98     }
99 
100     //! change dia_id after construction (needed because it may be unknown at
101     //! construction)
set_dia_id(size_t dia_id)102     void set_dia_id(size_t dia_id) {
103         file_.set_dia_id(dia_id);
104     }
105 
106     //! set the close callback
set_close_callback(const CloseCallback & cb)107     void set_close_callback(const CloseCallback& cb) {
108         close_callback_ = cb;
109     }
110 
111     //! check if writer side Close() was called.
write_closed() const112     bool write_closed() const { return write_closed_; }
113 
empty() const114     bool empty() const { return queue_.empty(); }
115 
116     //! check if reader side has returned a closing sentinel block
read_closed() const117     bool read_closed() const { return read_closed_; }
118 
119     //! return number of block in the queue. Use this ONLY for DEBUGGING!
size()120     size_t size() { return queue_.size() - (write_closed() ? 1 : 0); }
121 
122     //! Returns item_counter_
item_counter() const123     size_t item_counter() const { return item_counter_; }
124     //! Returns byte_counter_
byte_counter() const125     size_t byte_counter() const { return byte_counter_; }
126     //! Returns block_counter_
block_counter() const127     size_t block_counter() const { return block_counter_; }
128     //! Returns timespan_
timespan() const129     const common::StatsTimer& timespan() const { return timespan_; }
130 
131     //! Return a BlockWriter delivering to this BlockQueue.
132     Writer GetWriter(size_t block_size = default_block_size);
133 
134     //! return BlockReader specifically for a BlockQueue
135     ConsumeReader GetConsumeReader(size_t local_worker_id);
136 
137     //! return polymorphic BlockSource variant
138     DynBlockSource GetBlockSource(bool consume, size_t local_worker_id);
139 
140     //! return polymorphic BlockReader variant
141     Reader GetReader(bool consume, size_t local_worker_id);
142 
143 private:
144     common::ConcurrentBoundedQueue<Block> queue_;
145 
146     common::AtomicMovable<bool> write_closed_ = { false };
147 
148     //! whether Pop() has returned a closing Block; hence, if we received the
149     //! close message from the writer
150     bool read_closed_ = false;
151 
152     //! number of items transfered by the Queue
153     size_t item_counter_ = 0;
154     //! number of bytes transfered by the Queue
155     size_t byte_counter_ = 0;
156     //! number of blocks transfered by the Queue
157     size_t block_counter_ = 0;
158     //! timespan of existance
159     common::StatsTimerStart timespan_;
160 
161     //! File to cache blocks for implementing CacheBlockQueueSource.
162     File file_;
163 
164     //! callback to issue when the writer closes the Queue -- for delivering
165     //! stats
166     CloseCallback close_callback_;
167 
168     //! for access to file_
169     friend class CacheBlockQueueSource;
170 };
171 
172 /*!
173  * BlockSink which interfaces to a File
174  */
175 class BlockQueueSink final : public BlockSink
176 {
177     static constexpr bool debug = false;
178 
179 public:
BlockQueueSink()180     BlockQueueSink()
181         : BlockSink(nullptr, -1), queue_(nullptr)
182     { }
183 
BlockQueueSink(BlockQueue * queue)184     explicit BlockQueueSink(BlockQueue* queue)
185         : BlockSink(queue->block_pool(), queue->local_worker_id()),
186           queue_(std::move(queue)) {
187         LOG << "BlockQueueSink() new for " << queue;
188     }
189 
190     //! default copy-constructor
191     BlockQueueSink(const BlockQueueSink&) = default;
192     //! default assignment operator
193     BlockQueueSink& operator = (const BlockQueueSink&) = default;
194 
~BlockQueueSink()195     ~BlockQueueSink() {
196         LOG << "~BlockQueueSink() for " << queue_;
197     }
198 
199     //! \name Methods of a BlockSink
200     //! \{
201 
202     //! Append a block to this file, the block must contain given number of
203     //! items after the offset first.
AppendBlock(const Block & b,bool is_last_block)204     void AppendBlock(const Block& b, bool is_last_block) final {
205         assert(queue_);
206         return queue_->AppendBlock(b, is_last_block);
207     }
208 
209     //! Append a block to this file, the block must contain given number of
210     //! items after the offset first.
AppendBlock(Block && b,bool is_last_block)211     void AppendBlock(Block&& b, bool is_last_block) final {
212         assert(queue_);
213         return queue_->AppendBlock(std::move(b), is_last_block);
214     }
215 
Close()216     void Close() final {
217         if (queue_) {
218             queue_->Close();
219             queue_ = nullptr;
220         }
221     }
222 
223     //! \}
224 
225 private:
226     BlockQueue* queue_;
227 };
228 
229 /*!
230  * A BlockSource to read Block from a BlockQueue using a BlockReader. Each Block
231  * is *taken* from the BlockQueue, hence the BlockQueue can be read only once!
232  */
233 class ConsumeBlockQueueSource
234 {
235     static constexpr bool debug = BlockQueue::debug;
236 
237 public:
238     //! Start reading from a BlockQueue
239     explicit ConsumeBlockQueueSource(BlockQueue& queue, size_t local_worker_id);
240 
241     void Prefetch(size_t /* prefetch */);
242 
243     //! Advance to next block of file, delivers current_ and end_ for
244     //! BlockReader. Returns false if the source is empty.
245     PinnedBlock NextBlock();
246 
247 private:
248     //! BlockQueue that blocks are retrieved from
249     BlockQueue& queue_;
250 
251     //! local worker id of the thread _reading_ the BlockQueue
252     size_t local_worker_id_;
253 };
254 
255 /*!
256  * A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at
257  * the same time CACHE all items received. All Blocks read from the BlockQueue
258  * are saved in the cache File. If the cache BlockQueue is initially already
259  * closed, then Blocks are read from the File instead.
260  */
261 class CacheBlockQueueSource
262 {
263     static constexpr bool debug = BlockQueue::debug;
264 
265 public:
266     //! Start reading from a BlockQueue
267     explicit CacheBlockQueueSource(BlockQueue* queue, size_t local_worker_id);
268 
269     //! non-copyable: delete copy-constructor
270     CacheBlockQueueSource(const CacheBlockQueueSource&) = delete;
271     //! non-copyable: delete assignment operator
272     CacheBlockQueueSource& operator = (const CacheBlockQueueSource&) = delete;
273     //! move-constructor: default
274     CacheBlockQueueSource(CacheBlockQueueSource&& s);
275 
276     void Prefetch(size_t /* prefetch */);
277 
278     //! Return next block for BlockQueue, store into caching File and return it.
279     PinnedBlock NextBlock();
280 
281     //! Consume remaining blocks and cache them in the File.
282     ~CacheBlockQueueSource();
283 
284 private:
285     //! Reference to BlockQueue
286     BlockQueue* queue_;
287 
288     //! local worker id of the thread _reading_ the BlockQueue
289     size_t local_worker_id_;
290 };
291 
292 //! \}
293 
294 } // namespace data
295 } // namespace thrill
296 
297 #endif // !THRILL_DATA_BLOCK_QUEUE_HEADER
298 
299 /******************************************************************************/
300