/*******************************************************************************
 * thrill/net/collective.hpp
 *
 * net::Group is a collection of net::Connections providing simple MPI-like
 * collectives and point-to-point communication.
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Robert Hangu <robert.hangu@gmail.com>
 * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
 * Copyright (C) 2015 Lorenz Hübschle-Schneider <lorenz@4z2.de>
 * Copyright (C) 2017 Nejmeddine Douma <nejmeddine.douma@gmail.com>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#pragma once
#ifndef THRILL_NET_COLLECTIVE_HEADER
#define THRILL_NET_COLLECTIVE_HEADER

#include <thrill/common/functional.hpp>
#include <thrill/net/group.hpp>
#include <tlx/math/ffs.hpp>
#include <tlx/math/integer_log2.hpp>
#include <tlx/math/is_power_of_two.hpp>
#include <tlx/math/round_to_power_of_two.hpp>

#include <algorithm>
#include <functional>
#include <vector>

namespace thrill {
namespace net {

//! \addtogroup net_layer
//! \{

/******************************************************************************/
// Prefixsum Algorithms

/*!
 * Calculate the prefix sum for every worker.
 *
 * The prefix sum is the aggregation of the values of all workers with lesser
 * index, including the worker itself, according to a summation operator. The
 * run-time is in O(log n).
 *
 * \param value The value to be summed up
 * \param sum_op A custom summation operator
 * \param initial Initial value of the prefix sum
 * \param inclusive Inclusive prefix sum if true (default)
 */
template <typename T, typename BinarySumOp>
void Group::PrefixSumDoubling(T& value, BinarySumOp sum_op,
                              const T& initial, bool inclusive) {
    static constexpr bool debug = false;

    bool first = true;

    if (my_host_rank() == 0)
        value = sum_op(initial, value);

    // Use a copy, since in the exclusive case we have to forward something
    // that is not our own result.
    T to_forward = value;

    // This is based on the pointer-doubling algorithm presented in the ParAlg
    // script, which is used for list ranking.
    for (size_t d = 1; d < num_hosts(); d <<= 1) {

        if (my_host_rank() + d < num_hosts()) {
            sLOG << "Host" << my_host_rank()
                 << ": sending to" << my_host_rank() + d;
            SendTo(my_host_rank() + d, to_forward);
        }

        if (my_host_rank() >= d) {
            T recv_value;
            ReceiveFrom(my_host_rank() - d, &recv_value);
            sLOG << "Host" << my_host_rank()
                 << ": receiving from" << my_host_rank() - d;

            // Take care of the order, so we don't break associativity.
            to_forward = sum_op(recv_value, to_forward);

            if (!first || inclusive) {
                value = sum_op(recv_value, value);
            }
            else {
                value = recv_value;
                first = false;
            }
        }
    }

    // set worker 0's value for exclusive prefix sums
    if (!inclusive && my_host_rank() == 0)
        value = initial;
}
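
// Example (illustrative): with four hosts and addition, after round d = 1 host
// i holds v[i-1] + v[i]; round d = 2 then adds the partial sum forwarded by
// host i-2, so every host i ends up with v[0] + ... + v[i] after ceil(log2 p)
// rounds.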

/*!
 * \brief Calculate the prefix sum for every worker. Works only for worker
 * counts which are powers of two.
 *
 * \details The prefix sum is an aggregation of the values of all workers with
 * smaller index, including the worker itself, according to an associative
 * summation operator. This function currently only supports worker counts
 * which are powers of two.
 *
 * \param value The value to be summed up
 *
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::PrefixSumHypercube(T& value, BinarySumOp sum_op) {
    T total_sum = value;

    static constexpr bool debug = false;

    for (size_t d = 1; d < num_hosts(); d <<= 1)
    {
        // communication peer for this round (hypercube dimension)
        size_t peer = my_host_rank() ^ d;

        // Send total sum of this hypercube to worker with id = id XOR d
        if (peer < num_hosts()) {
            SendTo(peer, total_sum);
            sLOG << "PREFIX_SUM: host" << my_host_rank()
                 << ": sending to peer" << peer;
        }

        // Receive total sum of smaller hypercube from worker with id = id XOR d
        T recv_data;
        if (peer < num_hosts()) {
            ReceiveFrom(peer, &recv_data);
            // The order of addition is important. The total sum of the smaller
            // hypercube always comes first.
            if (my_host_rank() & d)
                total_sum = sum_op(recv_data, total_sum);
            else
                total_sum = sum_op(total_sum, recv_data);
            // Variable 'value' represents the prefix sum of this worker
            if (my_host_rank() & d)
                // The order of addition is respected the same way as above.
                value = sum_op(recv_data, value);
            sLOG << "PREFIX_SUM: host" << my_host_rank()
                 << ": received from peer" << peer;
        }
    }

    sLOG << "PREFIX_SUM: host" << my_host_rank() << ": done";
}

//! select prefixsum implementation (often due to total number of processors)
template <typename T, typename BinarySumOp>
void Group::PrefixSumSelect(T& value, BinarySumOp sum_op,
                            const T& initial, bool inclusive) {
    return PrefixSumDoubling(value, sum_op, initial, inclusive);
}

template <typename T, typename BinarySumOp>
void Group::PrefixSum(T& value, BinarySumOp sum_op, const T& initial) {
    return PrefixSumSelect(value, sum_op, initial, true);
}

template <typename T, typename BinarySumOp>
void Group::ExPrefixSum(T& value, BinarySumOp sum_op, const T& initial) {
    return PrefixSumSelect(value, sum_op, initial, false);
}
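
// Usage sketch (illustrative only, not part of this header): computing global
// item offsets on every host. `group` is assumed to be a connected net::Group
// and `items` a local std::vector.
//
//     size_t local_items = items.size();
//     size_t offset = local_items;
//     group.ExPrefixSum(offset, std::plus<size_t>(), size_t(0));
//     // offset = number of items on all hosts with smaller rank
//     size_t end = local_items;
//     group.PrefixSum(end, std::plus<size_t>(), size_t(0));
//     // end = offset + local_items (inclusive prefix sum)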

/******************************************************************************/
// Broadcast Algorithms

/*!
 * Broadcasts the value of the worker with index "origin" to all the others.
 * This is a trivial broadcast: the origin sends directly to every other peer.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::BroadcastTrivial(T& value, size_t origin) {

    if (my_host_rank() == origin) {
        // send value to all peers
        for (size_t p = 0; p < num_hosts(); ++p) {
            if (p != origin)
                SendTo(p, value);
        }
    }
    else {
        // receive from origin
        ReceiveFrom(origin, &value);
    }
}

/*!
 * Broadcasts the value of the worker with index "origin" to all the
 * others. This is a binomial tree broadcast method.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::BroadcastBinomialTree(T& value, size_t origin) {
    static constexpr bool debug = false;

    size_t num_hosts = this->num_hosts();
    // calculate rank in cyclically shifted binomial tree
    size_t my_rank = (my_host_rank() + num_hosts - origin) % num_hosts;
    size_t r = 0, d = 1;
    // receive from predecessor
    if (my_rank > 0) {
        // our predecessor is p with the lowest one bit flipped to zero. this
        // also counts the number of rounds we have to send out messages later.
        r = tlx::ffs(my_rank) - 1;
        d <<= r;
        size_t from = ((my_rank ^ d) + origin) % num_hosts;
        sLOG << "Broadcast: rank" << my_rank << "receiving from" << from
             << "in round" << r;
        ReceiveFrom(from, &value);
    }
    else {
        d = tlx::round_up_to_power_of_two(num_hosts);
    }
    // send to successors
    for (d >>= 1; d > 0; d >>= 1, ++r) {
        if (my_rank + d < num_hosts) {
            size_t to = (my_rank + d + origin) % num_hosts;
            sLOG << "Broadcast: rank" << my_rank << "round" << r << "sending to"
                 << to;
            SendTo(to, value);
        }
    }
}

//! select broadcast implementation (often due to total number of processors)
template <typename T>
void Group::BroadcastSelect(T& value, size_t origin) {
    return BroadcastBinomialTree(value, origin);
}

/*!
 * Broadcasts the value of the worker with index "origin" to all the others.
 * This is a binomial tree broadcast method.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::Broadcast(T& value, size_t origin) {
    return BroadcastSelect(value, origin);
}
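
// Usage sketch (illustrative only): distributing a value known only on host 0.
// `group` is assumed to be a connected net::Group.
//
//     size_t seed = 0;
//     if (group.my_host_rank() == 0) seed = 42;   // only the origin knows it
//     group.Broadcast(seed, 0);                   // afterwards seed == 42 everywhere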

/******************************************************************************/
// AllGather Algorithms

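/*!
 * All-gather via recursive doubling. Requires the number of hosts to be a
 * power of two (this is not checked).
 *
 * \param values Pointer to an array of num_hosts() * n items. On entry, this
 * host's n items must be stored at offset my_host_rank() * n; on exit the
 * array contains the items of all hosts in rank order.
 *
 * \param n Number of items contributed per host.
 */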
template <typename T>
void Group::AllGatherRecursiveDoublingPowerOfTwo(T* values, size_t n) {
    size_t num_hosts = this->num_hosts();
    size_t my_rank = my_host_rank();
    size_t d = tlx::integer_log2_ceil(num_hosts);

    for (size_t j = 0; j < d; ++j) {
        size_t peer = my_rank ^ (0x1 << j);
        // index of first element to be sent
        size_t snd_pos = (~((0x1 << j) - 1) & my_rank) * n;
        // index of first element to be received
        size_t rcv_pos = (~((0x1 << j) - 1) & peer) * n;
        // number of elements to be sent/received
        size_t ins_n = (0x1 << j) * n;

        connection(peer).SendReceive(values + snd_pos, values + rcv_pos, ins_n);
    }
}

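/*!
 * All-gather using the algorithm of Bruck et al., which works for arbitrary
 * host counts in ceil(log2 p) communication rounds.
 *
 * \param values Pointer to an array of num_hosts() * n items. On entry, this
 * host's n items must be stored at the beginning of the array; on exit the
 * array contains the items of all hosts in rank order.
 *
 * \param n Number of items contributed per host.
 */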
template <typename T>
void Group::AllGatherBruck(T* values, size_t n) {
    size_t num_hosts = this->num_hosts();
    size_t my_rank = my_host_rank();
    size_t size = num_hosts * n;
    std::vector<T> temp(size);

    for (size_t i = 0; i < n; ++i) {
        temp[i] = values[i];
    }

    for (size_t j = 0; (0x1U << j) < num_hosts; ++j) {
        size_t snd_peer = (my_rank + num_hosts - (0x1 << j)) % num_hosts;
        size_t rcv_peer = (my_rank + (0x1 << j)) % num_hosts;
        // position for received data
        size_t ins_pos = (0x1 << j) * n;
        // number of elements to be sent/received
        size_t ins_n = std::min((0x1 << j) * n, size - ins_pos);

        if ((0x1 << j) & my_rank) {
            connection(rcv_peer).ReceiveN(temp.data() + ins_pos, ins_n);
            connection(snd_peer).SendN(temp.data(), ins_n);
        }
        else {
            connection(snd_peer).SendN(temp.data(), ins_n);
            connection(rcv_peer).ReceiveN(temp.data() + ins_pos, ins_n);
        }
    }

    // local reorder: shift whole array by my_rank * n to the right
    for (size_t i = 0; i < size; ++i) {
        values[i] = temp[(i + size - my_rank * n) % size];
    }
}
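
// Usage sketch (illustrative only): gathering one value from every host into a
// rank-ordered array. `group` is assumed to be a connected net::Group and
// `my_value` a placeholder for the locally computed item.
//
//     std::vector<uint32_t> all(group.num_hosts());
//     all[0] = my_value;                      // Bruck expects local data at the front
//     group.AllGatherBruck(all.data(), 1);
//     // all[k] now holds the value contributed by host k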

/******************************************************************************/
// Reduce Algorithms

/*!
 * \brief Perform a reduction on all workers in a group.
 *
 * \details This function aggregates the values of all workers in the group
 * according to a specified reduction operator. The result is returned in the
 * input variable at the root node.
 *
 * \param value The input value to be used in the reduction. Will be overwritten
 * with the result (on the root) or arbitrary data (on other ranks).
 *
 * \param root The rank of the root
 *
 * \param sum_op A custom reduction operator (optional)
 */
template <typename T, typename BinarySumOp>
void Group::Reduce(T& value, size_t root, BinarySumOp sum_op) {
    static constexpr bool debug = false;
    const size_t num_hosts = this->num_hosts();
    // offset the rank by num_hosts so the modulo arithmetic below never
    // underflows when subtracting d or root
    const size_t my_rank = my_host_rank() + num_hosts;
    const size_t shifted_rank = (my_rank - root) % num_hosts;
    sLOG << my_host_rank() << "shifted_rank" << shifted_rank;

    for (size_t d = 1; d < num_hosts; d <<= 1) {
        if (shifted_rank & d) {
            sLOG << "Reduce" << my_host_rank()
                 << "->" << (my_rank - d) % num_hosts << "/"
                 << shifted_rank << "->" << shifted_rank - d;
            SendTo((my_rank - d) % num_hosts, value);
            break;
        }
        else if (shifted_rank + d < num_hosts) {
            sLOG << "Reduce" << my_host_rank()
                 << "<-" << (my_rank + d) % num_hosts << "/"
                 << shifted_rank << "<-" << shifted_rank + d;
            T recv_data;
            ReceiveFrom((my_rank + d) % num_hosts, &recv_data);
            value = sum_op(value, recv_data);
        }
    }
}
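
// Usage sketch (illustrative only): summing a counter at host 0. `group` is
// assumed to be a connected net::Group and `local_count` a placeholder for the
// locally computed value.
//
//     size_t counter = local_count;
//     group.Reduce(counter, 0, std::plus<size_t>());
//     // on host 0, counter now holds the global sum; on the other hosts its
//     // contents are unspecified after the call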

/******************************************************************************/
// AllReduce Algorithms

/*!
 * Perform an All-Reduce on the workers. This is done by aggregating all values
 * according to a summation operator and sending the result back to all workers.
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceSimple(T& value, BinarySumOp sum_op) {
    Reduce(value, 0, sum_op);
    Broadcast(value, 0);
}

/*!
 * Perform an All-Reduce on the workers by reducing all values at host 0 and
 * then sending the result back to every peer directly.
 *
 * \param value The value to be added to the aggregation
 *
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceAtRoot(T& value, BinarySumOp sum_op) {

    if (my_host_rank() == 0) {
        // receive value from all peers
        for (size_t p = 1; p < num_hosts(); ++p) {
            T recv_value;
            ReceiveFrom(p, &recv_value);
            value = sum_op(value, recv_value);
        }
        // send reduced value back to all peers
        for (size_t p = 1; p < num_hosts(); ++p) {
            SendTo(p, value);
        }
    }
    else {
        // send to root host
        SendTo(0, value);
        // receive value back from root
        ReceiveFrom(0, &value);
    }
}

/*!
 * Perform an All-Reduce for powers of two. This is done with the Hypercube
 * algorithm from the ParAlg script.
 *
 * \note This method is no longer used, but it is kept here for reference
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceHypercube(T& value, BinarySumOp sum_op) {
    // For each dimension of the hypercube, exchange data between workers with
    // different bits at position d

    // static constexpr bool debug = false;

    for (size_t d = 1; d < num_hosts(); d <<= 1) {
        // communication peer for this round (hypercube dimension)
        size_t peer = my_host_rank() ^ d;

        // SendReceive value to worker with id = id XOR d
        if (peer < num_hosts()) {
            // LOG << "ALL_REDUCE_HYPERCUBE: Host" << my_host_rank()
            //     << ": Sending" << value << " to worker" << peer;

            // The order of addition is important. The total sum of the smaller
            // hypercube always comes first.
            T recv_data;
            if (my_host_rank() & d) {
                connection(peer).SendReceive(&value, &recv_data);
                value = sum_op(recv_data, value);
            }
            else {
                connection(peer).ReceiveSend(value, &recv_data);
                value = sum_op(value, recv_data);
            }

            // LOG << "ALL_REDUCE_HYPERCUBE: Host " << my_host_rank()
            //     << ": Received " << recv_data
            //     << " from worker " << peer << " value = " << value;
        }
    }
}

/*!
 * Perform an All-Reduce using the elimination protocol described in
 * R. Rabenseifner and J. L. Traeff. "More Efficient Reduction Algorithms for
 * Non-Power-of-Two Number of Processors in Message-Passing Parallel Systems."
 * In Recent Advances in Parallel Virtual Machine and Message Passing Interface,
 * 36–46. LNCS 3241. Springer, 2004.
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceElimination(T& value, BinarySumOp sum_op) {
    AllReduceEliminationProcess(
        my_host_rank(), 1, num_hosts(), 0, value, sum_op);
}

template <typename T, typename BinarySumOp>
T Group::SendReceiveReduce(size_t peer, const T& value, BinarySumOp sum_op) {
    T recv_data;
    if (my_host_rank() > peer) {
        connection(peer).SendReceive(&value, &recv_data);
        return sum_op(recv_data, value);
    }
    else {
        connection(peer).ReceiveSend(value, &recv_data);
        return sum_op(value, recv_data);
    }
}

//! used for the recursive implementation of the elimination protocol
template <typename T, typename BinarySumOp>
void Group::AllReduceEliminationProcess(
    size_t host_id, size_t group_size, size_t remaining_hosts,
    size_t send_to, T& value, BinarySumOp sum_op) {

    // static const bool debug = false;

    // send_to == 0 => no eliminated host waiting to receive from current host,
    // host 0 is never eliminated

    size_t group_count = remaining_hosts / group_size;
    if (group_count % 2 == 0) {
        // only hypercube
        size_t peer = host_id ^ group_size;
        if (peer < remaining_hosts) {
            value = SendReceiveReduce(peer, value, sum_op);
        }
    }
    else {
        // check if my rank is in the 3-2 elimination zone
        size_t host_group = host_id / group_size;
        if (host_group >= group_count - 3) {
            // take part in the 3-2 elimination
            if (host_group == group_count - 1) {
                size_t peer = (host_id ^ group_size) - 2 * group_size;
                SendTo(peer, value);
                ReceiveFrom(peer, &value);
            }
            else if (host_group == group_count - 2) {
                size_t peer = (host_id ^ group_size) + 2 * group_size;

                T recv_data;
                ReceiveFrom(peer, &recv_data);
                if (my_host_rank() > peer)
                    value = sum_op(recv_data, value);
                else
                    value = sum_op(value, recv_data);

                // important for gathering
                send_to = peer;

                peer = host_id ^ group_size;
                value = SendReceiveReduce(peer, value, sum_op);
            }
            else if (host_group == group_count - 3) {
                size_t peer = host_id ^ group_size;
                value = SendReceiveReduce(peer, value, sum_op);
            }
        }
        else {
            // no elimination, execute hypercube
            size_t peer = host_id ^ group_size;
            if (peer < remaining_hosts) {
                value = SendReceiveReduce(peer, value, sum_op);
            }
        }
        remaining_hosts -= group_size;
    }
    group_size <<= 1;

    // recursion
    if (group_size < remaining_hosts) {
        AllReduceEliminationProcess(
            host_id, group_size, remaining_hosts, send_to,
            value, sum_op);
    }
    else if (send_to != 0) {
        SendTo(send_to, value);
    }
}

//! select allreduce implementation (often due to total number of processors)
template <typename T, typename BinarySumOp>
void Group::AllReduceSelect(T& value, BinarySumOp sum_op) {
    // always use the 3-2-elimination reduction method.
    AllReduceElimination(value, sum_op);
    /*if (tlx::is_power_of_two(num_hosts()))
        AllReduceHypercube(value, sum_op);
    else
        AllReduceAtRoot(value, sum_op);*/
}

/*!
 * Perform an All-Reduce on the workers. This is done by aggregating all values
 * according to a summation operator and sending the result back to all workers.
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduce(T& value, BinarySumOp sum_op) {
    return AllReduceSelect(value, sum_op);
}
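
// Usage sketch (illustrative only): computing a global sum that every host
// needs. `group` is assumed to be a connected net::Group and `my_value` a
// placeholder for the locally computed summand.
//
//     uint64_t total = my_value;
//     group.AllReduce(total, std::plus<uint64_t>());
//     // every host now holds the sum of all hosts' values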

//! \}

} // namespace net
} // namespace thrill

#endif // !THRILL_NET_COLLECTIVE_HEADER

/******************************************************************************/