1 /******************************************************************************* 2 * thrill/data/block_queue.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de> 7 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 8 * 9 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 10 ******************************************************************************/ 11 12 #pragma once 13 #ifndef THRILL_DATA_BLOCK_QUEUE_HEADER 14 #define THRILL_DATA_BLOCK_QUEUE_HEADER 15 16 #include <thrill/common/atomic_movable.hpp> 17 #include <thrill/common/concurrent_bounded_queue.hpp> 18 #include <thrill/common/stats_timer.hpp> 19 #include <thrill/data/block.hpp> 20 #include <thrill/data/block_reader.hpp> 21 #include <thrill/data/block_writer.hpp> 22 #include <thrill/data/dyn_block_reader.hpp> 23 #include <thrill/data/file.hpp> 24 25 #include <atomic> 26 27 namespace thrill { 28 namespace data { 29 30 //! \addtogroup data_layer 31 //! \{ 32 33 class BlockQueueSink; 34 class ConsumeBlockQueueSource; 35 36 /*! 37 * A BlockQueue is a thread-safe queue used to hand-over Block objects between 38 * threads. It is currently used by the Multiplexer to queue received Blocks and 39 * deliver them (later) to their destination. 40 * 41 * The BlockQueue itself is also a BlockSink (so one can attach a BlockWriter to 42 * it). To read items from the queue, one needs to use a BlockReader 43 * instantiated with a BlockQueueSource. Both are easily available via 44 * GetWriter() and GetReader(). Each block is available only *once* via the 45 * BlockQueueSource. 46 */ 47 class BlockQueue final : public BlockSink 48 { 49 public: 50 static constexpr bool debug = false; 51 52 using Writer = BlockWriter<BlockQueueSink>; 53 using Reader = DynBlockReader; 54 using ConsumeReader = BlockReader<ConsumeBlockQueueSource>; 55 56 using CloseCallback = tlx::delegate<void (BlockQueue&)>; 57 58 //! Constructor from BlockPool 59 BlockQueue(BlockPool& block_pool, size_t local_worker_id, 60 size_t dia_id, 61 const CloseCallback& close_callback = CloseCallback()); 62 63 //! non-copyable: delete copy-constructor 64 BlockQueue(const BlockQueue&) = delete; 65 //! non-copyable: delete assignment operator 66 BlockQueue& operator = (const BlockQueue&) = delete; 67 //! move-constructor: default 68 BlockQueue(BlockQueue&&) = default; 69 //! move-assignment operator: default 70 BlockQueue& operator = (BlockQueue&&) = default; 71 AppendBlock(const Block & b,bool)72 void AppendBlock(const Block& b, bool /* is_last_block */) final { 73 LOG << "BlockQueue::AppendBlock() " << b; 74 item_counter_ += b.num_items(); 75 byte_counter_ += b.size(); 76 block_counter_++; 77 queue_.emplace(b); 78 } AppendBlock(Block && b,bool)79 void AppendBlock(Block&& b, bool /* is_last_block */) final { 80 LOG << "BlockQueue::AppendBlock() move " << b; 81 item_counter_ += b.num_items(); 82 byte_counter_ += b.size(); 83 block_counter_++; 84 queue_.emplace(std::move(b)); 85 } 86 87 //! Close called by BlockWriter. 88 void Close() final; 89 90 static constexpr bool allocate_can_fail_ = false; 91 Pop()92 Block Pop() { 93 if (read_closed_) return Block(); 94 Block b; 95 queue_.pop(b); 96 read_closed_ = !b.IsValid(); 97 return b; 98 } 99 100 //! change dia_id after construction (needed because it may be unknown at 101 //! construction) set_dia_id(size_t dia_id)102 void set_dia_id(size_t dia_id) { 103 file_.set_dia_id(dia_id); 104 } 105 106 //! set the close callback set_close_callback(const CloseCallback & cb)107 void set_close_callback(const CloseCallback& cb) { 108 close_callback_ = cb; 109 } 110 111 //! check if writer side Close() was called. write_closed() const112 bool write_closed() const { return write_closed_; } 113 empty() const114 bool empty() const { return queue_.empty(); } 115 116 //! check if reader side has returned a closing sentinel block read_closed() const117 bool read_closed() const { return read_closed_; } 118 119 //! return number of block in the queue. Use this ONLY for DEBUGGING! size()120 size_t size() { return queue_.size() - (write_closed() ? 1 : 0); } 121 122 //! Returns item_counter_ item_counter() const123 size_t item_counter() const { return item_counter_; } 124 //! Returns byte_counter_ byte_counter() const125 size_t byte_counter() const { return byte_counter_; } 126 //! Returns block_counter_ block_counter() const127 size_t block_counter() const { return block_counter_; } 128 //! Returns timespan_ timespan() const129 const common::StatsTimer& timespan() const { return timespan_; } 130 131 //! Return a BlockWriter delivering to this BlockQueue. 132 Writer GetWriter(size_t block_size = default_block_size); 133 134 //! return BlockReader specifically for a BlockQueue 135 ConsumeReader GetConsumeReader(size_t local_worker_id); 136 137 //! return polymorphic BlockSource variant 138 DynBlockSource GetBlockSource(bool consume, size_t local_worker_id); 139 140 //! return polymorphic BlockReader variant 141 Reader GetReader(bool consume, size_t local_worker_id); 142 143 private: 144 common::ConcurrentBoundedQueue<Block> queue_; 145 146 common::AtomicMovable<bool> write_closed_ = { false }; 147 148 //! whether Pop() has returned a closing Block; hence, if we received the 149 //! close message from the writer 150 bool read_closed_ = false; 151 152 //! number of items transfered by the Queue 153 size_t item_counter_ = 0; 154 //! number of bytes transfered by the Queue 155 size_t byte_counter_ = 0; 156 //! number of blocks transfered by the Queue 157 size_t block_counter_ = 0; 158 //! timespan of existance 159 common::StatsTimerStart timespan_; 160 161 //! File to cache blocks for implementing CacheBlockQueueSource. 162 File file_; 163 164 //! callback to issue when the writer closes the Queue -- for delivering 165 //! stats 166 CloseCallback close_callback_; 167 168 //! for access to file_ 169 friend class CacheBlockQueueSource; 170 }; 171 172 /*! 173 * BlockSink which interfaces to a File 174 */ 175 class BlockQueueSink final : public BlockSink 176 { 177 static constexpr bool debug = false; 178 179 public: BlockQueueSink()180 BlockQueueSink() 181 : BlockSink(nullptr, -1), queue_(nullptr) 182 { } 183 BlockQueueSink(BlockQueue * queue)184 explicit BlockQueueSink(BlockQueue* queue) 185 : BlockSink(queue->block_pool(), queue->local_worker_id()), 186 queue_(std::move(queue)) { 187 LOG << "BlockQueueSink() new for " << queue; 188 } 189 190 //! default copy-constructor 191 BlockQueueSink(const BlockQueueSink&) = default; 192 //! default assignment operator 193 BlockQueueSink& operator = (const BlockQueueSink&) = default; 194 ~BlockQueueSink()195 ~BlockQueueSink() { 196 LOG << "~BlockQueueSink() for " << queue_; 197 } 198 199 //! \name Methods of a BlockSink 200 //! \{ 201 202 //! Append a block to this file, the block must contain given number of 203 //! items after the offset first. AppendBlock(const Block & b,bool is_last_block)204 void AppendBlock(const Block& b, bool is_last_block) final { 205 assert(queue_); 206 return queue_->AppendBlock(b, is_last_block); 207 } 208 209 //! Append a block to this file, the block must contain given number of 210 //! items after the offset first. AppendBlock(Block && b,bool is_last_block)211 void AppendBlock(Block&& b, bool is_last_block) final { 212 assert(queue_); 213 return queue_->AppendBlock(std::move(b), is_last_block); 214 } 215 Close()216 void Close() final { 217 if (queue_) { 218 queue_->Close(); 219 queue_ = nullptr; 220 } 221 } 222 223 //! \} 224 225 private: 226 BlockQueue* queue_; 227 }; 228 229 /*! 230 * A BlockSource to read Block from a BlockQueue using a BlockReader. Each Block 231 * is *taken* from the BlockQueue, hence the BlockQueue can be read only once! 232 */ 233 class ConsumeBlockQueueSource 234 { 235 static constexpr bool debug = BlockQueue::debug; 236 237 public: 238 //! Start reading from a BlockQueue 239 explicit ConsumeBlockQueueSource(BlockQueue& queue, size_t local_worker_id); 240 241 void Prefetch(size_t /* prefetch */); 242 243 //! Advance to next block of file, delivers current_ and end_ for 244 //! BlockReader. Returns false if the source is empty. 245 PinnedBlock NextBlock(); 246 247 private: 248 //! BlockQueue that blocks are retrieved from 249 BlockQueue& queue_; 250 251 //! local worker id of the thread _reading_ the BlockQueue 252 size_t local_worker_id_; 253 }; 254 255 /*! 256 * A BlockSource to read Blocks from a BlockQueue using a BlockReader, and at 257 * the same time CACHE all items received. All Blocks read from the BlockQueue 258 * are saved in the cache File. If the cache BlockQueue is initially already 259 * closed, then Blocks are read from the File instead. 260 */ 261 class CacheBlockQueueSource 262 { 263 static constexpr bool debug = BlockQueue::debug; 264 265 public: 266 //! Start reading from a BlockQueue 267 explicit CacheBlockQueueSource(BlockQueue* queue, size_t local_worker_id); 268 269 //! non-copyable: delete copy-constructor 270 CacheBlockQueueSource(const CacheBlockQueueSource&) = delete; 271 //! non-copyable: delete assignment operator 272 CacheBlockQueueSource& operator = (const CacheBlockQueueSource&) = delete; 273 //! move-constructor: default 274 CacheBlockQueueSource(CacheBlockQueueSource&& s); 275 276 void Prefetch(size_t /* prefetch */); 277 278 //! Return next block for BlockQueue, store into caching File and return it. 279 PinnedBlock NextBlock(); 280 281 //! Consume remaining blocks and cache them in the File. 282 ~CacheBlockQueueSource(); 283 284 private: 285 //! Reference to BlockQueue 286 BlockQueue* queue_; 287 288 //! local worker id of the thread _reading_ the BlockQueue 289 size_t local_worker_id_; 290 }; 291 292 //! \} 293 294 } // namespace data 295 } // namespace thrill 296 297 #endif // !THRILL_DATA_BLOCK_QUEUE_HEADER 298 299 /******************************************************************************/ 300