1 #ifndef DIY_REDUCE_HPP
2 #define DIY_REDUCE_HPP
3 
4 #include <vector>
5 #include "master.hpp"
6 #include "assigner.hpp"
7 #include "detail/block_traits.hpp"
8 #include "log.hpp"
9 
10 namespace diy
11 {
12 //! Enables communication within a group during a reduction.
13 //! DIY creates the ReduceProxy for you in diy::reduce()
14 //! and provides a reference to ReduceProxy each time the user's reduction function is called
15 struct ReduceProxy: public Master::Proxy
16 {
17     typedef     std::vector<int>                            GIDVector;
18 
ReduceProxydiy::ReduceProxy19     ReduceProxy(const Master::Proxy&    proxy, //!< parent proxy
20                 void*                   block, //!< diy block
21                 unsigned                round, //!< current round
22                 const Assigner&         assigner, //!< assigner
23                 const GIDVector&        incoming_gids, //!< incoming gids in this group
24                 const GIDVector&        outgoing_gids): //!< outgoing gids in this group
25       Master::Proxy(proxy),
26       block_(block),
27       round_(round),
28       assigner_(assigner)
29     {
30       // setup in_link
31       for (unsigned i = 0; i < incoming_gids.size(); ++i)
32       {
33         BlockID nbr;
34         nbr.gid  = incoming_gids[i];
35         nbr.proc = assigner.rank(nbr.gid);
36         in_link_.add_neighbor(nbr);
37       }
38 
39       // setup out_link
40       for (unsigned i = 0; i < outgoing_gids.size(); ++i)
41       {
42         BlockID nbr;
43         nbr.gid  = outgoing_gids[i];
44         nbr.proc = assigner.rank(nbr.gid);
45         out_link_.add_neighbor(nbr);
46       }
47     }
48 
ReduceProxydiy::ReduceProxy49     ReduceProxy(const Master::Proxy&    proxy, //!< parent proxy
50                 void*                   block, //!< diy block
51                 unsigned                round, //!< current round
52                 const Assigner&         assigner,
53                 const Link&             in_link,
54                 const Link&             out_link):
55       Master::Proxy(proxy),
56       block_(block),
57       round_(round),
58       assigner_(assigner),
59       in_link_(in_link),
60       out_link_(out_link)
61     {}
62 
63     //! returns pointer to block
blockdiy::ReduceProxy64     void*         block() const                           { return block_; }
65     //! returns current round number
rounddiy::ReduceProxy66     unsigned      round() const                           { return round_; }
67     //! returns incoming link
in_linkdiy::ReduceProxy68     const Link&   in_link() const                         { return in_link_; }
69     //! returns outgoing link
out_linkdiy::ReduceProxy70     const Link&   out_link() const                        { return out_link_; }
71     //! returns total number of blocks
nblocksdiy::ReduceProxy72     int           nblocks() const                         { return assigner_.nblocks(); }
73     //! returns the assigner
assignerdiy::ReduceProxy74     const Assigner& assigner() const                      { return assigner_; }
75 
76     //! advanced: change current round number
set_rounddiy::ReduceProxy77     void          set_round(unsigned r)                   { round_ = r; }
78 
79   private:
80     void*         block_;
81     unsigned      round_;
82     const Assigner& assigner_;
83 
84     Link          in_link_;
85     Link          out_link_;
86 };
87 
88 namespace detail
89 {
90   template<class Block, class Partners>
91   struct ReductionFunctor;
92 
93   template<class Partners, class Skip>
94   struct SkipInactiveOr;
95 
96   struct ReduceNeverSkip
97   {
operator ()diy::detail::ReduceNeverSkip98     bool operator()(int, int, const Master&) const  { return false; }
99   };
100 }
101 
102 /**
103  * \ingroup Communication
104  * \brief Implementation of the reduce communication pattern (includes
105  *        swap-reduce, merge-reduce, and any other global communication).
106  *
107  */
108 template<class Reduce, class Partners, class Skip>
reduce(Master & master,const Assigner & assigner,const Partners & partners,const Reduce & reduce,const Skip & skip)109 void reduce(Master&                    master,        //!< master object
110             const Assigner&            assigner,      //!< assigner object
111             const Partners&            partners,      //!< partners object
112             const Reduce&              reduce,        //!< reduction callback function
113             const Skip&                skip)          //!< object determining whether a block should be skipped
114 {
115   auto log = get_logger();
116 
117   int original_expected = master.expected();
118 
119   using Block = typename detail::block_traits<Reduce>::type;
120 
121   unsigned round;
122   for (round = 0; round < partners.rounds(); ++round)
123   {
124     log->debug("Round {}", round);
125     master.foreach(detail::ReductionFunctor<Block,Partners>(round, reduce, partners, assigner),
126                    detail::SkipInactiveOr<Partners,Skip>(round, partners, skip));
127     master.execute();
128 
129     int expected = 0;
130     for (unsigned i = 0; i < master.size(); ++i)
131     {
132       if (partners.active(round + 1, master.gid(i), master))
133       {
134         std::vector<int> incoming_gids;
135         partners.incoming(round + 1, master.gid(i), incoming_gids, master);
136         expected += static_cast<int>(incoming_gids.size());
137         master.incoming(master.gid(i)).clear();
138       }
139     }
140     master.set_expected(expected);
141     master.flush();
142   }
143   // final round
144   log->debug("Round {}", round);
145   master.foreach(detail::ReductionFunctor<Block,Partners>(round, reduce, partners, assigner),
146                  detail::SkipInactiveOr<Partners,Skip>(round, partners, skip));
147 
148   master.set_expected(original_expected);
149 }
150 
151 /**
152  * \ingroup Communication
153  * \brief Implementation of the reduce communication pattern (includes
154  *        swap-reduce, merge-reduce, and any other global communication).
155  *
156  */
157 template<class Reduce, class Partners>
reduce(Master & master,const Assigner & assigner,const Partners & partners,const Reduce & reducer)158 void reduce(Master&                    master,        //!< master object
159             const Assigner&            assigner,      //!< assigner object
160             const Partners&            partners,      //!< partners object
161             const Reduce&              reducer)       //!< reduction callback function
162 {
163   reduce(master, assigner, partners, reducer, detail::ReduceNeverSkip());
164 }
165 
166 namespace detail
167 {
168   template<class Block, class Partners>
169   struct ReductionFunctor
170   {
171     using Callback = std::function<void(Block*, const ReduceProxy&, const Partners&)>;
172 
ReductionFunctordiy::detail::ReductionFunctor173                 ReductionFunctor(unsigned round_, const Callback& reduce_, const Partners& partners_, const Assigner& assigner_):
174                     round(round_), reduce(reduce_), partners(partners_), assigner(assigner_)        {}
175 
operator ()diy::detail::ReductionFunctor176     void        operator()(Block* b, const Master::ProxyWithLink& cp) const
177     {
178       if (!partners.active(round, cp.gid(), *cp.master())) return;
179 
180       std::vector<int> incoming_gids, outgoing_gids;
181       if (round > 0)
182           partners.incoming(round, cp.gid(), incoming_gids, *cp.master());        // receive from the previous round
183       if (round < partners.rounds())
184           partners.outgoing(round, cp.gid(), outgoing_gids, *cp.master());        // send to the next round
185 
186       ReduceProxy   rp(cp, b, round, assigner, incoming_gids, outgoing_gids);
187       reduce(b, rp, partners);
188 
189       // touch the outgoing queues to make sure they exist
190       Master::OutgoingQueues& outgoing = *cp.outgoing();
191       if (outgoing.size() < (size_t) rp.out_link().size())
192         for (int j = 0; j < rp.out_link().size(); ++j)
193           outgoing[rp.out_link().target(j)];       // touch the outgoing queue, creating it if necessary
194     }
195 
196     unsigned        round;
197     Callback        reduce;
198     Partners        partners;
199     const Assigner& assigner;
200   };
201 
202   template<class Partners, class Skip>
203   struct SkipInactiveOr
204   {
SkipInactiveOrdiy::detail::SkipInactiveOr205                     SkipInactiveOr(int round_, const Partners& partners_, const Skip& skip_):
206                         round(round_), partners(partners_), skip(skip_)         {}
operator ()diy::detail::SkipInactiveOr207     bool            operator()(int i, const Master& master) const               { return !partners.active(round, master.gid(i), master) || skip(round, i, master); }
208     int             round;
209     const Partners& partners;
210     Skip            skip;
211   };
212 }
213 
214 } // diy
215 
216 #endif // DIY_REDUCE_HPP
217