1 /*******************************************************************************
2  * thrill/data/multiplexer.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_MULTIPLEXER_HEADER
14 #define THRILL_DATA_MULTIPLEXER_HEADER
15 
#include <thrill/common/json_logger.hpp>
#include <thrill/net/dispatcher_thread.hpp>
#include <thrill/net/group.hpp>

#include <algorithm>
#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
23 
24 namespace thrill {
25 namespace data {
26 
27 //! \addtogroup data_layer
28 //! \{
29 
// Forward declarations and handle aliases only: the complete types are defined
// outside this header, which keeps it free of the stream implementation
// headers.

class StreamSetBase;

//! class template parameterized on a stream data type; instantiated by the
//! CatStreamSet and MixStreamSet aliases below.
template <typename Stream>
class StreamSet;

//! CatStream types and their tlx::CountingPtr handle aliases.
class CatStreamData;
using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>;
class CatStream;
using CatStreamPtr = tlx::CountingPtr<CatStream>;

//! MixStream types and their tlx::CountingPtr handle aliases.
class MixStreamData;
using MixStreamDataPtr = tlx::CountingPtr<MixStreamData>;
class MixStream;
using MixStreamPtr = tlx::CountingPtr<MixStream>;

//! StreamSet instantiations for the two stream data types.
using CatStreamSet = StreamSet<CatStreamData>;
using MixStreamSet = StreamSet<MixStreamData>;

class BlockQueue;
class MixBlockQueueSink;

//! header type passed to the block receive callbacks of the Multiplexer.
class StreamMultiplexerHeader;
52 
53 /*!
54  * Multiplexes virtual Connections on Dispatcher.
55  *
 * A worker has a TCP connection to each other worker to exchange large amounts
 * of data. Since multiple exchanges can occur at the same time on this single
 * connection we use multiplexing. The slices are called Blocks and are
 * indicated by a \ref MultiplexerHeader. Multiple Blocks form a Stream on a
 * single TCP connection. The multiplexer multiplexes all streams on all
 * sockets.
62  *
 * All sockets are polled for headers. As soon as a header arrives it is
 * either attached to an existing stream or a new stream instance is
 * created.
66  */
67 class Multiplexer
68 {
69     static constexpr bool debug = false;
70 
71 public:
72     Multiplexer(mem::Manager& mem_manager, BlockPool& block_pool,
73                 net::DispatcherThread& dispatcher, net::Group& group,
74                 size_t workers_per_host);
75 
76     //! non-copyable: delete copy-constructor
77     Multiplexer(const Multiplexer&) = delete;
78     //! non-copyable: delete assignment operator
79     Multiplexer& operator = (const Multiplexer&) = delete;
80 
81     //! Closes all client connections
82     ~Multiplexer();
83 
84     //! Closes all client connections
85     void Close();
86 
87     //! total number of hosts.
num_hosts() const88     size_t num_hosts() const {
89         return group_.num_hosts();
90     }
91 
92     //! my rank among the hosts.
my_host_rank() const93     size_t my_host_rank() const {
94         return group_.my_host_rank();
95     }
96 
97     //! total number of workers.
num_workers() const98     size_t num_workers() const {
99         return num_hosts() * workers_per_host_;
100     }
101 
102     //! number of workers per host
workers_per_host() const103     size_t workers_per_host() const {
104         return workers_per_host_;
105     }
106 
107     //! Get the used BlockPool
block_pool()108     BlockPool& block_pool() { return block_pool_; }
109 
110     //! Get the JsonLogger from the BlockPool
111     common::JsonLogger& logger();
112 
113     //! get network dispatcher
dispatcher()114     net::DispatcherThread& dispatcher() { return dispatcher_; }
115 
116     //! get network group connection
group()117     net::Group& group() { return group_; }
118 
119     //! \name CatStreamData
120     //! \{
121 
122     //! Allocate the next stream
123     size_t AllocateCatStreamId(size_t local_worker_id);
124 
125     //! Get stream with given id, if it does not exist, create it.
126     CatStreamDataPtr GetOrCreateCatStreamData(
127         size_t id, size_t local_worker_id, size_t dia_id);
128 
129     //! Request next stream.
130     CatStreamPtr GetNewCatStream(size_t local_worker_id, size_t dia_id);
131 
132     //! \}
133 
134     //! \name MixStream
135     //! \{
136 
137     //! Allocate the next stream
138     size_t AllocateMixStreamId(size_t local_worker_id);
139 
140     //! Get stream with given id, if it does not exist, create it.
141     MixStreamDataPtr GetOrCreateMixStreamData(
142         size_t id, size_t local_worker_id, size_t dia_id);
143 
144     //! Request next stream.
145     MixStreamPtr GetNewMixStream(size_t local_worker_id, size_t dia_id);
146 
147     //! \}
148 
149 private:
150     //! reference to host-global memory manager
151     mem::Manager& mem_manager_;
152 
153     //! reference to host-global BlockPool.
154     BlockPool& block_pool_;
155 
156     //! dispatcher used for all communication by data::Multiplexer, the thread
157     //! never leaves the data components!
158     net::DispatcherThread& dispatcher_;
159 
160     //! Holds NetConnections for outgoing Streams
161     net::Group& group_;
162 
163     //! Number of workers per host
164     size_t workers_per_host_;
165 
166     //! protects critical sections
167     std::mutex mutex_;
168 
169     //! closed
170     bool closed_ = false;
171 
172     //! number of parallel recv requests
173     size_t num_parallel_async_;
174 
175     //! Calculated send queue size limit for StreamData semaphores
176     size_t send_size_limit_;
177 
178     //! number of active Cat/MixStreams
179     std::atomic<size_t> active_streams_ { 0 };
180 
181     //! maximu number of active Cat/MixStreams
182     std::atomic<size_t> max_active_streams_ { 0 };
183 
184     //! friends for access to network components
185     friend class CatStreamData;
186     friend class MixStreamData;
187     friend class StreamSink;
188 
189     //! Pointer to queue that is used for communication between two workers on
190     //! the same host.
191     CatStreamDataPtr CatLoopback(size_t stream_id, size_t to_worker_id);
192     MixStreamDataPtr MixLoopback(size_t stream_id, size_t to_worker_id);
193 
194     /**************************************************************************/
195 
196     //! pimpl data structure
197     struct Data;
198 
199     //! pimpl data structure
200     std::unique_ptr<Data> d_;
201 
202     CatStreamDataPtr IntGetOrCreateCatStreamData(
203         size_t id, size_t local_worker_id, size_t dia_id);
204     MixStreamDataPtr IntGetOrCreateMixStreamData(
205         size_t id, size_t local_worker_id, size_t dia_id);
206 
207     //! release pointer onto a CatStreamData object
208     void IntReleaseCatStream(size_t id, size_t local_worker_id);
209     //! release pointer onto a MixStream object
210     void IntReleaseMixStream(size_t id, size_t local_worker_id);
211 
212     /**************************************************************************/
213 
214     using Connection = net::Connection;
215 
216     //! expects the next MultiplexerHeader from a socket and passes to
217     //! OnMultiplexerHeader
218     void AsyncReadMultiplexerHeader(size_t peer, Connection& s);
219 
220     //! parses MultiplexerHeader and decides whether to receive Block or close
221     //! Stream
222     void OnMultiplexerHeader(
223         size_t peer, uint32_t seq, Connection& s, net::Buffer&& buffer);
224 
225     //! Receives and dispatches a Block to a CatStreamData
226     void OnCatStreamBlock(
227         size_t peer, Connection& s, const StreamMultiplexerHeader& header,
228         const CatStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
229 
230     //! Receives and dispatches a Block to a MixStream
231     void OnMixStreamBlock(
232         size_t peer, Connection& s, const StreamMultiplexerHeader& header,
233         const MixStreamDataPtr& stream, PinnedByteBlockPtr&& bytes);
234 };
235 
236 //! \}
237 
238 } // namespace data
239 } // namespace thrill
240 
241 #endif // !THRILL_DATA_MULTIPLEXER_HEADER
242 
243 /******************************************************************************/
244