1 /******************************************************************************* 2 * thrill/data/multiplexer.hpp 3 * 4 * Part of Project Thrill - http://project-thrill.org 5 * 6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net> 7 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de> 8 * 9 * All rights reserved. Published under the BSD-2 license in the LICENSE file. 10 ******************************************************************************/ 11 12 #pragma once 13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER 14 #define THRILL_DATA_MULTIPLEXER_HEADER 15 16 #include <thrill/common/json_logger.hpp> 17 #include <thrill/net/dispatcher_thread.hpp> 18 #include <thrill/net/group.hpp> 19 20 #include <algorithm> 21 #include <atomic> 22 #include <memory> 23 24 namespace thrill { 25 namespace data { 26 27 //! \addtogroup data_layer 28 //! \{ 29 30 class StreamSetBase; 31 32 template <typename Stream> 33 class StreamSet; 34 35 class CatStreamData; 36 using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>; 37 class CatStream; 38 using CatStreamPtr = tlx::CountingPtr<CatStream>; 39 40 class MixStreamData; 41 using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>; 42 class MixStream; 43 using MixStreamPtr = tlx::CountingPtr<MixStream>; 44 45 using CatStreamSet = StreamSet<CatStreamData>; 46 using MixStreamSet = StreamSet<MixStreamData>; 47 48 class BlockQueue; 49 class MixBlockQueueSink; 50 51 class StreamMultiplexerHeader; 52 53 /*! 54 * Multiplexes virtual Connections on Dispatcher. 55 * 56 * A worker as a TCP conneciton to each other worker to exchange large amounts 57 * of data. Since multiple exchanges can occur at the same time on this single 58 * connection we use multiplexing. The slices are called Blocks and are 59 * indicated by a \ref MultiplexerHeader. Multiple Blocks form a Stream on a 60 * single TCP connection. The multiplexer multiplexes all streams on all 61 * sockets. 62 * 63 * All sockets are polled for headers. As soon as the a header arrives it is 64 * either attached to an existing stream or a new stream instance is 65 * created. 66 */ 67 class Multiplexer 68 { 69 static constexpr bool debug = false; 70 71 public: 72 Multiplexer(mem::Manager& mem_manager, BlockPool& block_pool, 73 net::DispatcherThread& dispatcher, net::Group& group, 74 size_t workers_per_host); 75 76 //! non-copyable: delete copy-constructor 77 Multiplexer(const Multiplexer&) = delete; 78 //! non-copyable: delete assignment operator 79 Multiplexer& operator = (const Multiplexer&) = delete; 80 81 //! Closes all client connections 82 ~Multiplexer(); 83 84 //! Closes all client connections 85 void Close(); 86 87 //! total number of hosts. num_hosts() const88 size_t num_hosts() const { 89 return group_.num_hosts(); 90 } 91 92 //! my rank among the hosts. my_host_rank() const93 size_t my_host_rank() const { 94 return group_.my_host_rank(); 95 } 96 97 //! total number of workers. num_workers() const98 size_t num_workers() const { 99 return num_hosts() * workers_per_host_; 100 } 101 102 //! number of workers per host workers_per_host() const103 size_t workers_per_host() const { 104 return workers_per_host_; 105 } 106 107 //! Get the used BlockPool block_pool()108 BlockPool& block_pool() { return block_pool_; } 109 110 //! Get the JsonLogger from the BlockPool 111 common::JsonLogger& logger(); 112 113 //! get network dispatcher dispatcher()114 net::DispatcherThread& dispatcher() { return dispatcher_; } 115 116 //! get network group connection group()117 net::Group& group() { return group_; } 118 119 //! \name CatStreamData 120 //! \{ 121 122 //! Allocate the next stream 123 size_t AllocateCatStreamId(size_t local_worker_id); 124 125 //! Get stream with given id, if it does not exist, create it. 126 CatStreamDataPtr GetOrCreateCatStreamData( 127 size_t id, size_t local_worker_id, size_t dia_id); 128 129 //! Request next stream. 130 CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id); 131 132 //! \} 133 134 //! \name MixStream 135 //! \{ 136 137 //! Allocate the next stream 138 size_t AllocateMixStreamId(size_t local_worker_id); 139 140 //! Get stream with given id, if it does not exist, create it. 141 MixStreamDataPtr GetOrCreateMixStreamData( 142 size_t id, size_t local_worker_id, size_t dia_id); 143 144 //! Request next stream. 145 MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id); 146 147 //! \} 148 149 private: 150 //! reference to host-global memory manager 151 mem::Manager& mem_manager_; 152 153 //! reference to host-global BlockPool. 154 BlockPool& block_pool_; 155 156 //! dispatcher used for all communication by data::Multiplexer, the thread 157 //! never leaves the data components! 158 net::DispatcherThread& dispatcher_; 159 160 //! Holds NetConnections for outgoing Streams 161 net::Group& group_; 162 163 //! Number of workers per host 164 size_t workers_per_host_; 165 166 //! protects critical sections 167 std::mutex mutex_; 168 169 //! closed 170 bool closed_ = false; 171 172 //! number of parallel recv requests 173 size_t num_parallel_async_; 174 175 //! Calculated send queue size limit for StreamData semaphores 176 size_t send_size_limit_; 177 178 //! number of active Cat/MixStreams 179 std::atomic<size_t> active_streams_ { 0 }; 180 181 //! maximu number of active Cat/MixStreams 182 std::atomic<size_t> max_active_streams_ { 0 }; 183 184 //! friends for access to network components 185 friend class CatStreamData; 186 friend class MixStreamData; 187 friend class StreamSink; 188 189 //! Pointer to queue that is used for communication between two workers on 190 //! the same host. 191 CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id); 192 MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id); 193 194 /**************************************************************************/ 195 196 //! pimpl data structure 197 struct Data; 198 199 //! pimpl data structure 200 std::unique_ptr<Data> d_; 201 202 CatStreamDataPtr IntGetOrCreateCatStreamData( 203 size_t id, size_t local_worker_id, size_t dia_id); 204 MixStreamDataPtr IntGetOrCreateMixStreamData( 205 size_t id, size_t local_worker_id, size_t dia_id); 206 207 //! release pointer onto a CatStreamData object 208 void IntReleaseCatStream(size_t id, size_t local_worker_id); 209 //! release pointer onto a MixStream object 210 void IntReleaseMixStream(size_t id, size_t local_worker_id); 211 212 /**************************************************************************/ 213 214 using Connection = net::Connection; 215 216 //! expects the next MultiplexerHeader from a socket and passes to 217 //! OnMultiplexerHeader 218 void AsyncReadMultiplexerHeader(size_t peer, Connection& s); 219 220 //! parses MultiplexerHeader and decides whether to receive Block or close 221 //! Stream 222 void OnMultiplexerHeader( 223 size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer); 224 225 //! Receives and dispatches a Block to a CatStreamData 226 void OnCatStreamBlock( 227 size_t peer, Connection& s, const StreamMultiplexerHeader& header, 228 const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes); 229 230 //! Receives and dispatches a Block to a MixStream 231 void OnMixStreamBlock( 232 size_t peer, Connection& s, const StreamMultiplexerHeader& header, 233 const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes); 234 }; 235 236 //! \} 237 238 } // namespace data 239 } // namespace thrill 240 241 #endif // !THRILL_DATA_MULTIPLEXER_HEADER 242 243 /******************************************************************************/ 244