1 // Copyright (C) 2005-2006 The Trustees of Indiana University.
2 
3 // Use, modification and distribution is subject to the Boost Software
4 // License, Version 1.0. (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 
7 //  Authors: Douglas Gregor
8 //           Andrew Lumsdaine
9 #ifndef BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
10 #define BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
11 
12 #ifndef BOOST_GRAPH_USE_MPI
13 #error "Parallel BGL files should not be included unless <boost/graph/use_mpi.hpp> has been included"
14 #endif
15 
16 #include <boost/graph/parallel/process_group.hpp>
17 #include <boost/type_traits/is_convertible.hpp>
18 #include <vector>
19 #include <boost/assert.hpp>
20 #include <boost/optional.hpp>
21 #include <queue>
22 
23 namespace boost { namespace graph { namespace detail {
24 
25 template<typename ProcessGroup>
do_synchronize(ProcessGroup & pg)26 void do_synchronize(ProcessGroup& pg)
27 {
28   using boost::parallel::synchronize;
29   synchronize(pg);
30 }
31 
/// Tag type selecting the remote_update_set specialization that buffers
/// remote updates and transmits them from synchronize().
struct remote_set_queued { };

/// Tag type selecting the remote_update_set specialization that sends
/// each remote update as soon as update() is called.
struct remote_set_immediate { };
34 
35 template<typename ProcessGroup>
36 class remote_set_semantics
37 {
38   BOOST_STATIC_CONSTANT
39     (bool,
40      queued = (is_convertible<
41                  typename ProcessGroup::communication_category,
42                  boost::parallel::bsp_process_group_tag>::value));
43 
44  public:
45   typedef typename mpl::if_c<queued,
46                              remote_set_queued,
47                              remote_set_immediate>::type type;
48 };
49 
50 
51 template<typename Derived, typename ProcessGroup, typename Value,
52          typename OwnerMap,
53          typename Semantics = typename remote_set_semantics<ProcessGroup>::type>
54 class remote_update_set;
55 
56 /**********************************************************************
57  * Remote updating set that queues messages until synchronization     *
58  **********************************************************************/
59 template<typename Derived, typename ProcessGroup, typename Value,
60          typename OwnerMap>
61 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
62                         remote_set_queued>
63 {
64   typedef typename property_traits<OwnerMap>::key_type Key;
65   typedef std::vector<std::pair<Key, Value> > Updates;
66   typedef typename Updates::size_type   updates_size_type;
67   typedef typename Updates::value_type  updates_pair_type;
68 
69 public:
70 
71 private:
72   typedef typename ProcessGroup::process_id_type process_id_type;
73 
74   enum message_kind {
75     /** Message containing the number of updates that will be sent in
76      *  a msg_updates message that will immediately follow. This
77      *  message will contain a single value of type
78      *  updates_size_type.
79      */
80     msg_num_updates,
81 
82     /** Contains (key, value) pairs with all of the updates from a
83      *  particular source. The number of updates is variable, but will
84      *  be provided in a msg_num_updates message that immediately
85      *  preceeds this message.
86      *
87      */
88     msg_updates
89   };
90 
91   struct handle_messages
92   {
93     explicit
handle_messagesboost::graph::detail::remote_update_set::handle_messages94     handle_messages(remote_update_set* self, const ProcessGroup& pg)
95       : self(self), update_sizes(num_processes(pg), 0) { }
96 
operator ()boost::graph::detail::remote_update_set::handle_messages97     void operator()(process_id_type source, int tag)
98     {
99       switch(tag) {
100       case msg_num_updates:
101         {
102           // Receive the # of updates
103           updates_size_type num_updates;
104           receive(self->process_group, source, tag, num_updates);
105 
106           update_sizes[source] = num_updates;
107         }
108         break;
109 
110       case msg_updates:
111         {
112           updates_size_type num_updates = update_sizes[source];
113           BOOST_ASSERT(num_updates);
114 
115           // Receive the actual updates
116           std::vector<updates_pair_type> updates(num_updates);
117           receive(self->process_group, source, msg_updates, &updates[0],
118                   num_updates);
119 
120           // Send updates to derived "receive_update" member
121           Derived* derived = static_cast<Derived*>(self);
122           for (updates_size_type u = 0; u < num_updates; ++u)
123             derived->receive_update(source, updates[u].first, updates[u].second);
124 
125           update_sizes[source] = 0;
126         }
127         break;
128       };
129     }
130 
131   private:
132     remote_update_set* self;
133     std::vector<updates_size_type> update_sizes;
134   };
135   friend struct handle_messages;
136 
137  protected:
remote_update_set(const ProcessGroup & pg,const OwnerMap & owner)138   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
139     : process_group(pg, handle_messages(this, pg)),
140       updates(num_processes(pg)), owner(owner) {
141     }
142 
143 
update(const Key & key,const Value & value)144   void update(const Key& key, const Value& value)
145   {
146     if (get(owner, key) == process_id(process_group)) {
147       Derived* derived = static_cast<Derived*>(this);
148       derived->receive_update(get(owner, key), key, value);
149     }
150     else {
151       updates[get(owner, key)].push_back(std::make_pair(key, value));
152     }
153   }
154 
collect()155   void collect() { }
156 
synchronize()157   void synchronize()
158   {
159     // Emit all updates and then remove them
160     process_id_type num_processes = updates.size();
161     for (process_id_type p = 0; p < num_processes; ++p) {
162       if (!updates[p].empty()) {
163         send(process_group, p, msg_num_updates, updates[p].size());
164         send(process_group, p, msg_updates,
165              &updates[p].front(), updates[p].size());
166         updates[p].clear();
167       }
168     }
169 
170     do_synchronize(process_group);
171   }
172 
173   ProcessGroup process_group;
174 
175  private:
176   std::vector<Updates> updates;
177   OwnerMap owner;
178 };
179 
180 /**********************************************************************
181  * Remote updating set that sends messages immediately                *
182  **********************************************************************/
183 template<typename Derived, typename ProcessGroup, typename Value,
184          typename OwnerMap>
185 class remote_update_set<Derived, ProcessGroup, Value, OwnerMap,
186                         remote_set_immediate>
187 {
188   typedef typename property_traits<OwnerMap>::key_type Key;
189   typedef std::pair<Key, Value> update_pair_type;
190   typedef typename std::vector<update_pair_type>::size_type updates_size_type;
191 
192 public:
193   typedef typename ProcessGroup::process_id_type process_id_type;
194 
195 private:
196   enum message_kind {
197     /** Contains a (key, value) pair that will be updated. */
198     msg_update
199   };
200 
201   struct handle_messages
202   {
handle_messagesboost::graph::detail::remote_update_set::handle_messages203     explicit handle_messages(remote_update_set* self, const ProcessGroup& pg)
204       : self(self)
205     { update_sizes.resize(num_processes(pg), 0); }
206 
operator ()boost::graph::detail::remote_update_set::handle_messages207     void operator()(process_id_type source, int tag)
208     {
209       // Receive the # of updates
210       BOOST_ASSERT(tag == msg_update);
211       update_pair_type update;
212       receive(self->process_group, source, tag, update);
213 
214       // Send update to derived "receive_update" member
215       Derived* derived = static_cast<Derived*>(self);
216       derived->receive_update(source, update.first, update.second);
217     }
218 
219   private:
220     std::vector<updates_size_type> update_sizes;
221     remote_update_set* self;
222   };
223   friend struct handle_messages;
224 
225  protected:
remote_update_set(const ProcessGroup & pg,const OwnerMap & owner)226   remote_update_set(const ProcessGroup& pg, const OwnerMap& owner)
227     : process_group(pg, handle_messages(this, pg)), owner(owner) { }
228 
update(const Key & key,const Value & value)229   void update(const Key& key, const Value& value)
230   {
231     if (get(owner, key) == process_id(process_group)) {
232       Derived* derived = static_cast<Derived*>(this);
233       derived->receive_update(get(owner, key), key, value);
234     }
235     else
236       send(process_group, get(owner, key), msg_update,
237            update_pair_type(key, value));
238   }
239 
collect()240   void collect()
241   {
242     typedef std::pair<process_id_type, int> probe_type;
243     handle_messages handler(this, process_group);
244     while (optional<probe_type> stp = probe(process_group))
245       if (stp->second == msg_update) handler(stp->first, stp->second);
246   }
247 
synchronize()248   void synchronize()
249   {
250     do_synchronize(process_group);
251   }
252 
253   ProcessGroup process_group;
254   OwnerMap owner;
255 };
256 
257 } } } // end namespace boost::graph::detail
258 
259 #endif // BOOST_GRAPH_DETAIL_REMOTE_UPDATE_SET_HPP
260