/*******************************************************************************
 * thrill/data/cat_stream.hpp
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#pragma once
#ifndef THRILL_DATA_CAT_STREAM_HEADER
#define THRILL_DATA_CAT_STREAM_HEADER

#include <thrill/data/block_queue.hpp>
#include <thrill/data/cat_block_source.hpp>
#include <thrill/data/stream.hpp>

#include <string>
#include <vector>

namespace thrill {
namespace data {

//! \addtogroup data_layer
//! \{

/*!
 * A Stream is a virtual set of connections to all other worker instances,
 * hence a "Stream" bundles them to a logical communication context. We call an
 * individual connection from a worker to another worker a "Host".
 *
 * To use a Stream, get a vector of BlockWriters via GetWriters() of the
 * outbound Stream. The vector has one writer per worker in the system. One can
 * then write items destined to the corresponding worker. The written items are
 * buffered into a Block and only sent when the Block is full. To force a send,
 * use BlockWriter::Flush(). When all items are sent, the BlockWriters **must**
 * be closed using BlockWriter::Close().
 *
 * To read the inbound Connection items, get a vector of BlockReaders via
 * GetReaders(); each of them reads the items sent by one individual worker.
 *
 * Alternatively, use GetCatReader() (or its alias GetReader()) to get a single
 * BlockReader which delivers the items from *all* workers in worker rank order,
 * concatenating all inbound Connections.
 *
 * Once all attached inbound streams of the Stream have been closed, i.e. the
 * expected number of closes has been reached, the stream is marked as finished
 * and no more data will arrive.
 */
class CatStreamData final : public StreamData
{
public:
    //! enable debug output
    static constexpr bool debug = false;
    //! enable debug output of transferred data
    static constexpr bool debug_data = false;

    //! block source over a single inbound BlockQueue (used by GetReaders(),
    //! whose readers always consume)
    using BlockQueueSource = ConsumeBlockQueueSource;
    using BlockQueueReader = BlockReader<BlockQueueSource>;

    //! block source concatenating all inbound queues of this stream (see
    //! GetCatBlockSource())
    using CatBlockSource = data::CatBlockSource<DynBlockSource>;
    using CatBlockReader = BlockReader<CatBlockSource>;

    //! per-worker reader type returned by GetReaders()
    using Reader = BlockQueueReader;
    //! concatenating reader type returned by GetCatReader() / GetReader()
    using CatReader = CatBlockReader;

    //! public ownership handle type for this stream data object
    using Handle = CatStream;

    //! Creates a new stream instance.
    //! NOTE(review): parameter semantics are not visible in this header —
    //! presumably \p send_size_limit bounds outgoing buffered data, \p id
    //! identifies the stream across workers, and \p dia_id is the owning DIA
    //! node (may be set later via set_dia_id()); confirm in the implementation.
    CatStreamData(StreamSetBase* stream_set_base,
                  Multiplexer& multiplexer, size_t send_size_limit,
                  const StreamId& id, size_t local_worker_id, size_t dia_id);

    //! non-copyable: delete copy-constructor
    CatStreamData(const CatStreamData&) = delete;
    //! non-copyable: delete assignment operator
    CatStreamData& operator = (const CatStreamData&) = delete;
    //! move-constructor: default
    //! NOTE(review): this is implicitly deleted if StreamData or any member is
    //! not move-constructible; check whether it is actually usable.
    CatStreamData(CatStreamData&&) = default;

    ~CatStreamData() final;

    //! return stream type string
    const char * stream_type() final;

    //! change dia_id after construction (needed because it may be unknown at
    //! construction)
    void set_dia_id(size_t dia_id);

    //! Creates BlockWriters for each worker. BlockWriter can only be opened
    //! once, otherwise the block sequence is incorrectly interleaved!
    Writers GetWriters() final;

    //! Creates a BlockReader for each worker. The BlockReaders are attached to
    //! the BlockQueues in the Stream and wait for further Blocks to arrive or
    //! the Stream's remote close. These Readers _always_ consume!
    std::vector<Reader> GetReaders();

    //! Gets a CatBlockSource which includes all incoming queues of this stream.
    //! \param consume presumably selects whether reading removes the Blocks
    //! from the queues (TODO confirm in the implementation)
    CatBlockSource GetCatBlockSource(bool consume);

    //! Creates a BlockReader which concatenates items from all workers in
    //! worker rank order. The BlockReader is attached to one \ref
    //! CatBlockSource which includes all incoming queues of this stream.
    CatReader GetCatReader(bool consume);

    //! Open a CatReader (function name matches a method in File and MixStream).
    CatReader GetReader(bool consume);

    //! shuts the stream down.
    void Close() final;

    //! Indicates whether the stream is closed, meaning all remaining inbound
    //! streams have been closed. This does *not* include the loopback stream.
    bool closed() const final;

    //! check if the inbound queue from worker \p from is closed
    bool is_queue_closed(size_t from);

private:
    //! set once the stream has been closed
    bool is_closed_ = false;

    //! reordering state for incoming block sequence numbers; the definition is
    //! not visible in this header
    struct SeqReordering;

    //! Block sequence numbers
    //! NOTE(review): presumably one entry per sending worker — confirm.
    std::vector<SeqReordering> seq_;

    //! BlockQueues to store incoming Blocks with no attached destination.
    std::vector<BlockQueue> queues_;

    //! for calling methods to deliver blocks
    friend class Multiplexer;

    //! called from Multiplexer when there is a new Block with sequence number
    //! \p seq from worker \p from on this Stream.
    void OnStreamBlock(size_t from, uint32_t seq, Block&& b);

    //! NOTE(review): presumably delivers a Block from worker \p from once it is
    //! in sequence order (called after reordering in OnStreamBlock()); confirm.
    void OnStreamBlockOrdered(size_t from, Block&& b);

    //! Returns the loopback queue for the worker of this stream.
    BlockQueue * loopback_queue(size_t from_worker_id);
};

// There are two kinds of CatStream smart pointers: CatStreamDataPtr, an
// ordinary CountingPtr to the data object used internally by the Multiplexer,
// and CatStreamPtr, which counts the public CatStream handles held by the
// DIANodes. Once all public handles are deleted, the CatStream is deactivated.
using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>;

//! set of CatStreamData objects and a counting pointer to such a set
using CatStreamSet = StreamSet<CatStreamData>;
using CatStreamSetPtr = tlx::CountingPtr<CatStreamSet>;

//! Ownership handle onto a CatStreamData
class CatStream final : public Stream
{
public:
    using Writer = CatStreamData::Writer;
    using Reader = CatStreamData::Reader;

    using CatReader = CatStreamData::CatReader;

    //! Creates a handle holding a counted reference to \p ptr.
    explicit CatStream(const CatStreamDataPtr& ptr);

    //! When the user handle is destroyed, close the stream (but maybe not
    //! destroy the data object)
    ~CatStream();

    //! Return the id of the stream
    const StreamId& id() const final;

    //! Return stream data reference
    StreamData& data() final;

    //! Return const stream data reference
    const StreamData& data() const final;

    //! Creates BlockWriters for each worker. BlockWriter can only be opened
    //! once, otherwise the block sequence is incorrectly interleaved!
    Writers GetWriters() final;

    //! Creates a BlockReader for each worker. The BlockReaders are attached to
    //! the BlockQueues in the Stream and wait for further Blocks to arrive or
    //! the Stream's remote close. These Readers _always_ consume!
    std::vector<Reader> GetReaders();

    //! Creates a BlockReader which concatenates items from all workers in
    //! worker rank order. The BlockReader is attached to one \ref
    //! CatBlockSource which includes all incoming queues of this stream.
    CatReader GetCatReader(bool consume);

    //! Open a CatReader (function name matches a method in File and MixStream).
    CatReader GetReader(bool consume);

private:
    //! counted reference to the underlying stream data object
    CatStreamDataPtr ptr_;
};

//! counting pointer to a public CatStream handle
using CatStreamPtr = tlx::CountingPtr<CatStream>;

//! \}

} // namespace data
} // namespace thrill

#endif // !THRILL_DATA_CAT_STREAM_HEADER

/******************************************************************************/