1 /******************************************************************************* 2 * thrill/data/mix_block_queue.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 7 * 8 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 9 ******************************************************************************/ 10 11 #pragma once 12 #ifndef THRILL_DATA_MIX_BLOCK_QUEUE_HEADER 13 #define THRILL_DATA_MIX_BLOCK_QUEUE_HEADER 14 15 #include <thrill/common/atomic_movable.hpp> 16 #include <thrill/common/concurrent_bounded_queue.hpp> 17 #include <thrill/common/logger.hpp> 18 #include <thrill/data/block.hpp> 19 #include <thrill/data/block_queue.hpp> 20 #include <thrill/data/block_reader.hpp> 21 #include <thrill/data/cat_block_source.hpp> 22 #include <thrill/data/dyn_block_reader.hpp> 23 #include <thrill/data/file.hpp> 24 25 #include <vector> 26 27 namespace thrill { 28 namespace data { 29 30 //! \addtogroup data_layer 31 //! \{ 32 33 class MixStreamData; 34 class MixBlockQueueReader; 35 36 using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>; 37 38 /*! 39 * Implements reading an unordered sequence of items from multiple workers, 40 * which sends Blocks. This class is mainly used to implement MixChannel. 41 * 42 * When Blocks arrive from the net, the Multiplexer pushes (src, Blocks) pairs 43 * to MixChannel, which pushes them into a MixBlockQueue. The 44 * MixBlockQueue stores these in a ConcurrentBoundedQueue for atomic reading. 45 * 46 * When the MixChannel should be read, MixBlockQueueReader is used, which 47 * retrieves Blocks from the queue. The Reader contains one complete BlockReader 48 * for each inbound worker, and these BlockReaders are attached to BlockQueue 49 * instances inside the MixBlockQueue. 50 * 51 * To enable unordered reading from multiple workers, the only remaining thing 52 * to do is to fetch Blocks from the main mix queue and put them into the 53 * right BlockQueue for the sub-readers to consume. By taking the Blocks from 54 * the main mix queue, the Reader only blocks when no inbound Blocks are 55 * available. 56 * 57 * To enable switching between items from different workers, the 58 * MixBlockQueueReader keeps track of how many _whole_ items are available on 59 * each reader. This number is simply -1 of the number of items known to start 60 * in the received blocks. The last item _may_ span further Blocks, and cannot 61 * be fetched without infinitely blocking the sub-reader, since no thread will 62 * deliver the next Block. 63 */ 64 class MixBlockQueue 65 { 66 static constexpr bool debug = false; 67 68 public: 69 //! pair of (source worker, Block) stored in the main mix queue. 70 struct SrcBlockPair { 71 size_t src; 72 Block block; 73 }; 74 75 using Reader = MixBlockQueueReader; 76 77 //! Constructor from BlockPool 78 MixBlockQueue(BlockPool& block_pool, size_t num_workers, 79 size_t local_worker_id, size_t dia_id); 80 81 //! non-copyable: delete copy-constructor 82 MixBlockQueue(const MixBlockQueue&) = delete; 83 //! non-copyable: delete assignment operator 84 MixBlockQueue& operator = (const MixBlockQueue&) = delete; 85 //! move-constructor: default 86 MixBlockQueue(MixBlockQueue&&) = default; 87 //! move-assignment operator: default 88 MixBlockQueue& operator = (MixBlockQueue&&) = default; 89 90 //! change dia_id after construction (needed because it may be unknown at 91 //! construction) 92 void set_dia_id(size_t dia_id); 93 94 //! return block pool block_pool()95 BlockPool& block_pool() { return block_pool_; } 96 97 //! append block delivered via the network from src. 98 void AppendBlock(size_t src, const Block& block); 99 100 //! append block delivered via the network from src. 101 void AppendBlock(size_t src, Block&& block); 102 103 //! append closing sentinel block from src (also delivered via the network). 104 void Close(size_t src); 105 106 //! Blocking retrieval of a (source,block) pair. 107 SrcBlockPair Pop(); 108 109 //! check if writer side Close() was called. write_closed() const110 bool write_closed() const { return write_open_count_ == 0; } 111 112 //! check if reader side has returned a closing sentinel block read_closed() const113 bool read_closed() const { return read_open_ == 0; } 114 115 //! check if inbound queue is closed 116 bool is_queue_closed(size_t src); 117 118 private: 119 BlockPool& block_pool_; 120 121 size_t local_worker_id_; 122 123 //! the main mix queue, containing the block in the reception order. 124 common::ConcurrentBoundedQueue<SrcBlockPair> mix_queue_; 125 126 //! total number of workers in system. 127 size_t num_workers_; 128 129 //! counter on number of writers still open. 130 common::AtomicMovable<size_t> write_open_count_ { num_workers_ }; 131 132 //! flag to test for closed sources 133 std::vector<unsigned char> write_closed_; 134 135 //! number of times Pop() has not yet returned a closing Block; hence, if we 136 //! received the close message from the writer. 137 size_t read_open_ = num_workers_; 138 139 //! BlockQueues to deliver blocks to from mix queue. 140 std::vector<BlockQueue> queues_; 141 142 //! for access to queues_ and other internals. 143 friend class MixBlockQueueReader; 144 }; 145 146 /*! 147 * Reader to retrieve items in unordered sequence from a MixBlockQueue. This 148 * is not a full implementation of _all_ methods available in a normal 149 * BlockReader. Mainly, this is because only retrieval of _whole_ items are 150 * possible. Due to the unordered sequence, these probably have to be all of 151 * equal type as well. 152 * 153 * The Reader supports all combinations of consuming and keeping. However, do 154 * not assume that the second round of reading delivers items in the same order 155 * as the first. This is because once items are cached inside the BlockQueues of 156 * MixBlockQueue, we use a plain CatReader to deliver them again (which is 157 * probably faster as it has a sequential access pattern). 158 * 159 * See \ref MixBlockQueue for more information on how items are read. 160 */ 161 class MixBlockQueueReader 162 { 163 static constexpr bool debug = false; 164 165 public: 166 using CatBlockSource = data::CatBlockSource<DynBlockSource>; 167 using CatBlockReader = BlockReader<CatBlockSource>; 168 169 MixBlockQueueReader(MixBlockQueue& mix_queue, 170 bool consume, size_t local_worker_id); 171 172 //! non-copyable: delete copy-constructor 173 MixBlockQueueReader(const MixBlockQueueReader&) = delete; 174 //! non-copyable: delete assignment operator 175 MixBlockQueueReader& operator = (const MixBlockQueueReader&) = delete; 176 //! move-constructor: default 177 MixBlockQueueReader(MixBlockQueueReader&&) = default; 178 //! move-assignment operator: default 179 MixBlockQueueReader& operator = (MixBlockQueueReader&&) = default; 180 181 //! Possibly consume unread blocks. 182 ~MixBlockQueueReader(); 183 184 //! HasNext() returns true if at least one more item is available. HasNext()185 bool HasNext() { 186 if (reread_) return cat_reader_.HasNext(); 187 188 if (available_) return true; 189 if (open_ == 0) return false; 190 191 return PullBlock(); 192 } 193 194 //! Next() reads a complete item T 195 template <typename T> Next()196 T Next() { 197 assert(HasNext()); 198 199 if (reread_) { 200 return cat_reader_.template Next<T>(); 201 } 202 else { 203 if (!available_) { 204 if (!PullBlock()) { 205 throw std::runtime_error( 206 "Data underflow in MixBlockQueueReader."); 207 } 208 } 209 210 assert(available_ > 0); 211 assert(selected_ < readers_.size()); 212 213 --available_; 214 return readers_[selected_].template Next<T>(); 215 } 216 } 217 218 private: 219 //! reference to mix queue 220 MixBlockQueue& mix_queue_; 221 222 //! flag whether we are rereading the mix queue by reading the files using 223 //! a cat_reader_. 224 const bool reread_; 225 226 //! \name Attributes for Mix Reading 227 //! \{ 228 229 //! sub-readers for each block queue in mix queue 230 std::vector<BlockQueue::Reader> readers_; 231 232 //! reader currently selected 233 size_t selected_ = size_t(-1); 234 235 //! number of available items on the selected reader 236 size_t available_ = 0; 237 238 //! number of additional items available at reader (excluding current 239 //! available_) 240 std::vector<size_t> available_at_; 241 242 //! number of readers still open 243 size_t open_ = mix_queue_.num_workers_; 244 245 //! \} 246 247 //! for rereading the mix queue: use a cat reader on the embedded 248 //! BlockQueue's files. 249 CatBlockReader cat_reader_ { CatBlockSource() }; 250 251 bool PullBlock(); 252 }; 253 254 //! \} 255 256 } // namespace data 257 } // namespace thrill 258 259 #endif // !THRILL_DATA_MIX_BLOCK_QUEUE_HEADER 260 261 /******************************************************************************/ 262