1 /*******************************************************************************
2  * thrill/net/group.cpp
3  *
4  * Part of Project Thrill - http://project-thrill.org
5  *
6  * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7  *
8  * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9  ******************************************************************************/
10 
11 #include <thrill/net/collective.hpp>
12 #include <thrill/net/group.hpp>
13 #include <thrill/net/manager.hpp>
14 #include <thrill/net/mock/group.hpp>
15 
16 #if THRILL_HAVE_NET_TCP
17 #include <thrill/net/tcp/group.hpp>
18 #endif
19 
20 #include <functional>
21 #include <utility>
22 #include <vector>
23 
24 namespace thrill {
25 namespace net {
26 
RunLoopbackGroupTest(size_t num_hosts,const std::function<void (Group *)> & thread_function)27 void RunLoopbackGroupTest(
28     size_t num_hosts,
29     const std::function<void(Group*)>& thread_function) {
30 #if THRILL_HAVE_NET_TCP
31     // construct local tcp network mesh and run threads
32     ExecuteGroupThreads(
33         tcp::Group::ConstructLoopbackMesh(num_hosts),
34         thread_function);
35 #else
36     // construct mock network mesh and run threads
37     ExecuteGroupThreads(
38         mock::Group::ConstructLoopbackMesh(num_hosts),
39         thread_function);
40 #endif
41 }
42 
operator <<(std::ostream & os,const Traffic & t)43 std::ostream& operator << (std::ostream& os, const Traffic& t) {
44     return os << t.total();
45 }
46 
47 /******************************************************************************/
48 // Manager
49 
Manager(std::array<GroupPtr,kGroupCount> && groups,common::JsonLogger & logger)50 Manager::Manager(std::array<GroupPtr, kGroupCount>&& groups,
51                  common::JsonLogger& logger) noexcept
52     : groups_(std::move(groups)), logger_(logger) { }
53 
Manager(std::vector<GroupPtr> && groups,common::JsonLogger & logger)54 Manager::Manager(std::vector<GroupPtr>&& groups,
55                  common::JsonLogger& logger) noexcept
56     : logger_(logger) {
57     assert(groups.size() == kGroupCount);
58     std::move(groups.begin(), groups.end(), groups_.begin());
59 }
60 
Close()61 void Manager::Close() {
62     for (size_t i = 0; i < kGroupCount; i++) {
63         groups_[i]->Close();
64     }
65 }
66 
Traffic() const67 net::Traffic Manager::Traffic() const {
68     size_t total_tx = 0, total_rx = 0;
69 
70     for (size_t g = 0; g < kGroupCount; ++g) {
71         Group& group = *groups_[g];
72 
73         for (size_t h = 0; h < group.num_hosts(); ++h) {
74             if (h == group.my_host_rank()) continue;
75 
76             total_tx += group.connection(h).tx_bytes_;
77             total_rx += group.connection(h).rx_bytes_;
78         }
79     }
80 
81     return net::Traffic(total_tx, total_rx);
82 }
83 
RunTask(const std::chrono::steady_clock::time_point & tp)84 void Manager::RunTask(const std::chrono::steady_clock::time_point& tp) {
85 
86     common::JsonLine line = logger_.line();
87     line << "class" << "NetManager"
88          << "event" << "profile";
89 
90     double elapsed = static_cast<double>(
91         std::chrono::duration_cast<std::chrono::microseconds>(
92             tp - tp_last_).count()) / 1e6;
93 
94     size_t total_tx = 0, total_rx = 0;
95     size_t prev_total_tx = 0, prev_total_rx = 0;
96     size_t total_tx_active = 0, total_rx_active = 0;
97 
98     for (size_t g = 0; g < kGroupCount; ++g) {
99         Group& group = *groups_[g];
100 
101         size_t group_tx = 0, group_rx = 0;
102         size_t prev_group_tx = 0, prev_group_rx = 0;
103         size_t group_tx_active = 0, group_rx_active = 0;
104         std::vector<size_t> tx_per_host(group.num_hosts());
105         std::vector<size_t> rx_per_host(group.num_hosts());
106 
107         for (size_t h = 0; h < group.num_hosts(); ++h) {
108             if (h == group.my_host_rank()) continue;
109 
110             Connection& conn = group.connection(h);
111 
112             size_t tx = conn.tx_bytes_.load(std::memory_order_relaxed);
113             size_t rx = conn.rx_bytes_.load(std::memory_order_relaxed);
114             size_t prev_tx = conn.prev_tx_bytes_;
115             size_t prev_rx = conn.prev_rx_bytes_;
116 
117             group_tx += tx;
118             prev_group_tx += prev_tx;
119             group.connection(h).prev_tx_bytes_ = tx;
120             group_tx_active += conn.tx_active_;
121 
122             group_rx += rx;
123             prev_group_rx += prev_rx;
124             group.connection(h).prev_rx_bytes_ = rx;
125             group_rx_active += conn.rx_active_;
126 
127             tx_per_host[h] = tx;
128             rx_per_host[h] = rx;
129         }
130 
131         line.sub(g == 0 ? "flow" : g == 1 ? "data" : "???")
132             << "tx_bytes" << group_tx
133             << "rx_bytes" << group_rx
134             << "tx_speed"
135             << static_cast<double>(group_tx - prev_group_tx) / elapsed
136             << "rx_speed"
137             << static_cast<double>(group_rx - prev_group_rx) / elapsed
138             << "tx_per_host" << tx_per_host
139             << "rx_per_host" << rx_per_host;
140 
141         total_tx += group_tx;
142         total_rx += group_rx;
143         prev_total_tx += prev_group_tx;
144         prev_total_rx += prev_group_rx;
145         total_tx_active += group_tx_active;
146         total_rx_active += group_rx_active;
147 
148         tp_last_ = tp;
149     }
150 
151     // write out totals
152     line
153         << "tx_bytes" << total_tx
154         << "rx_bytes" << total_rx
155         << "tx_speed"
156         << static_cast<double>(total_tx - prev_total_tx) / elapsed
157         << "rx_speed"
158         << static_cast<double>(total_rx - prev_total_rx) / elapsed
159         << "tx_active" << total_tx_active
160         << "rx_active" << total_rx_active;
161 }
162 
163 /******************************************************************************/
164 // Group
165 
num_parallel_async() const166 size_t Group::num_parallel_async() const {
167     return 0;
168 }
169 
170 /*[[[perl
171   for my $e (
172     ["int", "Int"], ["unsigned int", "UnsignedInt"],
173     ["long", "Long"], ["unsigned long", "UnsignedLong"],
174     ["long long", "LongLong"], ["unsigned long long", "UnsignedLongLong"])
175   {
176     print "void Group::PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
177     print "    return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, true);\n";
178     print "}\n";
179 
180     print "void Group::ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
181     print "    return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, false);\n";
182     print "}\n";
183 
184     print "void Group::Broadcast$$e[1]($$e[0]& value, size_t origin) {\n";
185     print "    return BroadcastSelect(value, origin);\n";
186     print "}\n";
187 
188     print "void Group::AllReducePlus$$e[1]($$e[0]& value) {\n";
189     print "    return AllReduceSelect(value, std::plus<$$e[0]>());\n";
190     print "}\n";
191 
192     print "void Group::AllReduceMinimum$$e[1]($$e[0]& value) {\n";
193     print "    return AllReduceSelect(value, common::minimum<$$e[0]>());\n";
194     print "}\n";
195 
196     print "void Group::AllReduceMaximum$$e[1]($$e[0]& value) {\n";
197     print "    return AllReduceSelect(value, common::maximum<$$e[0]>());\n";
198     print "}\n";
199   }
200 ]]]*/
PrefixSumPlusInt(int & value,const int & initial)201 void Group::PrefixSumPlusInt(int& value, const int& initial) {
202     return PrefixSumSelect(value, std::plus<int>(), initial, true);
203 }
ExPrefixSumPlusInt(int & value,const int & initial)204 void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
205     return PrefixSumSelect(value, std::plus<int>(), initial, false);
206 }
BroadcastInt(int & value,size_t origin)207 void Group::BroadcastInt(int& value, size_t origin) {
208     return BroadcastSelect(value, origin);
209 }
AllReducePlusInt(int & value)210 void Group::AllReducePlusInt(int& value) {
211     return AllReduceSelect(value, std::plus<int>());
212 }
AllReduceMinimumInt(int & value)213 void Group::AllReduceMinimumInt(int& value) {
214     return AllReduceSelect(value, common::minimum<int>());
215 }
AllReduceMaximumInt(int & value)216 void Group::AllReduceMaximumInt(int& value) {
217     return AllReduceSelect(value, common::maximum<int>());
218 }
PrefixSumPlusUnsignedInt(unsigned int & value,const unsigned int & initial)219 void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
220     return PrefixSumSelect(value, std::plus<unsigned int>(), initial, true);
221 }
ExPrefixSumPlusUnsignedInt(unsigned int & value,const unsigned int & initial)222 void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
223     return PrefixSumSelect(value, std::plus<unsigned int>(), initial, false);
224 }
BroadcastUnsignedInt(unsigned int & value,size_t origin)225 void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
226     return BroadcastSelect(value, origin);
227 }
AllReducePlusUnsignedInt(unsigned int & value)228 void Group::AllReducePlusUnsignedInt(unsigned int& value) {
229     return AllReduceSelect(value, std::plus<unsigned int>());
230 }
AllReduceMinimumUnsignedInt(unsigned int & value)231 void Group::AllReduceMinimumUnsignedInt(unsigned int& value) {
232     return AllReduceSelect(value, common::minimum<unsigned int>());
233 }
AllReduceMaximumUnsignedInt(unsigned int & value)234 void Group::AllReduceMaximumUnsignedInt(unsigned int& value) {
235     return AllReduceSelect(value, common::maximum<unsigned int>());
236 }
PrefixSumPlusLong(long & value,const long & initial)237 void Group::PrefixSumPlusLong(long& value, const long& initial) {
238     return PrefixSumSelect(value, std::plus<long>(), initial, true);
239 }
ExPrefixSumPlusLong(long & value,const long & initial)240 void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
241     return PrefixSumSelect(value, std::plus<long>(), initial, false);
242 }
BroadcastLong(long & value,size_t origin)243 void Group::BroadcastLong(long& value, size_t origin) {
244     return BroadcastSelect(value, origin);
245 }
AllReducePlusLong(long & value)246 void Group::AllReducePlusLong(long& value) {
247     return AllReduceSelect(value, std::plus<long>());
248 }
AllReduceMinimumLong(long & value)249 void Group::AllReduceMinimumLong(long& value) {
250     return AllReduceSelect(value, common::minimum<long>());
251 }
AllReduceMaximumLong(long & value)252 void Group::AllReduceMaximumLong(long& value) {
253     return AllReduceSelect(value, common::maximum<long>());
254 }
PrefixSumPlusUnsignedLong(unsigned long & value,const unsigned long & initial)255 void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
256     return PrefixSumSelect(value, std::plus<unsigned long>(), initial, true);
257 }
ExPrefixSumPlusUnsignedLong(unsigned long & value,const unsigned long & initial)258 void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
259     return PrefixSumSelect(value, std::plus<unsigned long>(), initial, false);
260 }
BroadcastUnsignedLong(unsigned long & value,size_t origin)261 void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
262     return BroadcastSelect(value, origin);
263 }
AllReducePlusUnsignedLong(unsigned long & value)264 void Group::AllReducePlusUnsignedLong(unsigned long& value) {
265     return AllReduceSelect(value, std::plus<unsigned long>());
266 }
AllReduceMinimumUnsignedLong(unsigned long & value)267 void Group::AllReduceMinimumUnsignedLong(unsigned long& value) {
268     return AllReduceSelect(value, common::minimum<unsigned long>());
269 }
AllReduceMaximumUnsignedLong(unsigned long & value)270 void Group::AllReduceMaximumUnsignedLong(unsigned long& value) {
271     return AllReduceSelect(value, common::maximum<unsigned long>());
272 }
PrefixSumPlusLongLong(long long & value,const long long & initial)273 void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
274     return PrefixSumSelect(value, std::plus<long long>(), initial, true);
275 }
ExPrefixSumPlusLongLong(long long & value,const long long & initial)276 void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
277     return PrefixSumSelect(value, std::plus<long long>(), initial, false);
278 }
BroadcastLongLong(long long & value,size_t origin)279 void Group::BroadcastLongLong(long long& value, size_t origin) {
280     return BroadcastSelect(value, origin);
281 }
AllReducePlusLongLong(long long & value)282 void Group::AllReducePlusLongLong(long long& value) {
283     return AllReduceSelect(value, std::plus<long long>());
284 }
AllReduceMinimumLongLong(long long & value)285 void Group::AllReduceMinimumLongLong(long long& value) {
286     return AllReduceSelect(value, common::minimum<long long>());
287 }
AllReduceMaximumLongLong(long long & value)288 void Group::AllReduceMaximumLongLong(long long& value) {
289     return AllReduceSelect(value, common::maximum<long long>());
290 }
PrefixSumPlusUnsignedLongLong(unsigned long long & value,const unsigned long long & initial)291 void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
292     return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, true);
293 }
ExPrefixSumPlusUnsignedLongLong(unsigned long long & value,const unsigned long long & initial)294 void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
295     return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, false);
296 }
BroadcastUnsignedLongLong(unsigned long long & value,size_t origin)297 void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
298     return BroadcastSelect(value, origin);
299 }
AllReducePlusUnsignedLongLong(unsigned long long & value)300 void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
301     return AllReduceSelect(value, std::plus<unsigned long long>());
302 }
AllReduceMinimumUnsignedLongLong(unsigned long long & value)303 void Group::AllReduceMinimumUnsignedLongLong(unsigned long long& value) {
304     return AllReduceSelect(value, common::minimum<unsigned long long>());
305 }
AllReduceMaximumUnsignedLongLong(unsigned long long & value)306 void Group::AllReduceMaximumUnsignedLongLong(unsigned long long& value) {
307     return AllReduceSelect(value, common::maximum<unsigned long long>());
308 }
309 // [[[end]]]
310 
311 } // namespace net
312 } // namespace thrill
313 
314 /******************************************************************************/
315