1 // Copyright (c) 2016 Hartmut Kaiser
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
// This example demonstrates the use of the hpx::lcos::local::receive_buffer
// facility. It can be used to decouple time-step based operations between
// remote partitions of a spatially decomposed problem.
9
// Including 'hpx/hpx_main.hpp' instead of the usual 'hpx/hpx_init.hpp' makes it
// possible to use the plain C-main below as the direct main HPX entry point.
#include <hpx/hpx_main.hpp>
#include <hpx/hpx.hpp>

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <deque>
#include <utility>
18
19 // This example assumes that the computational space is divided into two
20 // partitions. Here we place both partitions on the same locality, but they
21 // could be easily placed on different HPX localities without any changes to
22 // the code as well.
23 //
// The example rotates the data stored in both partitions to the left by one
// position during each time step while making sure that the left-most value
// is transferred to the neighboring partition, where it replaces the
// right-most value.
27 //
// Each partition is represented by a component type 'partition' which holds
// the data (here a simple std::deque<int>) and an instance of a
// receive_buffer for the neighboring partition. The two partitions in
// this example are connected in a circular fashion, thus both of them have
// one receive_buffer, always representing the data to be received from the
// 'right'.
34
// Base name under which the partitions are registered with (and looked up
// from) the runtime; the partition's sequence number is passed alongside it
// to distinguish the individual registrations.
char const* partition_basename = "/receive_buffer_example/partition/";
36
// Return the sequence number of the neighbor of the given partition: the
// neighbor of partition '0' is partition '1', and vice versa. Any non-zero
// argument maps to partition '0'.
std::size_t neighbor(std::size_t partition_num)
{
    return partition_num == 0 ? 1 : 0;
}
42
43 ///////////////////////////////////////////////////////////////////////////////
44 struct partition_server : hpx::components::component_base<partition_server>
45 {
partition_serverpartition_server46 partition_server() {}
47
48 // Retrieve the neighboring partition
partition_serverpartition_server49 partition_server(std::size_t partition_num, std::size_t num_elements)
50 : data_(num_elements),
51 left_(hpx::find_from_basename(partition_basename, neighbor(partition_num)))
52 {
53 // fill with some random data
54 std::generate(data_.begin(), data_.end(), std::rand);
55 }
56
57 public:
58 // Action definitions
59
60 // Do all the work for 'nt' time steps on the local
61 hpx::future<void> do_work(std::size_t nt);
62 HPX_DEFINE_COMPONENT_ACTION(partition_server, do_work, do_work_action);
63
64 // Receive the data from the left partition. This will be called by the
65 // other partition, sending us its data.
from_rightpartition_server66 void from_right(std::size_t timestep, int data)
67 {
68 right_buffer_.store_received(timestep, std::move(data));
69 }
70 HPX_DEFINE_COMPONENT_ACTION(partition_server, from_right, from_right_action);
71
72 // Explicitly release dependencies to avoid circular dependencies in the
73 // reference counting chain.
release_dependenciespartition_server74 void release_dependencies()
75 {
76 left_.free();
77 }
78 HPX_DEFINE_COMPONENT_ACTION(partition_server, release_dependencies,
79 release_dependencies_action);
80
81 public:
82 // Other helper functions
83
84 // Helper function to send our boundary elements to the left neighbor.
send_leftpartition_server85 void send_left(std::size_t timestep, int data) const
86 {
87 hpx::apply(from_right_action(), left_, timestep, data);
88 }
89
90 // Helper function to receive the boundary element from the right neighbor.
receive_rightpartition_server91 hpx::future<int> receive_right(std::size_t timestep)
92 {
93 return right_buffer_.receive(timestep);
94 }
95
96 private:
97 // Data stored in this partition.
98 std::deque<int> data_;
99
100 // The id held by the future represents the neighboring partition (the one
101 // where the next element should be sent to).
102 hpx::components::client<partition_server> left_;
103
104 // The receive buffers represents one single int to be received from the
105 // corresponding neighbor.
106 hpx::lcos::local::receive_buffer<int> right_buffer_;
107 };
108
109 // The macros below are necessary to generate the code required for exposing
110 // our partition type remotely.
111 //
112 // HPX_REGISTER_COMPONENT() exposes the component creation through hpx::new_<>().
113 typedef hpx::components::component<partition_server> partition_server_type;
114 HPX_REGISTER_COMPONENT(partition_server_type, partition_server);
115
116 // HPX_REGISTER_ACTION() exposes the component member function for remote
117 // invocation.
118 typedef partition_server::from_right_action from_right_action;
119 HPX_REGISTER_ACTION(from_right_action);
120
121 typedef partition_server::do_work_action do_work_action;
122 HPX_REGISTER_ACTION(do_work_action);
123
124 typedef partition_server::release_dependencies_action release_dependencies_action;
125 HPX_REGISTER_ACTION(release_dependencies_action);
126
127 ///////////////////////////////////////////////////////////////////////////////
128 struct partition : hpx::components::client_base<partition, partition_server>
129 {
130 typedef hpx::components::client_base<partition, partition_server> base_type;
131
partitionpartition132 partition(hpx::id_type const& locality, std::size_t partition_num,
133 std::size_t num_elements)
134 : base_type(hpx::new_<partition_server>(
135 locality, partition_num, num_elements
136 )),
137 partition_num_(partition_num),
138 registered_name_(true)
139 {
140 // Register this partition with the runtime so that its neighbor can
141 // find it.
142 hpx::register_with_basename(partition_basename, *this, partition_num).get();
143 }
144
partitionpartition145 partition(hpx::future<hpx::id_type> && id)
146 : base_type(std::move(id)),
147 registered_name_(false)
148 {}
149
~partitionpartition150 ~partition()
151 {
152 if (!registered_name_)
153 return;
154
155 // break cyclic dependencies
156 hpx::future<void> f1 = hpx::async(release_dependencies_action(), *this);
157
158 // release the reference held by AGAS
159 hpx::future<void> f2 = hpx::unregister_with_basename(
160 partition_basename, partition_num_);
161
162 hpx::wait_all(f1, f2); // ignore exceptions
163 }
164
do_workpartition165 hpx::future<void> do_work(std::size_t nt)
166 {
167 return hpx::async(do_work_action(), *this, nt);
168 }
169
170 private:
171 std::size_t partition_num_;
172 bool registered_name_;
173 };
174
175 ///////////////////////////////////////////////////////////////////////////////
176 // This is the implementation of the time step loop
do_work(std::size_t nt)177 hpx::future<void> partition_server::do_work(std::size_t nt)
178 {
179 // send initial values to neighbors
180 if (nt != 0)
181 {
182 // send left-most element
183 send_left(0, data_[0]);
184
185 // rotate left by one element
186 std::rotate(data_.begin(), data_.begin() + 1, data_.end());
187 }
188
189 hpx::future<void> result = hpx::make_ready_future();
190 for (std::size_t t = 0; t != nt; ++t)
191 {
192 // Receive element from the right, replace last local element with the
193 // received value.
194 //
195 // Each timestep depends on a) the previous timestep and b) the
196 // received value for the current timestep.
197 result =
198 hpx::dataflow(
199 [this, t, nt](hpx::future<void> result, hpx::future<int> f)
200 {
201 result.get(); // propagate exceptions
202
203 // replace right-most element with received value
204 data_[data_.size()-1] = f.get();
205
206 // if not last time step, send left-most and rotate left
207 // by one element
208 if (t != nt - 1)
209 {
210 send_left(t + 1, data_[0]);
211 std::rotate(data_.begin(), data_.begin() + 1, data_.end());
212 }
213 },
214 result, receive_right(t));
215 }
216 return result;
217 }
218
219 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])220 int main(int argc, char* argv[])
221 {
222 // Initial conditions: f(0, i) = i
223 hpx::id_type here = hpx::find_here();
224
225 // create partitions and launch work
226 partition p0(here, 0, 1000);
227 hpx::future<void> f0 = p0.do_work(100);
228
229 partition p1(here, 1, 1000);
230 hpx::future<void> f1 = p1.do_work(100);
231
232 // wait for both partitions to be finished
233 hpx::wait_all(f0, f1);
234
235 return 0;
236 }
237
238