1 #ifndef DIY_REDUCE_HPP
2 #define DIY_REDUCE_HPP
3
4 #include <vector>
5 #include "master.hpp"
6 #include "assigner.hpp"
7 #include "detail/block_traits.hpp"
8 #include "log.hpp"
9
10 namespace diy
11 {
12 //! Enables communication within a group during a reduction.
13 //! DIY creates the ReduceProxy for you in diy::reduce()
14 //! and provides a reference to ReduceProxy each time the user's reduction function is called
15 struct ReduceProxy: public Master::Proxy
16 {
17 typedef std::vector<int> GIDVector;
18
ReduceProxydiy::ReduceProxy19 ReduceProxy(const Master::Proxy& proxy, //!< parent proxy
20 void* block, //!< diy block
21 unsigned round, //!< current round
22 const Assigner& assigner, //!< assigner
23 const GIDVector& incoming_gids, //!< incoming gids in this group
24 const GIDVector& outgoing_gids): //!< outgoing gids in this group
25 Master::Proxy(proxy),
26 block_(block),
27 round_(round),
28 assigner_(assigner)
29 {
30 // setup in_link
31 for (unsigned i = 0; i < incoming_gids.size(); ++i)
32 {
33 BlockID nbr;
34 nbr.gid = incoming_gids[i];
35 nbr.proc = assigner.rank(nbr.gid);
36 in_link_.add_neighbor(nbr);
37 }
38
39 // setup out_link
40 for (unsigned i = 0; i < outgoing_gids.size(); ++i)
41 {
42 BlockID nbr;
43 nbr.gid = outgoing_gids[i];
44 nbr.proc = assigner.rank(nbr.gid);
45 out_link_.add_neighbor(nbr);
46 }
47 }
48
ReduceProxydiy::ReduceProxy49 ReduceProxy(const Master::Proxy& proxy, //!< parent proxy
50 void* block, //!< diy block
51 unsigned round, //!< current round
52 const Assigner& assigner,
53 const Link& in_link,
54 const Link& out_link):
55 Master::Proxy(proxy),
56 block_(block),
57 round_(round),
58 assigner_(assigner),
59 in_link_(in_link),
60 out_link_(out_link)
61 {}
62
63 //! returns pointer to block
blockdiy::ReduceProxy64 void* block() const { return block_; }
65 //! returns current round number
rounddiy::ReduceProxy66 unsigned round() const { return round_; }
67 //! returns incoming link
in_linkdiy::ReduceProxy68 const Link& in_link() const { return in_link_; }
69 //! returns outgoing link
out_linkdiy::ReduceProxy70 const Link& out_link() const { return out_link_; }
71 //! returns total number of blocks
nblocksdiy::ReduceProxy72 int nblocks() const { return assigner_.nblocks(); }
73 //! returns the assigner
assignerdiy::ReduceProxy74 const Assigner& assigner() const { return assigner_; }
75
76 //! advanced: change current round number
set_rounddiy::ReduceProxy77 void set_round(unsigned r) { round_ = r; }
78
79 private:
80 void* block_;
81 unsigned round_;
82 const Assigner& assigner_;
83
84 Link in_link_;
85 Link out_link_;
86 };
87
88 namespace detail
89 {
90 template<class Block, class Partners>
91 struct ReductionFunctor;
92
93 template<class Partners, class Skip>
94 struct SkipInactiveOr;
95
96 struct ReduceNeverSkip
97 {
operator ()diy::detail::ReduceNeverSkip98 bool operator()(int, int, const Master&) const { return false; }
99 };
100 }
101
102 /**
103 * \ingroup Communication
104 * \brief Implementation of the reduce communication pattern (includes
105 * swap-reduce, merge-reduce, and any other global communication).
106 *
107 */
108 template<class Reduce, class Partners, class Skip>
reduce(Master & master,const Assigner & assigner,const Partners & partners,const Reduce & reduce,const Skip & skip)109 void reduce(Master& master, //!< master object
110 const Assigner& assigner, //!< assigner object
111 const Partners& partners, //!< partners object
112 const Reduce& reduce, //!< reduction callback function
113 const Skip& skip) //!< object determining whether a block should be skipped
114 {
115 auto log = get_logger();
116
117 int original_expected = master.expected();
118
119 using Block = typename detail::block_traits<Reduce>::type;
120
121 unsigned round;
122 for (round = 0; round < partners.rounds(); ++round)
123 {
124 log->debug("Round {}", round);
125 master.foreach(detail::ReductionFunctor<Block,Partners>(round, reduce, partners, assigner),
126 detail::SkipInactiveOr<Partners,Skip>(round, partners, skip));
127 master.execute();
128
129 int expected = 0;
130 for (unsigned i = 0; i < master.size(); ++i)
131 {
132 if (partners.active(round + 1, master.gid(i), master))
133 {
134 std::vector<int> incoming_gids;
135 partners.incoming(round + 1, master.gid(i), incoming_gids, master);
136 expected += static_cast<int>(incoming_gids.size());
137 master.incoming(master.gid(i)).clear();
138 }
139 }
140 master.set_expected(expected);
141 master.flush();
142 }
143 // final round
144 log->debug("Round {}", round);
145 master.foreach(detail::ReductionFunctor<Block,Partners>(round, reduce, partners, assigner),
146 detail::SkipInactiveOr<Partners,Skip>(round, partners, skip));
147
148 master.set_expected(original_expected);
149 }
150
151 /**
152 * \ingroup Communication
153 * \brief Implementation of the reduce communication pattern (includes
154 * swap-reduce, merge-reduce, and any other global communication).
155 *
156 */
157 template<class Reduce, class Partners>
reduce(Master & master,const Assigner & assigner,const Partners & partners,const Reduce & reducer)158 void reduce(Master& master, //!< master object
159 const Assigner& assigner, //!< assigner object
160 const Partners& partners, //!< partners object
161 const Reduce& reducer) //!< reduction callback function
162 {
163 reduce(master, assigner, partners, reducer, detail::ReduceNeverSkip());
164 }
165
166 namespace detail
167 {
168 template<class Block, class Partners>
169 struct ReductionFunctor
170 {
171 using Callback = std::function<void(Block*, const ReduceProxy&, const Partners&)>;
172
ReductionFunctordiy::detail::ReductionFunctor173 ReductionFunctor(unsigned round_, const Callback& reduce_, const Partners& partners_, const Assigner& assigner_):
174 round(round_), reduce(reduce_), partners(partners_), assigner(assigner_) {}
175
operator ()diy::detail::ReductionFunctor176 void operator()(Block* b, const Master::ProxyWithLink& cp) const
177 {
178 if (!partners.active(round, cp.gid(), *cp.master())) return;
179
180 std::vector<int> incoming_gids, outgoing_gids;
181 if (round > 0)
182 partners.incoming(round, cp.gid(), incoming_gids, *cp.master()); // receive from the previous round
183 if (round < partners.rounds())
184 partners.outgoing(round, cp.gid(), outgoing_gids, *cp.master()); // send to the next round
185
186 ReduceProxy rp(cp, b, round, assigner, incoming_gids, outgoing_gids);
187 reduce(b, rp, partners);
188
189 // touch the outgoing queues to make sure they exist
190 Master::OutgoingQueues& outgoing = *cp.outgoing();
191 if (outgoing.size() < (size_t) rp.out_link().size())
192 for (int j = 0; j < rp.out_link().size(); ++j)
193 outgoing[rp.out_link().target(j)]; // touch the outgoing queue, creating it if necessary
194 }
195
196 unsigned round;
197 Callback reduce;
198 Partners partners;
199 const Assigner& assigner;
200 };
201
202 template<class Partners, class Skip>
203 struct SkipInactiveOr
204 {
SkipInactiveOrdiy::detail::SkipInactiveOr205 SkipInactiveOr(int round_, const Partners& partners_, const Skip& skip_):
206 round(round_), partners(partners_), skip(skip_) {}
operator ()diy::detail::SkipInactiveOr207 bool operator()(int i, const Master& master) const { return !partners.active(round, master.gid(i), master) || skip(round, i, master); }
208 int round;
209 const Partners& partners;
210 Skip skip;
211 };
212 }
213
214 } // diy
215
216 #endif // DIY_REDUCE_HPP
217