1 //  Copyright (c) 2016 Hartmut Kaiser
2 //
3 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
4 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5 
6 // This example demonstrates the use of the hpx::lcos::local::receive_buffer
7 // facility It can be used to decouple time-step based operations between
8 // remote partitions of a spatially decomposed problem.
9 
10 // Including 'hpx/hpx_main.hpp' instead of the usual 'hpx/hpx_init.hpp' enables
11 // to use the plain C-main below as the direct main HPX entry point.
12 #include <hpx/hpx_main.hpp>
13 #include <hpx/hpx.hpp>
14 
15 #include <cstddef>
16 #include <deque>
17 #include <utility>
18 
19 // This example assumes that the computational space is divided into two
20 // partitions. Here we place both partitions on the same locality, but they
21 // could be easily placed on different HPX localities without any changes to
22 // the code as well.
23 //
24 // The example rotates the data stored in both partitions by one position
25 // during each time step while making sure that the right-most value is
26 // transferred to the neighboring partition.
27 //
28 // Each partition is represented by a component type 'partition' which holds
29 // the data (here a simple std::deque<int>) and an instance of a
30 // receive_buffer for the neighboring partition. The two partitions in
31 // this example are connected in a circular fashion, thus both of them have
32 // one receive_buffer, always representing the data to be received from the
33 // 'left'.
34 
35 char const* partition_basename = "/receive_buffer_example/partition/";
36 
37 // The neighbor of partition '0' is partition '1', and v.v.
neighbor(std::size_t partition_num)38 std::size_t neighbor(std::size_t partition_num)
39 {
40     return partition_num == 0 ? 1 : 0;
41 }
42 
43 ///////////////////////////////////////////////////////////////////////////////
44 struct partition_server : hpx::components::component_base<partition_server>
45 {
partition_serverpartition_server46     partition_server() {}
47 
48     // Retrieve the neighboring partition
partition_serverpartition_server49     partition_server(std::size_t partition_num, std::size_t num_elements)
50       : data_(num_elements),
51         left_(hpx::find_from_basename(partition_basename, neighbor(partition_num)))
52     {
53         // fill with some random data
54         std::generate(data_.begin(), data_.end(), std::rand);
55     }
56 
57 public:
58     // Action definitions
59 
60     // Do all the work for 'nt' time steps on the local
61     hpx::future<void> do_work(std::size_t nt);
62     HPX_DEFINE_COMPONENT_ACTION(partition_server, do_work, do_work_action);
63 
64     // Receive the data from the left partition. This will be called by the
65     // other partition, sending us its data.
from_rightpartition_server66     void from_right(std::size_t timestep, int data)
67     {
68         right_buffer_.store_received(timestep, std::move(data));
69     }
70     HPX_DEFINE_COMPONENT_ACTION(partition_server, from_right, from_right_action);
71 
72     // Explicitly release dependencies to avoid circular dependencies in the
73     // reference counting chain.
release_dependenciespartition_server74     void release_dependencies()
75     {
76         left_.free();
77     }
78     HPX_DEFINE_COMPONENT_ACTION(partition_server, release_dependencies,
79         release_dependencies_action);
80 
81 public:
82     // Other helper functions
83 
84     // Helper function to send our boundary elements to the left neighbor.
send_leftpartition_server85     void send_left(std::size_t timestep, int data) const
86     {
87         hpx::apply(from_right_action(), left_, timestep, data);
88     }
89 
90     // Helper function to receive the boundary element from the right neighbor.
receive_rightpartition_server91     hpx::future<int> receive_right(std::size_t timestep)
92     {
93         return right_buffer_.receive(timestep);
94     }
95 
96 private:
97     // Data stored in this partition.
98     std::deque<int> data_;
99 
100     // The id held by the future represents the neighboring partition (the one
101     // where the next element should be sent to).
102     hpx::components::client<partition_server> left_;
103 
104     // The receive buffers represents one single int to be received from the
105     // corresponding neighbor.
106     hpx::lcos::local::receive_buffer<int> right_buffer_;
107 };
108 
109 // The macros below are necessary to generate the code required for exposing
110 // our partition type remotely.
111 //
112 // HPX_REGISTER_COMPONENT() exposes the component creation through hpx::new_<>().
113 typedef hpx::components::component<partition_server> partition_server_type;
114 HPX_REGISTER_COMPONENT(partition_server_type, partition_server);
115 
116 // HPX_REGISTER_ACTION() exposes the component member function for remote
117 // invocation.
118 typedef partition_server::from_right_action from_right_action;
119 HPX_REGISTER_ACTION(from_right_action);
120 
121 typedef partition_server::do_work_action do_work_action;
122 HPX_REGISTER_ACTION(do_work_action);
123 
124 typedef partition_server::release_dependencies_action release_dependencies_action;
125 HPX_REGISTER_ACTION(release_dependencies_action);
126 
127 ///////////////////////////////////////////////////////////////////////////////
128 struct partition : hpx::components::client_base<partition, partition_server>
129 {
130     typedef hpx::components::client_base<partition, partition_server> base_type;
131 
partitionpartition132     partition(hpx::id_type const& locality, std::size_t partition_num,
133             std::size_t num_elements)
134       : base_type(hpx::new_<partition_server>(
135                 locality, partition_num, num_elements
136             )),
137         partition_num_(partition_num),
138         registered_name_(true)
139     {
140         // Register this partition with the runtime so that its neighbor can
141         // find it.
142         hpx::register_with_basename(partition_basename, *this, partition_num).get();
143     }
144 
partitionpartition145     partition(hpx::future<hpx::id_type> && id)
146       : base_type(std::move(id)),
147         registered_name_(false)
148     {}
149 
~partitionpartition150     ~partition()
151     {
152         if (!registered_name_)
153             return;
154 
155         // break cyclic dependencies
156         hpx::future<void> f1 = hpx::async(release_dependencies_action(), *this);
157 
158         // release the reference held by AGAS
159         hpx::future<void> f2 = hpx::unregister_with_basename(
160             partition_basename, partition_num_);
161 
162         hpx::wait_all(f1, f2);      // ignore exceptions
163     }
164 
do_workpartition165     hpx::future<void> do_work(std::size_t nt)
166     {
167         return hpx::async(do_work_action(), *this, nt);
168     }
169 
170 private:
171     std::size_t partition_num_;
172     bool registered_name_;
173 };
174 
175 ///////////////////////////////////////////////////////////////////////////////
176 // This is the implementation of the time step loop
do_work(std::size_t nt)177 hpx::future<void> partition_server::do_work(std::size_t nt)
178 {
179     // send initial values to neighbors
180     if (nt != 0)
181     {
182         // send left-most element
183         send_left(0, data_[0]);
184 
185         // rotate left by one element
186         std::rotate(data_.begin(), data_.begin() + 1, data_.end());
187     }
188 
189     hpx::future<void> result = hpx::make_ready_future();
190     for (std::size_t t = 0; t != nt; ++t)
191     {
192         // Receive element from the right, replace last local element with the
193         // received value.
194         //
195         // Each timestep depends on a) the previous timestep and b) the
196         // received value for the current timestep.
197         result =
198             hpx::dataflow(
199                 [this, t, nt](hpx::future<void> result, hpx::future<int> f)
200                 {
201                     result.get();       // propagate exceptions
202 
203                     // replace right-most element with received value
204                     data_[data_.size()-1] = f.get();
205 
206                     // if not last time step, send left-most and rotate left
207                     // by one element
208                     if (t != nt - 1)
209                     {
210                         send_left(t + 1, data_[0]);
211                         std::rotate(data_.begin(), data_.begin() + 1, data_.end());
212                     }
213                 },
214                 result, receive_right(t));
215     }
216     return result;
217 }
218 
219 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])220 int main(int argc, char* argv[])
221 {
222     // Initial conditions: f(0, i) = i
223     hpx::id_type here = hpx::find_here();
224 
225     // create partitions and launch work
226     partition p0(here, 0, 1000);
227     hpx::future<void> f0 = p0.do_work(100);
228 
229     partition p1(here, 1, 1000);
230     hpx::future<void> f1 = p1.do_work(100);
231 
232     // wait for both partitions to be finished
233     hpx::wait_all(f0, f1);
234 
235     return 0;
236 }
237 
238