1 /*******************************************************************************
2  * thrill/data/stream_data.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015-2017 Timo Bingmann <tb@panthema.net>
7  * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
8  *
9  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
10  ******************************************************************************/
11 
12 #include <thrill/data/stream.hpp>
13 
14 #include <thrill/data/cat_stream.hpp>
15 #include <thrill/data/mix_stream.hpp>
16 #include <thrill/data/multiplexer_header.hpp>
17 
18 namespace thrill {
19 namespace data {
20 
21 /******************************************************************************/
22 // StreamData
23 
StreamData(StreamSetBase * stream_set_base,Multiplexer & multiplexer,size_t send_size_limit,const StreamId & id,size_t local_worker_id,size_t dia_id)24 StreamData::StreamData(StreamSetBase* stream_set_base,
25                        Multiplexer& multiplexer, size_t send_size_limit,
26                        const StreamId& id,
27                        size_t local_worker_id, size_t dia_id)
28     : sem_queue_(send_size_limit),
29       id_(id),
30       stream_set_base_(stream_set_base),
31       local_worker_id_(local_worker_id),
32       dia_id_(dia_id),
33       multiplexer_(multiplexer)
34 { }
35 
36 StreamData::~StreamData() = default;
37 
OnWriterClosed(size_t peer_worker_rank,bool sent)38 void StreamData::OnWriterClosed(size_t peer_worker_rank, bool sent) {
39     ++writers_closed_;
40 
41     LOG << "StreamData::OnWriterClosed()"
42         << " my_worker_rank= " << my_worker_rank()
43         << " peer_worker_rank=" << peer_worker_rank
44         << " writers_closed_=" << writers_closed_;
45 
46     die_unless(writers_closed_ <= num_hosts() * workers_per_host());
47 
48     stream_set_base_->OnWriterClosed(peer_worker_rank, sent);
49 
50     if (writers_closed_ == num_hosts() * workers_per_host()) {
51         LOG << "StreamData::OnWriterClosed() final close received";
52 
53         tx_lifetime_.StopEventually();
54         tx_timespan_.StopEventually();
55 
56         OnAllWritersClosed();
57         all_writers_closed_ = true;
58     }
59 }
60 
OnAllWritersClosed()61 void StreamData::OnAllWritersClosed() {
62     multiplexer_.logger()
63         << "class" << "StreamData"
64         << "event" << "close"
65         << "id" << id_
66         << "type" << stream_type()
67         << "dia_id" << dia_id_
68         << "worker_rank"
69         << (my_host_rank() * multiplexer_.workers_per_host())
70         + local_worker_id_
71         << "rx_net_items" << rx_net_items_
72         << "rx_net_bytes" << rx_net_bytes_
73         << "rx_net_blocks" << rx_net_blocks_
74         << "tx_net_items" << tx_net_items_
75         << "tx_net_bytes" << tx_net_bytes_
76         << "tx_net_blocks" << tx_net_blocks_
77         << "rx_int_items" << rx_int_items_
78         << "rx_int_bytes" << rx_int_bytes_
79         << "rx_int_blocks" << rx_int_blocks_
80         << "tx_int_items" << tx_int_items_
81         << "tx_int_bytes" << tx_int_bytes_
82         << "tx_int_blocks" << tx_int_blocks_;
83 }
84 
85 /******************************************************************************/
86 // StreamData::Writers
87 
Writers(size_t my_worker_rank)88 StreamData::Writers::Writers(size_t my_worker_rank)
89     : my_worker_rank_(my_worker_rank)
90 { }
91 
Close()92 void StreamData::Writers::Close() {
93     // close BlockWriters in a cyclic fashion
94     size_t s = size();
95     for (size_t i = 0; i < s; ++i) {
96         operator [] ((i + my_worker_rank_) % s).Close();
97     }
98 }
99 
~Writers()100 StreamData::Writers::~Writers() {
101     Close();
102 }
103 
104 /******************************************************************************/
105 // StreamSet
106 
107 template <typename StreamData>
StreamSet(Multiplexer & multiplexer,size_t send_size_limit,StreamId id,size_t workers_per_host,size_t dia_id)108 StreamSet<StreamData>::StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
109                                  StreamId id, size_t workers_per_host, size_t dia_id)
110     : multiplexer_(multiplexer), id_(id) {
111     for (size_t i = 0; i < workers_per_host; ++i) {
112         streams_.emplace_back(
113             tlx::make_counting<StreamData>(
114                 this, multiplexer, send_size_limit, id, i, dia_id));
115     }
116     remaining_ = workers_per_host;
117     writers_closed_per_host_.resize(num_hosts());
118     writers_closed_per_host_sent_.resize(num_hosts());
119 }
120 
121 template <typename StreamData>
Peer(size_t local_worker_id)122 tlx::CountingPtr<StreamData> StreamSet<StreamData>::Peer(size_t local_worker_id) {
123     assert(local_worker_id < streams_.size());
124     return streams_[local_worker_id];
125 }
126 
127 template <typename StreamData>
Release(size_t local_worker_id)128 bool StreamSet<StreamData>::Release(size_t local_worker_id) {
129     std::unique_lock<std::mutex> lock(mutex_);
130     assert(local_worker_id < streams_.size());
131     if (streams_[local_worker_id]) {
132         assert(remaining_ > 0);
133         streams_[local_worker_id].reset();
134         --remaining_;
135     }
136     return (remaining_ == 0);
137 }
138 
139 template <typename StreamData>
Close()140 void StreamSet<StreamData>::Close() {
141     for (StreamDataPtr& c : streams_)
142         c->Close();
143 }
144 
//! Record that one writer, owned by worker peer_worker_rank, has closed.
//!
//! Counts are kept per peer host in writers_closed_per_host_. Writers flagged
//! `sent` are additionally counted in writers_closed_per_host_sent_.
//! NOTE(review): presumably `sent` means the writer already delivered its own
//! close message — confirm against callers.
//!
//! Once the count for a peer host reaches workers_per_host()^2, a single
//! header is written to that host's connection. The header has
//! receiver_local_worker = all_workers and seq = final_seq. It is not sent
//! when the peer host is our own host, or when every counted writer of that
//! host was flagged `sent`.
//!
//! The whole function, including the AsyncWrite() call, runs with mutex_
//! held.
template <typename StreamData>
void StreamSet<StreamData>::OnWriterClosed(size_t peer_worker_rank, bool sent) {
    std::unique_lock<std::mutex> lock(mutex_);

    // map the global worker rank to the rank of the host that worker lives on
    size_t peer_host_rank = peer_worker_rank / workers_per_host();
    die_unless(peer_host_rank < writers_closed_per_host_.size());

    writers_closed_per_host_[peer_host_rank]++;
    if (sent)
        writers_closed_per_host_sent_[peer_host_rank]++;

    LOG << "StreamSet::OnWriterClosed()"
        << " my_host_rank= " << my_host_rank()
        << " peer_host_rank=" << peer_host_rank
        << " peer_worker_rank=" << peer_worker_rank
        << " writers_closed_per_host_[]="
        << writers_closed_per_host_[peer_host_rank];

    // Upper bound of closes per peer host is workers_per_host()^2
    // (presumably one writer per remote-worker x local-worker pair).
    die_unless(writers_closed_per_host_[peer_host_rank] <=
               workers_per_host() * workers_per_host());

    if (writers_closed_per_host_[peer_host_rank] ==
        workers_per_host() * workers_per_host())
    {
        // no close message is sent over the network to our own host
        if (peer_host_rank == my_host_rank())
            return;

        // every writer of that host was flagged `sent`: nothing left to send
        if (writers_closed_per_host_[peer_host_rank] ==
            writers_closed_per_host_sent_[peer_host_rank])
        {
            LOG << "StreamSet::OnWriterClosed() final close already-done"
                << " my_host_rank=" << my_host_rank()
                << " peer_host_rank=" << peer_host_rank
                << " writers_closed_per_host_[]="
                << writers_closed_per_host_[peer_host_rank];
            return;
        }

        LOG << "StreamSet::OnWriterClosed() final close "
            << " my_host_rank=" << my_host_rank()
            << " peer_host_rank=" << peer_host_rank
            << " writers_closed_per_host_[]="
            << writers_closed_per_host_[peer_host_rank];

        // Build a header-only message addressed to all workers of the peer
        // host, with the first worker of this host as sender.
        StreamMultiplexerHeader header;
        header.magic = magic_byte();
        header.stream_id = id_;
        header.sender_worker = my_host_rank() * workers_per_host();
        header.receiver_local_worker = StreamMultiplexerHeader::all_workers;
        header.seq = StreamMultiplexerHeader::final_seq;

        net::BufferBuilder bb;
        header.Serialize(bb);

        net::Buffer buffer = bb.ToBuffer();
        assert(buffer.size() == MultiplexerHeader::total_size);

        net::Connection& conn = multiplexer_.group().connection(peer_host_rank);

        // The second argument is derived from the connection's tx_seq_
        // counter (advanced by 2, masked to 16 bits, offset by 42).
        // NOTE(review): the meaning of these constants is defined outside
        // this file — confirm before changing.
        multiplexer_.dispatcher().AsyncWrite(
            conn, 42 + (conn.tx_seq_.fetch_add(2) & 0xFFFF),
            std::move(buffer));
    }
}
209 
210 template <>
magic_byte() const211 MagicByte StreamSet<CatStreamData>::magic_byte() const {
212     return MagicByte::CatStreamBlock;
213 }
214 
215 template <>
magic_byte() const216 MagicByte StreamSet<MixStreamData>::magic_byte() const {
217     return MagicByte::MixStreamBlock;
218 }
219 
220 template class StreamSet<CatStreamData>;
221 template class StreamSet<MixStreamData>;
222 
223 } // namespace data
224 } // namespace thrill
225 
226 /******************************************************************************/
227