/*******************************************************************************
 * thrill/data/stream_data.cpp
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015-2017 Timo Bingmann <tb@panthema.net>
 * Copyright (C) 2015 Tobias Sturm <mail@tobiassturm.de>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/
11
12 #include <thrill/data/stream.hpp>
13
14 #include <thrill/data/cat_stream.hpp>
15 #include <thrill/data/mix_stream.hpp>
16 #include <thrill/data/multiplexer_header.hpp>
17
18 namespace thrill {
19 namespace data {
20
21 /******************************************************************************/
22 // StreamData
23
//! Construct the per-(local worker, stream) state object. Registers the
//! owning StreamSet and Multiplexer, and initializes sem_queue_ with
//! send_size_limit, which bounds how much queued send data writers may have
//! outstanding before blocking (units defined by the semaphore's users --
//! see Multiplexer).
StreamData::StreamData(StreamSetBase* stream_set_base,
                       Multiplexer& multiplexer, size_t send_size_limit,
                       const StreamId& id,
                       size_t local_worker_id, size_t dia_id)
    : sem_queue_(send_size_limit),
      id_(id),
      stream_set_base_(stream_set_base),
      local_worker_id_(local_worker_id),
      dia_id_(dia_id),
      multiplexer_(multiplexer)
{ }
35
36 StreamData::~StreamData() = default;
37
OnWriterClosed(size_t peer_worker_rank,bool sent)38 void StreamData::OnWriterClosed(size_t peer_worker_rank, bool sent) {
39 ++writers_closed_;
40
41 LOG << "StreamData::OnWriterClosed()"
42 << " my_worker_rank= " << my_worker_rank()
43 << " peer_worker_rank=" << peer_worker_rank
44 << " writers_closed_=" << writers_closed_;
45
46 die_unless(writers_closed_ <= num_hosts() * workers_per_host());
47
48 stream_set_base_->OnWriterClosed(peer_worker_rank, sent);
49
50 if (writers_closed_ == num_hosts() * workers_per_host()) {
51 LOG << "StreamData::OnWriterClosed() final close received";
52
53 tx_lifetime_.StopEventually();
54 tx_timespan_.StopEventually();
55
56 OnAllWritersClosed();
57 all_writers_closed_ = true;
58 }
59 }
60
//! Runs once, after the final writer close: emits one structured log record
//! with this stream's identity (id, type, dia_id, global worker rank) and the
//! accumulated network (net) and host-internal (int) item/byte/block
//! counters.
void StreamData::OnAllWritersClosed() {
    multiplexer_.logger()
        << "class" << "StreamData"
        << "event" << "close"
        << "id" << id_
        << "type" << stream_type()
        << "dia_id" << dia_id_
        // global worker rank = host rank * workers per host + local id
        << "worker_rank"
        << (my_host_rank() * multiplexer_.workers_per_host())
        + local_worker_id_
        << "rx_net_items" << rx_net_items_
        << "rx_net_bytes" << rx_net_bytes_
        << "rx_net_blocks" << rx_net_blocks_
        << "tx_net_items" << tx_net_items_
        << "tx_net_bytes" << tx_net_bytes_
        << "tx_net_blocks" << tx_net_blocks_
        << "rx_int_items" << rx_int_items_
        << "rx_int_bytes" << rx_int_bytes_
        << "rx_int_blocks" << rx_int_blocks_
        << "tx_int_items" << tx_int_items_
        << "tx_int_bytes" << tx_int_bytes_
        << "tx_int_blocks" << tx_int_blocks_;
}
84
85 /******************************************************************************/
86 // StreamData::Writers
87
//! Construct the writer collection; my_worker_rank is kept as the rotation
//! offset used by Close() -- presumably to stagger which writer each worker
//! closes first.
StreamData::Writers::Writers(size_t my_worker_rank)
    : my_worker_rank_(my_worker_rank)
{ }
91
Close()92 void StreamData::Writers::Close() {
93 // close BlockWriters in a cyclic fashion
94 size_t s = size();
95 for (size_t i = 0; i < s; ++i) {
96 operator [] ((i + my_worker_rank_) % s).Close();
97 }
98 }
99
//! Destructor closes the writers so destruction alone flushes/ends the
//! stream cleanly.
StreamData::Writers::~Writers() {
    Close();
}
103
104 /******************************************************************************/
105 // StreamSet
106
107 template <typename StreamData>
StreamSet(Multiplexer & multiplexer,size_t send_size_limit,StreamId id,size_t workers_per_host,size_t dia_id)108 StreamSet<StreamData>::StreamSet(Multiplexer& multiplexer, size_t send_size_limit,
109 StreamId id, size_t workers_per_host, size_t dia_id)
110 : multiplexer_(multiplexer), id_(id) {
111 for (size_t i = 0; i < workers_per_host; ++i) {
112 streams_.emplace_back(
113 tlx::make_counting<StreamData>(
114 this, multiplexer, send_size_limit, id, i, dia_id));
115 }
116 remaining_ = workers_per_host;
117 writers_closed_per_host_.resize(num_hosts());
118 writers_closed_per_host_sent_.resize(num_hosts());
119 }
120
//! Return the stream belonging to the given local worker of this host.
template <typename StreamData>
tlx::CountingPtr<StreamData> StreamSet<StreamData>::Peer(size_t local_worker_id) {
    assert(local_worker_id < streams_.size());
    return streams_[local_worker_id];
}
126
127 template <typename StreamData>
Release(size_t local_worker_id)128 bool StreamSet<StreamData>::Release(size_t local_worker_id) {
129 std::unique_lock<std::mutex> lock(mutex_);
130 assert(local_worker_id < streams_.size());
131 if (streams_[local_worker_id]) {
132 assert(remaining_ > 0);
133 streams_[local_worker_id].reset();
134 --remaining_;
135 }
136 return (remaining_ == 0);
137 }
138
139 template <typename StreamData>
Close()140 void StreamSet<StreamData>::Close() {
141 for (StreamDataPtr& c : streams_)
142 c->Close();
143 }
144
/*!
 * Collect writer-close notifications, bucketed by the peer host the closed
 * writer was sending to; 'sent' marks closes that were already transmitted
 * on the wire. When the last writer towards a remote host has closed but
 * not every one of them sent its own close, transmit one explicit
 * final-close header to that host on its behalf.
 */
template <typename StreamData>
void StreamSet<StreamData>::OnWriterClosed(size_t peer_worker_rank, bool sent) {
    std::unique_lock<std::mutex> lock(mutex_);

    size_t peer_host_rank = peer_worker_rank / workers_per_host();
    die_unless(peer_host_rank < writers_closed_per_host_.size());

    writers_closed_per_host_[peer_host_rank]++;
    if (sent)
        writers_closed_per_host_sent_[peer_host_rank]++;

    LOG << "StreamSet::OnWriterClosed()"
        << " my_host_rank= " << my_host_rank()
        << " peer_host_rank=" << peer_host_rank
        << " peer_worker_rank=" << peer_worker_rank
        << " writers_closed_per_host_[]="
        << writers_closed_per_host_[peer_host_rank];

    // workers_per_host local writers, each targeting workers_per_host
    // workers on the peer host -- hence the squared bound
    die_unless(writers_closed_per_host_[peer_host_rank] <=
               workers_per_host() * workers_per_host());

    if (writers_closed_per_host_[peer_host_rank] ==
        workers_per_host() * workers_per_host())
    {
        // host-local traffic needs no network close message
        if (peer_host_rank == my_host_rank())
            return;

        if (writers_closed_per_host_[peer_host_rank] ==
            writers_closed_per_host_sent_[peer_host_rank])
        {
            // every writer already delivered its own close on the wire
            LOG << "StreamSet::OnWriterClosed() final close already-done"
                << " my_host_rank=" << my_host_rank()
                << " peer_host_rank=" << peer_host_rank
                << " writers_closed_per_host_[]="
                << writers_closed_per_host_[peer_host_rank];
            return;
        }

        LOG << "StreamSet::OnWriterClosed() final close "
            << " my_host_rank=" << my_host_rank()
            << " peer_host_rank=" << peer_host_rank
            << " writers_closed_per_host_[]="
            << writers_closed_per_host_[peer_host_rank];

        // build a final-seq header addressed to all workers of the peer host
        StreamMultiplexerHeader header;
        header.magic = magic_byte();
        header.stream_id = id_;
        header.sender_worker = my_host_rank() * workers_per_host();
        header.receiver_local_worker = StreamMultiplexerHeader::all_workers;
        header.seq = StreamMultiplexerHeader::final_seq;

        net::BufferBuilder bb;
        header.Serialize(bb);

        net::Buffer buffer = bb.ToBuffer();
        assert(buffer.size() == MultiplexerHeader::total_size);

        net::Connection& conn = multiplexer_.group().connection(peer_host_rank);

        // NOTE(review): "42 + (tx_seq & 0xFFFF)" looks like a dispatcher
        // ordering/priority tag -- confirm against AsyncWrite's signature.
        multiplexer_.dispatcher().AsyncWrite(
            conn, 42 + (conn.tx_seq_.fetch_add(2) & 0xFFFF),
            std::move(buffer));
    }
}
209
//! magic byte marking CatStream blocks in multiplexer headers
template <>
MagicByte StreamSet<CatStreamData>::magic_byte() const {
    return MagicByte::CatStreamBlock;
}
214
//! magic byte marking MixStream blocks in multiplexer headers
template <>
MagicByte StreamSet<MixStreamData>::magic_byte() const {
    return MagicByte::MixStreamBlock;
}
219
// explicit instantiations for the two stream flavors used by Thrill
template class StreamSet<CatStreamData>;
template class StreamSet<MixStreamData>;
222
223 } // namespace data
224 } // namespace thrill
225
226 /******************************************************************************/
227