/*******************************************************************************
 * thrill/net/collective.hpp
 *
 * net::Group is a collection of net::Connections providing simple MPI-like
 * collectives and point-to-point communication.
 *
 * Part of Project Thrill - http://project-thrill.org
 *
 * Copyright (C) 2015 Robert Hangu <robert.hangu@gmail.com>
 * Copyright (C) 2015-2016 Timo Bingmann <tb@panthema.net>
 * Copyright (C) 2015 Lorenz Hübschle-Schneider <lorenz@4z2.de>
 * Copyright (C) 2017 Nejmeddine Douma <nejmeddine.douma@gmail.com>
 *
 * All rights reserved. Published under the BSD-2 license in the LICENSE file.
 ******************************************************************************/

#pragma once
#ifndef THRILL_NET_COLLECTIVE_HEADER
#define THRILL_NET_COLLECTIVE_HEADER

#include <thrill/common/functional.hpp>
#include <thrill/net/group.hpp>
#include <tlx/math/ffs.hpp>
#include <tlx/math/integer_log2.hpp>
#include <tlx/math/is_power_of_two.hpp>
#include <tlx/math/round_to_power_of_two.hpp>

#include <algorithm>
#include <functional>
#include <vector>

namespace thrill {
namespace net {

//! \addtogroup net_layer
//! \{

/******************************************************************************/
// Prefixsum Algorithms

/*!
 * Calculate the prefix sum for every worker.
 *
 * The prefix sum is the aggregation of the values of all workers with smaller
 * index, including the worker itself, according to a summation operator. The
 * run-time is in O(log n).
 *
 * \param value The value to be summed up
 * \param sum_op A custom summation operator
 * \param initial Initial value of the prefix sum
 * \param inclusive Inclusive prefix sum if true (default)
 */
template <typename T, typename BinarySumOp>
void Group::PrefixSumDoubling(T& value, BinarySumOp sum_op,
                              const T& initial, bool inclusive) {
    static constexpr bool debug = false;

    bool first = true;

    if (my_host_rank() == 0)
        value = sum_op(initial, value);

    // Use a copy: in the exclusive case we have to forward something that is
    // not our own result.
    T to_forward = value;

    // This is based on the pointer-doubling algorithm presented in the ParAlg
    // script, which is used for list ranking.
    for (size_t d = 1; d < num_hosts(); d <<= 1) {

        if (my_host_rank() + d < num_hosts()) {
            sLOG << "Host" << my_host_rank()
                 << ": sending to" << my_host_rank() + d;
            SendTo(my_host_rank() + d, to_forward);
        }

        if (my_host_rank() >= d) {
            T recv_value;
            ReceiveFrom(my_host_rank() - d, &recv_value);
            sLOG << "Host" << my_host_rank()
                 << ": receiving from" << my_host_rank() - d;

            // Take care of the order, so we don't break associativity.
            to_forward = sum_op(recv_value, to_forward);

            if (!first || inclusive) {
                value = sum_op(recv_value, value);
            }
            else {
                value = recv_value;
                first = false;
            }
        }
    }

    // set worker 0's value for exclusive prefix sums
    if (!inclusive && my_host_rank() == 0)
        value = initial;
}
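
// Illustrative trace of the doubling rounds above: four hosts with values
// v0..v3, inclusive prefix sum, neutral initial value, where (+) denotes
// sum_op.
//
//   round d=1: host i sends its partial sum to host i+1;
//              hosts 1..3 now hold v_{i-1} (+) v_i.
//   round d=2: host i sends its partial sum to host i+2;
//              host 2 receives v0        -> v0 (+) v1 (+) v2,
//              host 3 receives v0 (+) v1 -> v0 (+) v1 (+) v2 (+) v3.
//
// Afterwards host i holds v0 (+) ... (+) vi, i.e. its inclusive prefix sum.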

/*!
 * \brief Calculate the prefix sum for every worker. Works only for worker
 * counts that are powers of two.
 *
 * \details The prefix sum is an aggregation of the values of all workers with
 * smaller index, including the worker itself, according to an associative
 * summation operator. This function currently only supports worker counts that
 * are powers of two.
 *
 * \param value The value to be summed up
 *
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::PrefixSumHypercube(T& value, BinarySumOp sum_op) {
    T total_sum = value;

    static constexpr bool debug = false;

    for (size_t d = 1; d < num_hosts(); d <<= 1)
    {
        // communication peer for this round (hypercube dimension)
        size_t peer = my_host_rank() ^ d;

        // Send total sum of this hypercube to worker with id = id XOR d
        if (peer < num_hosts()) {
            SendTo(peer, total_sum);
            sLOG << "PREFIX_SUM: host" << my_host_rank()
                 << ": sending to peer" << peer;
        }

        // Receive total sum of smaller hypercube from worker with id = id XOR d
        T recv_data;
        if (peer < num_hosts()) {
            ReceiveFrom(peer, &recv_data);
            // The order of addition is important. The total sum of the smaller
            // hypercube always comes first.
            if (my_host_rank() & d)
                total_sum = sum_op(recv_data, total_sum);
            else
                total_sum = sum_op(total_sum, recv_data);
            // Variable 'value' represents the prefix sum of this worker
            if (my_host_rank() & d)
                // The order of addition is respected the same way as above.
                value = sum_op(recv_data, value);
            sLOG << "PREFIX_SUM: host" << my_host_rank()
                 << ": received from peer" << peer;
        }
    }

    sLOG << "PREFIX_SUM: host" << my_host_rank() << ": done";
}

//! select prefixsum implementation (often due to total number of processors)
template <typename T, typename BinarySumOp>
void Group::PrefixSumSelect(T& value, BinarySumOp sum_op,
                            const T& initial, bool inclusive) {
    return PrefixSumDoubling(value, sum_op, initial, inclusive);
}

template <typename T, typename BinarySumOp>
void Group::PrefixSum(T& value, BinarySumOp sum_op, const T& initial) {
    return PrefixSumSelect(value, sum_op, initial, true);
}

template <typename T, typename BinarySumOp>
void Group::ExPrefixSum(T& value, BinarySumOp sum_op, const T& initial) {
    return PrefixSumSelect(value, sum_op, initial, false);
}
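
// Usage sketch (illustrative): computing the global offset of each host's
// locally generated items via an exclusive prefix sum. Assumes a net::Group
// 'group', a hypothetical local counter, and the default sum operator and
// initial value declared for ExPrefixSum() in group.hpp.
//
//   size_t offset = local_item_count;
//   group.ExPrefixSum(offset);
//   // offset now equals the number of items on all hosts with smaller rank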

/******************************************************************************/
// Broadcast Algorithms

/*!
 * Broadcasts the value of the peer with index 'origin' to all the others by
 * having the origin send directly to every other peer. This is a trivial
 * broadcast.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::BroadcastTrivial(T& value, size_t origin) {

    if (my_host_rank() == origin) {
        // send value to all peers
        for (size_t p = 0; p < num_hosts(); ++p) {
            if (p != origin)
                SendTo(p, value);
        }
    }
    else {
        // receive from origin
        ReceiveFrom(origin, &value);
    }
}

/*!
 * Broadcasts the value of the worker with index "origin" to all the
 * others. This is a binomial tree broadcast method.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::BroadcastBinomialTree(T& value, size_t origin) {
    static constexpr bool debug = false;

    size_t num_hosts = this->num_hosts();
    // calculate rank in cyclically shifted binomial tree
    size_t my_rank = (my_host_rank() + num_hosts - origin) % num_hosts;
    size_t r = 0, d = 1;
    // receive from predecessor
    if (my_rank > 0) {
        // our predecessor is our rank with the lowest one bit cleared. this
        // also counts the number of rounds we have to send out messages later.
        r = tlx::ffs(my_rank) - 1;
        d <<= r;
        size_t from = ((my_rank ^ d) + origin) % num_hosts;
        sLOG << "Broadcast: rank" << my_rank << "receiving from" << from
             << "in round" << r;
        ReceiveFrom(from, &value);
    }
    else {
        d = tlx::round_up_to_power_of_two(num_hosts);
    }
    // send to successors
    for (d >>= 1; d > 0; d >>= 1, ++r) {
        if (my_rank + d < num_hosts) {
            size_t to = (my_rank + d + origin) % num_hosts;
            sLOG << "Broadcast: rank" << my_rank << "round" << r << "sending to"
                 << to;
            SendTo(to, value);
        }
    }
}

//! select broadcast implementation (often due to total number of processors)
template <typename T>
void Group::BroadcastSelect(T& value, size_t origin) {
    return BroadcastBinomialTree(value, origin);
}

/*!
 * Broadcasts the value of the worker with index 'origin' to all the
 * others. This uses the binomial tree broadcast method.
 *
 * \param value The value to be broadcast / receive into.
 *
 * \param origin The PE to broadcast value from.
 */
template <typename T>
void Group::Broadcast(T& value, size_t origin) {
    return BroadcastSelect(value, origin);
}
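
// Usage sketch (illustrative): distributing a value chosen on host 0 to all
// hosts. Assumes a net::Group 'group'; the variable names are hypothetical.
//
//   uint64_t seed = 0;
//   if (group.my_host_rank() == 0) seed = 123456;
//   group.Broadcast(seed, /* origin */ 0);
//   // every host now holds seed == 123456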

/******************************************************************************/
// AllGather Algorithms

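/*!
 * All-gather via recursive doubling; requires the number of hosts to be a
 * power of two. As implemented below, each host contributes n items placed at
 * values + my_host_rank() * n inside an array of num_hosts() * n items;
 * afterwards values contains the items of all hosts in rank order.
 *
 * \param values Array of num_hosts() * n items, local input at the rank offset.
 * \param n Number of items contributed per host.
 */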
template <typename T>
void Group::AllGatherRecursiveDoublingPowerOfTwo(T* values, size_t n) {
    size_t num_hosts = this->num_hosts();
    size_t my_rank = my_host_rank();
    size_t d = tlx::integer_log2_ceil(num_hosts);

    for (size_t j = 0; j < d; ++j) {
        size_t peer = my_rank ^ (0x1 << j);
        // index of first element to be sent
        size_t snd_pos = (~((0x1 << j) - 1) & my_rank) * n;
        // index of first element to be received
        size_t rcv_pos = (~((0x1 << j) - 1) & peer) * n;
        // number of elements to be sent/received
        size_t ins_n = (0x1 << j) * n;

        connection(peer).SendReceive(values + snd_pos, values + rcv_pos, ins_n);
    }
}

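/*!
 * All-gather using the algorithm of Bruck et al., which also works for
 * non-power-of-two host counts. As implemented below, each host contributes n
 * items placed at the beginning of values, an array of num_hosts() * n items;
 * afterwards values contains the items of all hosts in rank order.
 *
 * \param values Array of num_hosts() * n items, local input in values[0..n).
 * \param n Number of items contributed per host.
 */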
template <typename T>
void Group::AllGatherBruck(T* values, size_t n) {
    size_t num_hosts = this->num_hosts();
    size_t my_rank = my_host_rank();
    size_t size = num_hosts * n;
    std::vector<T> temp(size);

    for (size_t i = 0; i < n; ++i) {
        temp[i] = values[i];
    }

    for (size_t j = 0; (0x1U << j) < num_hosts; ++j) {
        size_t snd_peer = (my_rank + num_hosts - (0x1 << j)) % num_hosts;
        size_t rcv_peer = (my_rank + (0x1 << j)) % num_hosts;
        // position for received data
        size_t ins_pos = (0x1 << j) * n;
        // number of elements to be sent/received
        size_t ins_n = std::min((0x1 << j) * n, size - ins_pos);

        if ((0x1 << j) & my_rank) {
            connection(rcv_peer).ReceiveN(temp.data() + ins_pos, ins_n);
            connection(snd_peer).SendN(temp.data(), ins_n);
        }
        else {
            connection(snd_peer).SendN(temp.data(), ins_n);
            connection(rcv_peer).ReceiveN(temp.data() + ins_pos, ins_n);
        }
    }

    // local reorder: shift whole array by my_rank*n to the right
    for (size_t i = 0; i < size; ++i) {
        values[i] = temp[(i + size - my_rank * n) % size];
    }
}
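
// Illustrative trace of the Bruck rounds above: four hosts, one item each
// (n = 1), viewed from host 2, which starts with temp = [v2, -, -, -].
//
//   round j=0: send temp[0..1) to host 1, receive from host 3 into temp[1]
//              -> temp = [v2, v3, -, -]
//   round j=1: send temp[0..2) to host 0, receive from host 0 into temp[2..4)
//              -> temp = [v2, v3, v0, v1]
//
// The final rotation by my_rank * n = 2 then yields values = [v0, v1, v2, v3].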

/******************************************************************************/
// Reduce Algorithms

/*!
 * \brief Perform a reduction on all workers in a group.
 *
 * \details This function aggregates the values of all workers in the group
 * according to a specified reduction operator. The result will be returned in
 * the input variable at the root node.
 *
 * \param value The input value to be used in the reduction. Will be overwritten
 * with the result (on the root) or arbitrary data (on other ranks).
 *
 * \param root The rank of the root
 *
 * \param sum_op A custom reduction operator (optional)
 */
template <typename T, typename BinarySumOp>
void Group::Reduce(T& value, size_t root, BinarySumOp sum_op) {
    static constexpr bool debug = false;
    const size_t num_hosts = this->num_hosts();
    const size_t my_rank = my_host_rank() + num_hosts;
    const size_t shifted_rank = (my_rank - root) % num_hosts;
    sLOG << my_host_rank() << "shifted_rank" << shifted_rank;

    for (size_t d = 1; d < num_hosts; d <<= 1) {
        if (shifted_rank & d) {
            sLOG << "Reduce" << my_host_rank()
                 << "->" << (my_rank - d) % num_hosts << "/"
                 << shifted_rank << "->" << shifted_rank - d;
            SendTo((my_rank - d) % num_hosts, value);
            break;
        }
        else if (shifted_rank + d < num_hosts) {
            sLOG << "Reduce" << my_host_rank()
                 << "<-" << (my_rank + d) % num_hosts << "/"
                 << shifted_rank << "<-" << shifted_rank + d;
            T recv_data;
            ReceiveFrom((my_rank + d) % num_hosts, &recv_data);
            value = sum_op(value, recv_data);
        }
    }
}
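
// Usage sketch (illustrative): summing a per-host counter at host 0. Assumes a
// net::Group 'group'; the counter variable is hypothetical.
//
//   uint64_t bytes = local_bytes_written;
//   group.Reduce(bytes, /* root */ 0, std::plus<uint64_t>());
//   // on host 0, bytes now holds the global total; on other hosts it is
//   // unspecified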

/******************************************************************************/
// AllReduce Algorithms

/*!
 * Perform an All-Reduce on the workers. This is done by aggregating all values
 * according to a summation operator and sending the result back to all
 * workers.
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceSimple(T& value, BinarySumOp sum_op) {
    Reduce(value, 0, sum_op);
    Broadcast(value, 0);
}

/*!
 * Perform an All-Reduce by reducing all values at the root (host 0) and then
 * sending the result back to all workers directly from there.
 *
 * \param value The value to be added to the aggregation; overwritten with the
 * result on all hosts.
 *
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceAtRoot(T& value, BinarySumOp sum_op) {

    if (my_host_rank() == 0) {
        // receive value from all peers
        for (size_t p = 1; p < num_hosts(); ++p) {
            T recv_value;
            ReceiveFrom(p, &recv_value);
            value = sum_op(value, recv_value);
        }
        // send reduced value back to all peers
        for (size_t p = 1; p < num_hosts(); ++p) {
            SendTo(p, value);
        }
    }
    else {
        // send to root host
        SendTo(0, value);
        // receive value back from root
        ReceiveFrom(0, &value);
    }
}

/*!
 * Perform an All-Reduce for powers of two. This is done with the Hypercube
 * algorithm from the ParAlg script.
 *
 * \note This method is no longer used, but it is kept here for reference
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceHypercube(T& value, BinarySumOp sum_op) {
    // For each dimension of the hypercube, exchange data between workers with
    // different bits at position d

    // static constexpr bool debug = false;

    for (size_t d = 1; d < num_hosts(); d <<= 1) {
        // communication peer for this round (hypercube dimension)
        size_t peer = my_host_rank() ^ d;

        // SendReceive value with the worker with id = id XOR d
        if (peer < num_hosts()) {
            // LOG << "ALL_REDUCE_HYPERCUBE: Host" << my_host_rank()
            //     << ": Sending" << value << " to worker" << peer;

            // The order of addition is important. The total sum of the smaller
            // hypercube always comes first.
            T recv_data;
            if (my_host_rank() & d) {
                connection(peer).SendReceive(&value, &recv_data);
                value = sum_op(recv_data, value);
            }
            else {
                connection(peer).ReceiveSend(value, &recv_data);
                value = sum_op(value, recv_data);
            }

            // LOG << "ALL_REDUCE_HYPERCUBE: Host " << my_host_rank()
            //     << ": Received " << recv_data
            //     << " from worker " << peer << " value = " << value;
        }
    }
}

/*!
 * Perform an All-Reduce using the elimination protocol described in
 * R. Rabenseifner and J. L. Traeff. "More Efficient Reduction Algorithms for
 * Non-Power-of-Two Number of Processors in Message-Passing Parallel Systems."
 * In Recent Advances in Parallel Virtual Machine and Message Passing Interface,
 * 36–46. LNCS 3241. Springer, 2004.
 *
 * \param value The value to be added to the aggregation
 * \param sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduceElimination(T& value, BinarySumOp sum_op) {
    AllReduceEliminationProcess(
        my_host_rank(), 1, num_hosts(), 0, value, sum_op);
}

template <typename T, typename BinarySumOp>
T Group::SendReceiveReduce(size_t peer, const T& value, BinarySumOp sum_op) {
    T recv_data;
    if (my_host_rank() > peer) {
        connection(peer).SendReceive(&value, &recv_data);
        return sum_op(recv_data, value);
    }
    else {
        connection(peer).ReceiveSend(value, &recv_data);
        return sum_op(value, recv_data);
    }
}

//! used for the recursive implementation of the elimination protocol
template <typename T, typename BinarySumOp>
void Group::AllReduceEliminationProcess(
    size_t host_id, size_t group_size, size_t remaining_hosts,
    size_t send_to, T& value, BinarySumOp sum_op) {

    // static const bool debug = false;

    // send_to == 0 => no eliminated host waiting to receive from current host,
    // host 0 is never eliminated
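    //
    // Sketch of the round structure below: while the number of remaining
    // groups is even, the groups pair up and perform plain hypercube
    // exchanges. When it is odd, the last three groups run a 3-2 elimination
    // step: the last group folds its value into the second-to-last group and
    // then merely waits for the final result, which that group sends back at
    // the very end (recorded in send_to); the remaining even number of groups
    // continues with hypercube rounds.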

    size_t group_count = remaining_hosts / group_size;
    if (group_count % 2 == 0) {
        // only hypercube
        size_t peer = host_id ^ group_size;
        if (peer < remaining_hosts) {
            value = SendReceiveReduce(peer, value, sum_op);
        }
    }
    else {
        // check if my rank is in 3-2 elimination zone
        size_t host_group = host_id / group_size;
        if (host_group >= group_count - 3) {
            // take part in the 3-2 elimination
            if (host_group == group_count - 1) {
                size_t peer = (host_id ^ group_size) - 2 * group_size;
                SendTo(peer, value);
                ReceiveFrom(peer, &value);
            }
            else if (host_group == group_count - 2) {
                size_t peer = (host_id ^ group_size) + 2 * group_size;

                T recv_data;
                ReceiveFrom(peer, &recv_data);
                if (my_host_rank() > peer)
                    value = sum_op(recv_data, value);
                else
                    value = sum_op(value, recv_data);

                // remember the eliminated peer; the final result is sent back
                // to it at the very end
                send_to = peer;

                peer = host_id ^ group_size;
                value = SendReceiveReduce(peer, value, sum_op);
            }
            else if (host_group == group_count - 3) {
                size_t peer = host_id ^ group_size;
                value = SendReceiveReduce(peer, value, sum_op);
            }
        }
        else {
            // no elimination, execute hypercube
            size_t peer = host_id ^ group_size;
            if (peer < remaining_hosts) {
                value = SendReceiveReduce(peer, value, sum_op);
            }
        }
        remaining_hosts -= group_size;
    }
    group_size <<= 1;

    // recursion
    if (group_size < remaining_hosts) {
        AllReduceEliminationProcess(
            host_id, group_size, remaining_hosts, send_to,
            value, sum_op);
    }
    else if (send_to != 0) {
        SendTo(send_to, value);
    }
}

//! select allreduce implementation (often due to total number of processors)
template <typename T, typename BinarySumOp>
void Group::AllReduceSelect(T& value, BinarySumOp sum_op) {
    // always use 3-2-elimination reduction method.
    AllReduceElimination(value, sum_op);
    /*if (tlx::is_power_of_two(num_hosts()))
        AllReduceHypercube(value, sum_op);
    else
        AllReduceAtRoot(value, sum_op);*/
}

/*!
 * Perform an All-Reduce on the workers. This is done by aggregating all values
 * according to a summation operator and sending the result back to all
 * workers.
 *
 * \param   value The value to be added to the aggregation
 * \param   sum_op A custom summation operator
 */
template <typename T, typename BinarySumOp>
void Group::AllReduce(T& value, BinarySumOp sum_op) {
    return AllReduceSelect(value, sum_op);
}
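
// Usage sketch (illustrative): agreeing on a global maximum across all hosts.
// Assumes a net::Group 'group'; the local variable is hypothetical.
//
//   size_t longest = local_longest_line;
//   group.AllReduce(longest, [](const size_t& a, const size_t& b) {
//       return std::max(a, b);
//   });
//   // every host now holds the same global maximum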

//! \}

} // namespace net
} // namespace thrill

#endif // !THRILL_NET_COLLECTIVE_HEADER

/******************************************************************************/