1 /*******************************************************************************
2 * thrill/net/group.cpp
3 *
4 * Part of Project Thrill - http://project-thrill.org
5 *
6 * Copyright (C) 2015 Timo Bingmann <tb@panthema.net>
7 *
8 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
9 ******************************************************************************/
10
11 #include <thrill/net/collective.hpp>
12 #include <thrill/net/group.hpp>
13 #include <thrill/net/manager.hpp>
14 #include <thrill/net/mock/group.hpp>
15
16 #if THRILL_HAVE_NET_TCP
17 #include <thrill/net/tcp/group.hpp>
18 #endif
19
20 #include <functional>
21 #include <utility>
22 #include <vector>
23
24 namespace thrill {
25 namespace net {
26
RunLoopbackGroupTest(size_t num_hosts,const std::function<void (Group *)> & thread_function)27 void RunLoopbackGroupTest(
28 size_t num_hosts,
29 const std::function<void(Group*)>& thread_function) {
30 #if THRILL_HAVE_NET_TCP
31 // construct local tcp network mesh and run threads
32 ExecuteGroupThreads(
33 tcp::Group::ConstructLoopbackMesh(num_hosts),
34 thread_function);
35 #else
36 // construct mock network mesh and run threads
37 ExecuteGroupThreads(
38 mock::Group::ConstructLoopbackMesh(num_hosts),
39 thread_function);
40 #endif
41 }
42
operator <<(std::ostream & os,const Traffic & t)43 std::ostream& operator << (std::ostream& os, const Traffic& t) {
44 return os << t.total();
45 }
46
47 /******************************************************************************/
48 // Manager
49
Manager(std::array<GroupPtr,kGroupCount> && groups,common::JsonLogger & logger)50 Manager::Manager(std::array<GroupPtr, kGroupCount>&& groups,
51 common::JsonLogger& logger) noexcept
52 : groups_(std::move(groups)), logger_(logger) { }
53
Manager(std::vector<GroupPtr> && groups,common::JsonLogger & logger)54 Manager::Manager(std::vector<GroupPtr>&& groups,
55 common::JsonLogger& logger) noexcept
56 : logger_(logger) {
57 assert(groups.size() == kGroupCount);
58 std::move(groups.begin(), groups.end(), groups_.begin());
59 }
60
Close()61 void Manager::Close() {
62 for (size_t i = 0; i < kGroupCount; i++) {
63 groups_[i]->Close();
64 }
65 }
66
Traffic() const67 net::Traffic Manager::Traffic() const {
68 size_t total_tx = 0, total_rx = 0;
69
70 for (size_t g = 0; g < kGroupCount; ++g) {
71 Group& group = *groups_[g];
72
73 for (size_t h = 0; h < group.num_hosts(); ++h) {
74 if (h == group.my_host_rank()) continue;
75
76 total_tx += group.connection(h).tx_bytes_;
77 total_rx += group.connection(h).rx_bytes_;
78 }
79 }
80
81 return net::Traffic(total_tx, total_rx);
82 }
83
RunTask(const std::chrono::steady_clock::time_point & tp)84 void Manager::RunTask(const std::chrono::steady_clock::time_point& tp) {
85
86 common::JsonLine line = logger_.line();
87 line << "class" << "NetManager"
88 << "event" << "profile";
89
90 double elapsed = static_cast<double>(
91 std::chrono::duration_cast<std::chrono::microseconds>(
92 tp - tp_last_).count()) / 1e6;
93
94 size_t total_tx = 0, total_rx = 0;
95 size_t prev_total_tx = 0, prev_total_rx = 0;
96 size_t total_tx_active = 0, total_rx_active = 0;
97
98 for (size_t g = 0; g < kGroupCount; ++g) {
99 Group& group = *groups_[g];
100
101 size_t group_tx = 0, group_rx = 0;
102 size_t prev_group_tx = 0, prev_group_rx = 0;
103 size_t group_tx_active = 0, group_rx_active = 0;
104 std::vector<size_t> tx_per_host(group.num_hosts());
105 std::vector<size_t> rx_per_host(group.num_hosts());
106
107 for (size_t h = 0; h < group.num_hosts(); ++h) {
108 if (h == group.my_host_rank()) continue;
109
110 Connection& conn = group.connection(h);
111
112 size_t tx = conn.tx_bytes_.load(std::memory_order_relaxed);
113 size_t rx = conn.rx_bytes_.load(std::memory_order_relaxed);
114 size_t prev_tx = conn.prev_tx_bytes_;
115 size_t prev_rx = conn.prev_rx_bytes_;
116
117 group_tx += tx;
118 prev_group_tx += prev_tx;
119 group.connection(h).prev_tx_bytes_ = tx;
120 group_tx_active += conn.tx_active_;
121
122 group_rx += rx;
123 prev_group_rx += prev_rx;
124 group.connection(h).prev_rx_bytes_ = rx;
125 group_rx_active += conn.rx_active_;
126
127 tx_per_host[h] = tx;
128 rx_per_host[h] = rx;
129 }
130
131 line.sub(g == 0 ? "flow" : g == 1 ? "data" : "???")
132 << "tx_bytes" << group_tx
133 << "rx_bytes" << group_rx
134 << "tx_speed"
135 << static_cast<double>(group_tx - prev_group_tx) / elapsed
136 << "rx_speed"
137 << static_cast<double>(group_rx - prev_group_rx) / elapsed
138 << "tx_per_host" << tx_per_host
139 << "rx_per_host" << rx_per_host;
140
141 total_tx += group_tx;
142 total_rx += group_rx;
143 prev_total_tx += prev_group_tx;
144 prev_total_rx += prev_group_rx;
145 total_tx_active += group_tx_active;
146 total_rx_active += group_rx_active;
147
148 tp_last_ = tp;
149 }
150
151 // write out totals
152 line
153 << "tx_bytes" << total_tx
154 << "rx_bytes" << total_rx
155 << "tx_speed"
156 << static_cast<double>(total_tx - prev_total_tx) / elapsed
157 << "rx_speed"
158 << static_cast<double>(total_rx - prev_total_rx) / elapsed
159 << "tx_active" << total_tx_active
160 << "rx_active" << total_rx_active;
161 }
162
163 /******************************************************************************/
164 // Group
165
num_parallel_async() const166 size_t Group::num_parallel_async() const {
167 return 0;
168 }
169
170 /*[[[perl
171 for my $e (
172 ["int", "Int"], ["unsigned int", "UnsignedInt"],
173 ["long", "Long"], ["unsigned long", "UnsignedLong"],
174 ["long long", "LongLong"], ["unsigned long long", "UnsignedLongLong"])
175 {
176 print "void Group::PrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
177 print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, true);\n";
178 print "}\n";
179
180 print "void Group::ExPrefixSumPlus$$e[1]($$e[0]& value, const $$e[0]& initial) {\n";
181 print " return PrefixSumSelect(value, std::plus<$$e[0]>(), initial, false);\n";
182 print "}\n";
183
184 print "void Group::Broadcast$$e[1]($$e[0]& value, size_t origin) {\n";
185 print " return BroadcastSelect(value, origin);\n";
186 print "}\n";
187
188 print "void Group::AllReducePlus$$e[1]($$e[0]& value) {\n";
189 print " return AllReduceSelect(value, std::plus<$$e[0]>());\n";
190 print "}\n";
191
192 print "void Group::AllReduceMinimum$$e[1]($$e[0]& value) {\n";
193 print " return AllReduceSelect(value, common::minimum<$$e[0]>());\n";
194 print "}\n";
195
196 print "void Group::AllReduceMaximum$$e[1]($$e[0]& value) {\n";
197 print " return AllReduceSelect(value, common::maximum<$$e[0]>());\n";
198 print "}\n";
199 }
200 ]]]*/
PrefixSumPlusInt(int & value,const int & initial)201 void Group::PrefixSumPlusInt(int& value, const int& initial) {
202 return PrefixSumSelect(value, std::plus<int>(), initial, true);
203 }
ExPrefixSumPlusInt(int & value,const int & initial)204 void Group::ExPrefixSumPlusInt(int& value, const int& initial) {
205 return PrefixSumSelect(value, std::plus<int>(), initial, false);
206 }
BroadcastInt(int & value,size_t origin)207 void Group::BroadcastInt(int& value, size_t origin) {
208 return BroadcastSelect(value, origin);
209 }
AllReducePlusInt(int & value)210 void Group::AllReducePlusInt(int& value) {
211 return AllReduceSelect(value, std::plus<int>());
212 }
AllReduceMinimumInt(int & value)213 void Group::AllReduceMinimumInt(int& value) {
214 return AllReduceSelect(value, common::minimum<int>());
215 }
AllReduceMaximumInt(int & value)216 void Group::AllReduceMaximumInt(int& value) {
217 return AllReduceSelect(value, common::maximum<int>());
218 }
PrefixSumPlusUnsignedInt(unsigned int & value,const unsigned int & initial)219 void Group::PrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
220 return PrefixSumSelect(value, std::plus<unsigned int>(), initial, true);
221 }
ExPrefixSumPlusUnsignedInt(unsigned int & value,const unsigned int & initial)222 void Group::ExPrefixSumPlusUnsignedInt(unsigned int& value, const unsigned int& initial) {
223 return PrefixSumSelect(value, std::plus<unsigned int>(), initial, false);
224 }
BroadcastUnsignedInt(unsigned int & value,size_t origin)225 void Group::BroadcastUnsignedInt(unsigned int& value, size_t origin) {
226 return BroadcastSelect(value, origin);
227 }
AllReducePlusUnsignedInt(unsigned int & value)228 void Group::AllReducePlusUnsignedInt(unsigned int& value) {
229 return AllReduceSelect(value, std::plus<unsigned int>());
230 }
AllReduceMinimumUnsignedInt(unsigned int & value)231 void Group::AllReduceMinimumUnsignedInt(unsigned int& value) {
232 return AllReduceSelect(value, common::minimum<unsigned int>());
233 }
AllReduceMaximumUnsignedInt(unsigned int & value)234 void Group::AllReduceMaximumUnsignedInt(unsigned int& value) {
235 return AllReduceSelect(value, common::maximum<unsigned int>());
236 }
PrefixSumPlusLong(long & value,const long & initial)237 void Group::PrefixSumPlusLong(long& value, const long& initial) {
238 return PrefixSumSelect(value, std::plus<long>(), initial, true);
239 }
ExPrefixSumPlusLong(long & value,const long & initial)240 void Group::ExPrefixSumPlusLong(long& value, const long& initial) {
241 return PrefixSumSelect(value, std::plus<long>(), initial, false);
242 }
BroadcastLong(long & value,size_t origin)243 void Group::BroadcastLong(long& value, size_t origin) {
244 return BroadcastSelect(value, origin);
245 }
AllReducePlusLong(long & value)246 void Group::AllReducePlusLong(long& value) {
247 return AllReduceSelect(value, std::plus<long>());
248 }
AllReduceMinimumLong(long & value)249 void Group::AllReduceMinimumLong(long& value) {
250 return AllReduceSelect(value, common::minimum<long>());
251 }
AllReduceMaximumLong(long & value)252 void Group::AllReduceMaximumLong(long& value) {
253 return AllReduceSelect(value, common::maximum<long>());
254 }
PrefixSumPlusUnsignedLong(unsigned long & value,const unsigned long & initial)255 void Group::PrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
256 return PrefixSumSelect(value, std::plus<unsigned long>(), initial, true);
257 }
ExPrefixSumPlusUnsignedLong(unsigned long & value,const unsigned long & initial)258 void Group::ExPrefixSumPlusUnsignedLong(unsigned long& value, const unsigned long& initial) {
259 return PrefixSumSelect(value, std::plus<unsigned long>(), initial, false);
260 }
BroadcastUnsignedLong(unsigned long & value,size_t origin)261 void Group::BroadcastUnsignedLong(unsigned long& value, size_t origin) {
262 return BroadcastSelect(value, origin);
263 }
AllReducePlusUnsignedLong(unsigned long & value)264 void Group::AllReducePlusUnsignedLong(unsigned long& value) {
265 return AllReduceSelect(value, std::plus<unsigned long>());
266 }
AllReduceMinimumUnsignedLong(unsigned long & value)267 void Group::AllReduceMinimumUnsignedLong(unsigned long& value) {
268 return AllReduceSelect(value, common::minimum<unsigned long>());
269 }
AllReduceMaximumUnsignedLong(unsigned long & value)270 void Group::AllReduceMaximumUnsignedLong(unsigned long& value) {
271 return AllReduceSelect(value, common::maximum<unsigned long>());
272 }
PrefixSumPlusLongLong(long long & value,const long long & initial)273 void Group::PrefixSumPlusLongLong(long long& value, const long long& initial) {
274 return PrefixSumSelect(value, std::plus<long long>(), initial, true);
275 }
ExPrefixSumPlusLongLong(long long & value,const long long & initial)276 void Group::ExPrefixSumPlusLongLong(long long& value, const long long& initial) {
277 return PrefixSumSelect(value, std::plus<long long>(), initial, false);
278 }
BroadcastLongLong(long long & value,size_t origin)279 void Group::BroadcastLongLong(long long& value, size_t origin) {
280 return BroadcastSelect(value, origin);
281 }
AllReducePlusLongLong(long long & value)282 void Group::AllReducePlusLongLong(long long& value) {
283 return AllReduceSelect(value, std::plus<long long>());
284 }
AllReduceMinimumLongLong(long long & value)285 void Group::AllReduceMinimumLongLong(long long& value) {
286 return AllReduceSelect(value, common::minimum<long long>());
287 }
AllReduceMaximumLongLong(long long & value)288 void Group::AllReduceMaximumLongLong(long long& value) {
289 return AllReduceSelect(value, common::maximum<long long>());
290 }
PrefixSumPlusUnsignedLongLong(unsigned long long & value,const unsigned long long & initial)291 void Group::PrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
292 return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, true);
293 }
ExPrefixSumPlusUnsignedLongLong(unsigned long long & value,const unsigned long long & initial)294 void Group::ExPrefixSumPlusUnsignedLongLong(unsigned long long& value, const unsigned long long& initial) {
295 return PrefixSumSelect(value, std::plus<unsigned long long>(), initial, false);
296 }
BroadcastUnsignedLongLong(unsigned long long & value,size_t origin)297 void Group::BroadcastUnsignedLongLong(unsigned long long& value, size_t origin) {
298 return BroadcastSelect(value, origin);
299 }
AllReducePlusUnsignedLongLong(unsigned long long & value)300 void Group::AllReducePlusUnsignedLongLong(unsigned long long& value) {
301 return AllReduceSelect(value, std::plus<unsigned long long>());
302 }
AllReduceMinimumUnsignedLongLong(unsigned long long & value)303 void Group::AllReduceMinimumUnsignedLongLong(unsigned long long& value) {
304 return AllReduceSelect(value, common::minimum<unsigned long long>());
305 }
AllReduceMaximumUnsignedLongLong(unsigned long long & value)306 void Group::AllReduceMaximumUnsignedLongLong(unsigned long long& value) {
307 return AllReduceSelect(value, common::maximum<unsigned long long>());
308 }
309 // [[[end]]]
310
311 } // namespace net
312 } // namespace thrill
313
314 /******************************************************************************/
315