1 /******************************************************************************* 2 * thrill/data/stream_data.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de> 7 * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net> 8 * 9 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 10 ******************************************************************************/ 11 12 #pragma once 13 #ifndef THRILL_DATA_STREAM_DATA_HEADER 14 #define THRILL_DATA_STREAM_DATA_HEADER 15 16 #include <thrill/common/stats_counter.hpp> 17 #include <thrill/common/stats_timer.hpp> 18 #include <thrill/data/block_writer.hpp> 19 #include <thrill/data/file.hpp> 20 #include <thrill/data/multiplexer.hpp> 21 #include <tlx/semaphore.hpp> 22 23 #include <mutex> 24 #include <vector> 25 26 namespace thrill { 27 namespace data { 28 29 //! \addtogroup data_layer 30 //! \{ 31 32 using StreamId = size_t; 33 34 enum class MagicByte : uint8_t { 35 Invalid, CatStreamBlock, MixStreamBlock, PartitionBlock 36 }; 37 38 class StreamSink; 39 class StreamSetBase; 40 41 /*! 42 * Base class for common structures for ConcatStream and MixedStream. This is 43 * also a virtual base class use by Multiplexer to pass blocks to streams! 44 * Instead, it contains common items like stats. 45 */ 46 class StreamData : public tlx::ReferenceCounter 47 { 48 public: 49 static constexpr bool debug = false; 50 51 using Writer = BlockWriter<StreamSink>; 52 53 /*! 54 * An extra class derived from std::vector<> for delivery of the 55 * BlockWriters of a Stream. The purpose is to enforce a custom way to close 56 * stream writers cyclically such that PE k first sends it's Close-packet to 57 * k+1, k+2, etc. 58 */ 59 class Writers : public std::vector<BlockWriter<StreamSink> > 60 { 61 public: 62 Writers(size_t my_worker_rank = 0); 63 64 //! copyable: default copy-constructor 65 Writers(const Writers&) = default; 66 //! copyable: default assignment operator 67 Writers& operator = (const Writers&) = default; 68 //! move-constructor: default 69 Writers(Writers&&) = default; 70 //! move-assignment operator: default 71 Writers& operator = (Writers&&) = default; 72 73 //! custom destructor to close writers is a cyclic fashion 74 void Close(); 75 76 //! custom destructor to close writers is a cyclic fashion 77 ~Writers(); 78 79 private: 80 //! rank of this worker 81 size_t my_worker_rank_; 82 }; 83 84 StreamData(StreamSetBase* stream_set_base, 85 Multiplexer& multiplexer, size_t send_size_limit, 86 const StreamId& id, size_t local_worker_id, size_t dia_id); 87 88 virtual ~StreamData(); 89 90 //! Return stream id id() const91 const StreamId& id() const { return id_; } 92 93 //! return stream type string 94 virtual const char * stream_type() = 0; 95 96 //! Returns my_host_rank my_host_rank() const97 size_t my_host_rank() const { return multiplexer_.my_host_rank(); } 98 //! Number of hosts in system num_hosts() const99 size_t num_hosts() const { return multiplexer_.num_hosts(); } 100 //! Number of workers in system num_workers() const101 size_t num_workers() const { return multiplexer_.num_workers(); } 102 103 //! Returns workers_per_host workers_per_host() const104 size_t workers_per_host() const { return multiplexer_.workers_per_host(); } 105 //! Returns my_worker_rank_ my_worker_rank() const106 size_t my_worker_rank() const { 107 return my_host_rank() * workers_per_host() + local_worker_id_; 108 } 109 110 /*------------------------------------------------------------------------*/ 111 112 //! shuts the stream down. 113 virtual void Close() = 0; 114 115 virtual bool closed() const = 0; 116 117 //! Creates BlockWriters for each worker. BlockWriter can only be opened 118 //! once, otherwise the block sequence is incorrectly interleaved! 119 virtual Writers GetWriters() = 0; 120 121 //! method called from StreamSink when it is closed, used to aggregate Close 122 //! messages to remote hosts 123 void OnWriterClosed(size_t peer_worker_rank, bool sent); 124 125 //! method called when all StreamSink writers have finished 126 void OnAllWritersClosed(); 127 128 /*------------------------------------------------------------------------*/ 129 ///////// expose these members - getters would be too java-ish ///////////// 130 131 //! StatsCounter for incoming data transfer. Does not include loopback data 132 //! transfer 133 std::atomic<size_t> 134 rx_net_items_ { 0 }, rx_net_bytes_ { 0 }, rx_net_blocks_ { 0 }; 135 136 //! StatsCounters for outgoing data transfer - shared by all sinks. Does 137 //! not include loopback data transfer 138 std::atomic<size_t> 139 tx_net_items_ { 0 }, tx_net_bytes_ { 0 }, tx_net_blocks_ { 0 }; 140 141 //! StatsCounter for incoming data transfer. Exclusively contains only 142 //! loopback (internal) data transfer 143 std::atomic<size_t> 144 rx_int_items_ { 0 }, rx_int_bytes_ { 0 }, rx_int_blocks_ { 0 }; 145 146 //! StatsCounters for outgoing data transfer - shared by all sinks. 147 //! Exclusively contains only loopback (internal) data transfer 148 std::atomic<size_t> 149 tx_int_items_ { 0 }, tx_int_bytes_ { 0 }, tx_int_blocks_ { 0 }; 150 151 //! Timers from creation of stream until rx / tx direction is closed. 152 common::StatsTimerStart tx_lifetime_, rx_lifetime_; 153 154 //! Timers from first rx / tx package until rx / tx direction is closed. 155 common::StatsTimerStopped tx_timespan_, rx_timespan_; 156 157 //! semaphore to stall the amount of PinnedBlocks (measured in bytes) passed 158 //! to the network layer for transmission. 159 tlx::Semaphore sem_queue_; 160 161 /////////////////////////////////////////////////////////////////////////// 162 163 protected: 164 //! our own stream id. 165 StreamId id_; 166 167 //! pointer to StreamSetBase containing this StreamData 168 StreamSetBase* stream_set_base_; 169 170 //! local worker id 171 size_t local_worker_id_; 172 173 //! associated DIANode id. 174 size_t dia_id_; 175 176 //! reference to multiplexer 177 Multiplexer& multiplexer_; 178 179 //! number of remaining expected stream closing operations. Required to know 180 //! when to stop rx_lifetime 181 std::atomic<size_t> remaining_closing_blocks_; 182 183 //! number of received stream closing Blocks. 184 tlx::Semaphore sem_closing_blocks_; 185 186 //! number of writers closed via StreamSink. 187 size_t writers_closed_ = 0; 188 189 //! bool if all writers were closed 190 bool all_writers_closed_ = false; 191 192 //! friends for access to multiplexer_ 193 friend class StreamSink; 194 }; 195 196 using StreamDataPtr = tlx::CountingPtr<StreamData>; 197 198 /*! 199 * Base class for StreamSet. 200 */ 201 class StreamSetBase : public tlx::ReferenceCounter 202 { 203 public: 204 static constexpr bool debug = false; 205 ~StreamSetBase()206 virtual ~StreamSetBase() { } 207 208 //! Close all streams in the set. 209 virtual void Close() = 0; 210 211 //! method called from StreamSink when it is closed, used to aggregate Close 212 //! messages to remote hosts 213 virtual void OnWriterClosed(size_t peer_worker_rank, bool sent) = 0; 214 }; 215 216 /*! 217 * Simple structure that holds a all stream instances for the workers on the 218 * local host for a given stream id. 219 */ 220 template <typename StreamData> 221 class StreamSet : public StreamSetBase 222 { 223 public: 224 using StreamDataPtr = tlx::CountingPtr<StreamData>; 225 226 //! Creates a StreamSet with the given number of streams (num workers per 227 //! host). 228 StreamSet(Multiplexer& multiplexer, size_t send_size_limit, 229 StreamId id, size_t workers_per_host, size_t dia_id); 230 231 //! Returns the stream that will be consumed by the worker with the given 232 //! local id 233 StreamDataPtr Peer(size_t local_worker_id); 234 235 //! Release local_worker_id, returns true when all individual streams are 236 //! done. 237 bool Release(size_t local_worker_id); 238 239 //! Close all StreamData objects 240 void Close() final; 241 242 //! method called from StreamSink when it is closed, used to aggregate Close 243 //! messages to remote hosts 244 void OnWriterClosed(size_t peer_worker_rank, bool sent); 245 246 //! Returns my_host_rank my_host_rank() const247 size_t my_host_rank() const { return multiplexer_.my_host_rank(); } 248 //! Number of hosts in system num_hosts() const249 size_t num_hosts() const { return multiplexer_.num_hosts(); } 250 //! Returns workers_per_host workers_per_host() const251 size_t workers_per_host() const { return multiplexer_.workers_per_host(); } 252 253 inline MagicByte magic_byte() const; 254 255 private: 256 //! reference to multiplexer 257 Multiplexer& multiplexer_; 258 //! stream id 259 StreamId id_; 260 //! 'owns' all streams belonging to one stream id for all local workers. 261 std::vector<StreamDataPtr> streams_; 262 //! countdown to destruction 263 size_t remaining_; 264 //! number of writers closed per host, message is set when all are closed 265 std::vector<size_t> writers_closed_per_host_; 266 //! number of writers closed per host, message is set when all are closed 267 std::vector<size_t> writers_closed_per_host_sent_; 268 //! mutex for working on the data structure 269 std::mutex mutex_; 270 }; 271 272 //! \} 273 274 } // namespace data 275 } // namespace thrill 276 277 #endif // !THRILL_DATA_STREAM_DATA_HEADER 278 279 /******************************************************************************/ 280