1 /*******************************************************************************
2  * thrill/data/cat_stream.hpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #pragma once
13 #ifndef THRILL_DATA_CAT_STREAM_HEADER
14 #define THRILL_DATA_CAT_STREAM_HEADER
15 
16 #include <thrill/data/block_queue.hpp>
17 #include <thrill/data/cat_block_source.hpp>
18 #include <thrill/data/stream.hpp>
19 
20 #include <string>
21 #include <vector>
22 
23 namespace thrill {
24 namespace data {
25 
26 //! \addtogroup data_layer
27 //! \{
28 
/*!
 * A Stream is a virtual set of connections to all other worker instances,
 * hence a "Stream" bundles them to a logical communication context. We call an
 * individual connection from a worker to another worker a "Host".
 *
 * To use a Stream, one can get a vector of BlockWriters via GetWriters() of
 * the outbound Stream. The vector has one entry per worker in the system.
 * One can then write items destined to the
 * corresponding worker. The written items are buffered into a Block and only
 * sent when the Block is full. To force a send, use BlockWriter::Flush(). When
 * all items are sent, the BlockWriters **must** be closed using
 * BlockWriter::Close().
 *
 * To read the inbound Connection items, one can get a vector of BlockReaders
 * via GetReaders(), which can then be used to read items sent by individual
 * workers.
 *
 * Alternatively, one can use GetCatReader() (or its alias GetReader()) to get
 * a BlockReader which delivers all items from *all* workers in worker order
 * (concatenating all inbound Connections).
 *
 * As soon as all attached streams of the Stream have been closed with Close(),
 * i.e. the number of expected streams is reached, the stream is marked as
 * finished and no more data will arrive.
 */
54 class CatStreamData final : public StreamData
55 {
56 public:
57     static constexpr bool debug = false;
58     static constexpr bool debug_data = false;
59 
60     using BlockQueueSource = ConsumeBlockQueueSource;
61     using BlockQueueReader = BlockReader<BlockQueueSource>;
62 
63     using CatBlockSource = data::CatBlockSource<DynBlockSource>;
64     using CatBlockReader = BlockReader<CatBlockSource>;
65 
66     using Reader = BlockQueueReader;
67     using CatReader = CatBlockReader;
68 
69     using Handle = CatStream;
70 
71     //! Creates a new stream instance
72     CatStreamData(StreamSetBase* stream_set_base,
73                   Multiplexer& multiplexer, size_t send_size_limit,
74                   const StreamId& id, size_t local_worker_id, size_t dia_id);
75 
76     //! non-copyable: delete copy-constructor
77     CatStreamData(const CatStreamData&) = delete;
78     //! non-copyable: delete assignment operator
79     CatStreamData& operator = (const CatStreamData&) = delete;
80     //! move-constructor: default
81     CatStreamData(CatStreamData&&) = default;
82 
83     ~CatStreamData() final;
84 
85     //! return stream type string
86     const char * stream_type() final;
87 
88     //! change dia_id after construction (needed because it may be unknown at
89     //! construction)
90     void set_dia_id(size_t dia_id);
91 
92     //! Creates BlockWriters for each worker. BlockWriter can only be opened
93     //! once, otherwise the block sequence is incorrectly interleaved!
94     Writers GetWriters() final;
95 
96     //! Creates a BlockReader for each worker. The BlockReaders are attached to
97     //! the BlockQueues in the Stream and wait for further Blocks to arrive or
98     //! the Stream's remote close. These Readers _always_ consume!
99     std::vector<Reader> GetReaders();
100 
101     //! Gets a CatBlockSource which includes all incoming queues of this stream.
102     CatBlockSource GetCatBlockSource(bool consume);
103 
104     //! Creates a BlockReader which concatenates items from all workers in
105     //! worker rank order. The BlockReader is attached to one \ref
106     //! CatBlockSource which includes all incoming queues of this stream.
107     CatReader GetCatReader(bool consume);
108 
109     //! Open a CatReader (function name matches a method in File and MixStream).
110     CatReader GetReader(bool consume);
111 
112     //! shuts the stream down.
113     void Close() final;
114 
115     //! Indicates if the stream is closed - meaning all remaining streams have
116     //! been closed. This does *not* include the loopback stream
117     bool closed() const final;
118 
119     //! check if inbound queue is closed
120     bool is_queue_closed(size_t from);
121 
122 private:
123     bool is_closed_ = false;
124 
125     struct SeqReordering;
126 
127     //! Block Sequence numbers
128     std::vector<SeqReordering> seq_;
129 
130     //! BlockQueues to store incoming Blocks with no attached destination.
131     std::vector<BlockQueue> queues_;
132 
133     //! for calling methods to deliver blocks
134     friend class Multiplexer;
135 
136     //! called from Multiplexer when there is a new Block on a
137     //! Stream.
138     void OnStreamBlock(size_t from, uint32_t seq, Block&& b);
139 
140     void OnStreamBlockOrdered(size_t from, Block&& b);
141 
142     //! Returns the loopback queue for the worker of this stream.
143     BlockQueue * loopback_queue(size_t from_worker_id);
144 };
145 
146 // we have two types of CatStream smart pointers: one for internal use in the
147 // Multiplexer (ordinary CountingPtr), and another for public handles in the
148 // DIANodes. Once all public handles are deleted, the CatStream is deactivated.
149 using CatStreamDataPtr = tlx::CountingPtr<CatStreamData>;
150 
151 using CatStreamSet = StreamSet<CatStreamData>;
152 using CatStreamSetPtr = tlx::CountingPtr<CatStreamSet>;
153 
154 //! Ownership handle onto a CatStreamData
155 class CatStream final : public Stream
156 {
157 public:
158     using Writer = CatStreamData::Writer;
159     using Reader = CatStreamData::Reader;
160 
161     using CatReader = CatStreamData::CatReader;
162 
163     explicit CatStream(const CatStreamDataPtr& ptr);
164 
165     //! When the user handle is destroyed, close the stream (but maybe not
166     //! destroy the data object)
167     ~CatStream();
168 
169     const StreamId& id() const final;
170 
171     //! Return stream data reference
172     StreamData& data() final;
173 
174     //! Return stream data reference
175     const StreamData& data() const final;
176 
177     //! Creates BlockWriters for each worker. BlockWriter can only be opened
178     //! once, otherwise the block sequence is incorrectly interleaved!
179     Writers GetWriters() final;
180 
181     //! Creates a BlockReader for each worker. The BlockReaders are attached to
182     //! the BlockQueues in the Stream and wait for further Blocks to arrive or
183     //! the Stream's remote close. These Readers _always_ consume!
184     std::vector<Reader> GetReaders();
185 
186     //! Creates a BlockReader which concatenates items from all workers in
187     //! worker rank order. The BlockReader is attached to one \ref
188     //! CatBlockSource which includes all incoming queues of this stream.
189     CatReader GetCatReader(bool consume);
190 
191     //! Open a CatReader (function name matches a method in File and MixStream).
192     CatReader GetReader(bool consume);
193 
194 private:
195     CatStreamDataPtr ptr_;
196 };
197 
198 using CatStreamPtr = tlx::CountingPtr<CatStream>;
199 
200 //! \}
201 
202 } // namespace data
203 } // namespace thrill
204 
205 #endif // !THRILL_DATA_CAT_STREAM_HEADER
206 
207 /******************************************************************************/
208