1 /*******************************************************************************
2  * thrill/data/stream_data.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
7  * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_STREAM_DATA_HEADER
14 #define THRILL_DATA_STREAM_DATA_HEADER
15 
#include <thrill/common/stats_counter.hpp>
#include <thrill/common/stats_timer.hpp>
#include <thrill/data/block_writer.hpp>
#include <thrill/data/file.hpp>
#include <thrill/data/multiplexer.hpp>
#include <tlx/semaphore.hpp>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <vector>
25 
26 namespace thrill {
27 namespace data {
28 
29 //! \addtogroup data_layer
30 //! \{
31 
//! Type of the identifier of a stream (see StreamData::id()).
using StreamId = size_t;

//! One-byte tag whose enumerators name the kinds of blocks handled by the
//! data layer. The values are spelled out explicitly so that the numbering
//! cannot shift silently when enumerators are added (presumably the byte is
//! transmitted with blocks -- TODO confirm before reordering).
enum class MagicByte : uint8_t {
    Invalid = 0,
    CatStreamBlock = 1,
    MixStreamBlock = 2,
    PartitionBlock = 3
};

// forward declarations, defined in other data layer headers
class StreamSink;
class StreamSetBase;
40 
/*!
 * Base class for the common structures of ConcatStream and MixedStream. It
 * also serves as the virtual base class through which the Multiplexer passes
 * blocks to streams, and it holds the items common to all stream types, such
 * as transfer statistics.
 */
class StreamData : public tlx::ReferenceCounter
{
public:
    static constexpr bool debug = false;

    //! BlockWriter type used to write into a StreamSink
    using Writer = BlockWriter<StreamSink>;

    /*!
     * An extra class derived from std::vector<> for delivery of the
     * BlockWriters of a Stream. The purpose is to enforce a custom way to close
     * stream writers cyclically such that PE k first sends its Close-packet to
     * k+1, k+2, etc.
     */
    class Writers : public std::vector<BlockWriter<StreamSink> >
    {
    public:
        //! construct for the worker with global rank my_worker_rank, which is
        //! the starting point of the cyclic close order.
        //! NOTE(review): not explicit, so size_t converts implicitly.
        Writers(size_t my_worker_rank = 0);

        //! copyable: default copy-constructor
        Writers(const Writers&) = default;
        //! copyable: default assignment operator
        Writers& operator = (const Writers&) = default;
        //! move-constructor: default
        Writers(Writers&&) = default;
        //! move-assignment operator: default
        Writers& operator = (Writers&&) = default;

        //! close all contained writers in a cyclic fashion, i.e. starting with
        //! the writer after my_worker_rank_ (see class comment)
        void Close();

        //! custom destructor to close writers in a cyclic fashion
        ~Writers();

    private:
        //! rank of this worker
        size_t my_worker_rank_;
    };

    //! Construct the stream data of one local worker.
    //! \param stream_set_base  StreamSetBase containing this StreamData
    //!                         (presumably stored in stream_set_base_)
    //! \param multiplexer      Multiplexer this stream belongs to
    //! \param send_size_limit  presumably the byte limit used for sem_queue_
    //!                         -- TODO confirm in the constructor definition
    //! \param id               stream id, returned by id()
    //! \param local_worker_id  id of the worker on this host
    //! \param dia_id           id of the associated DIANode
    StreamData(StreamSetBase* stream_set_base,
               Multiplexer& multiplexer, size_t send_size_limit,
               const StreamId& id, size_t local_worker_id, size_t dia_id);

    virtual ~StreamData();

    //! Return stream id
    const StreamId& id() const { return id_; }

    //! return stream type string (implemented by the concrete stream type)
    virtual const char * stream_type() = 0;

    //! Returns my_host_rank
    size_t my_host_rank() const { return multiplexer_.my_host_rank(); }
    //! Number of hosts in system
    size_t num_hosts() const { return multiplexer_.num_hosts(); }
    //! Number of workers in system
    size_t num_workers() const { return multiplexer_.num_workers(); }

    //! Returns workers_per_host
    size_t workers_per_host() const { return multiplexer_.workers_per_host(); }
    //! Returns the global rank of this worker, computed as
    //! my_host_rank() * workers_per_host() + local worker id
    size_t my_worker_rank() const {
        return my_host_rank() * workers_per_host() + local_worker_id_;
    }

    /*------------------------------------------------------------------------*/

    //! shuts the stream down.
    virtual void Close() = 0;

    //! presumably reports whether the stream has been closed -- the exact
    //! semantics are defined by the derived stream types
    virtual bool closed() const = 0;

    //! Creates BlockWriters for each worker. BlockWriter can only be opened
    //! once, otherwise the block sequence is incorrectly interleaved!
    virtual Writers GetWriters() = 0;

    //! method called from StreamSink when it is closed, used to aggregate Close
    //! messages to remote hosts.
    //! \param peer_worker_rank presumably the rank of the worker the closed
    //!                         sink was writing to -- TODO confirm
    //! \param sent             presumably whether a close message was actually
    //!                         sent -- TODO confirm
    void OnWriterClosed(size_t peer_worker_rank, bool sent);

    //! method called when all StreamSink writers have finished
    void OnAllWritersClosed();

    /*------------------------------------------------------------------------*/
    ///////// expose these members - getters would be too java-ish /////////////

    //! Atomic counters for incoming data transfer. Does not include loopback
    //! data transfer
    std::atomic<size_t>
    rx_net_items_ { 0 }, rx_net_bytes_ { 0 }, rx_net_blocks_ { 0 };

    //! Atomic counters for outgoing data transfer - shared by all sinks. Does
    //! not include loopback data transfer
    std::atomic<size_t>
    tx_net_items_ { 0 }, tx_net_bytes_ { 0 }, tx_net_blocks_ { 0 };

    //! Atomic counters for incoming data transfer. Exclusively contains only
    //! loopback (internal) data transfer
    std::atomic<size_t>
    rx_int_items_ { 0 }, rx_int_bytes_ { 0 }, rx_int_blocks_ { 0 };

    //! Atomic counters for outgoing data transfer - shared by all sinks.
    //! Exclusively contains only loopback (internal) data transfer
    std::atomic<size_t>
    tx_int_items_ { 0 }, tx_int_bytes_ { 0 }, tx_int_blocks_ { 0 };

    //! Timers from creation of stream until rx / tx direction is closed.
    common::StatsTimerStart tx_lifetime_, rx_lifetime_;

    //! Timers from first rx / tx package until rx / tx direction is closed.
    common::StatsTimerStopped tx_timespan_, rx_timespan_;

    //! semaphore to stall the amount of PinnedBlocks (measured in bytes) passed
    //! to the network layer for transmission.
    tlx::Semaphore sem_queue_;

    ///////////////////////////////////////////////////////////////////////////

protected:
    // NOTE(review): the members below are declared (and hence initialized) in
    // this order; keep it in sync with the constructor's initializer list.

    //! our own stream id.
    StreamId id_;

    //! pointer to StreamSetBase containing this StreamData
    StreamSetBase* stream_set_base_;

    //! local worker id
    size_t local_worker_id_;

    //! associated DIANode id.
    size_t dia_id_;

    //! reference to multiplexer
    Multiplexer& multiplexer_;

    //! number of remaining expected stream closing operations. Required to know
    //! when to stop rx_lifetime.
    //! NOTE(review): has no default member initializer, unlike the counters
    //! above -- presumably set by the constructor; TODO confirm.
    std::atomic<size_t> remaining_closing_blocks_;

    //! number of received stream closing Blocks.
    tlx::Semaphore sem_closing_blocks_;

    //! number of writers closed via StreamSink.
    size_t writers_closed_ = 0;

    //! bool if all writers were closed
    bool all_writers_closed_ = false;

    //! friends for access to multiplexer_
    friend class StreamSink;
};

//! reference-counted pointer to a StreamData
using StreamDataPtr = tlx::CountingPtr<StreamData>;
197 
198 /*!
199  * Base class for StreamSet.
200  */
201 class StreamSetBase : public tlx::ReferenceCounter
202 {
203 public:
204     static constexpr bool debug = false;
205 
~StreamSetBase()206     virtual ~StreamSetBase() { }
207 
208     //! Close all streams in the set.
209     virtual void Close() = 0;
210 
211     //! method called from StreamSink when it is closed, used to aggregate Close
212     //! messages to remote hosts
213     virtual void OnWriterClosed(size_t peer_worker_rank, bool sent) = 0;
214 };
215 
216 /*!
217  * Simple structure that holds a all stream instances for the workers on the
218  * local host for a given stream id.
219  */
220 template <typename StreamData>
221 class StreamSet : public StreamSetBase
222 {
223 public:
224     using StreamDataPtr = tlx::CountingPtr<StreamData>;
225 
226     //! Creates a StreamSet with the given number of streams (num workers per
227     //! host).
228     StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
229               StreamId id, size_t workers_per_host, size_t dia_id);
230 
231     //! Returns the stream that will be consumed by the worker with the given
232     //! local id
233     StreamDataPtr Peer(size_t local_worker_id);
234 
235     //! Release local_worker_id, returns true when all individual streams are
236     //! done.
237     bool Release(size_t local_worker_id);
238 
239     //! Close all StreamData objects
240     void Close() final;
241 
242     //! method called from StreamSink when it is closed, used to aggregate Close
243     //! messages to remote hosts
244     void OnWriterClosed(size_t peer_worker_rank, bool sent);
245 
246     //! Returns my_host_rank
my_host_rank() const247     size_t my_host_rank() const { return multiplexer_.my_host_rank(); }
248     //! Number of hosts in system
num_hosts() const249     size_t num_hosts() const { return multiplexer_.num_hosts(); }
250     //! Returns workers_per_host
workers_per_host() const251     size_t workers_per_host() const { return multiplexer_.workers_per_host(); }
252 
253     inline MagicByte magic_byte() const;
254 
255 private:
256     //! reference to multiplexer
257     Multiplexer& multiplexer_;
258     //! stream id
259     StreamId id_;
260     //! 'owns' all streams belonging to one stream id for all local workers.
261     std::vector<StreamDataPtr> streams_;
262     //! countdown to destruction
263     size_t remaining_;
264     //! number of writers closed per host, message is set when all are closed
265     std::vector<size_t> writers_closed_per_host_;
266     //! number of writers closed per host, message is set when all are closed
267     std::vector<size_t> writers_closed_per_host_sent_;
268     //! mutex for working on the data structure
269     std::mutex mutex_;
270 };
271 
272 //! \}
273 
274 } // namespace data
275 } // namespace thrill
276 
277 #endif // !THRILL_DATA_STREAM_DATA_HEADER
278 
279 /******************************************************************************/
280