1 // Copyright (c) 2016 Hartmut Kaiser
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
#include <hpx/hpx.hpp>
#include <hpx/hpx_init.hpp>
#include <hpx/include/actions.hpp>
#include <hpx/include/iostreams.hpp>
#include <hpx/include/parcel_coalescing.hpp>
#include <hpx/include/performance_counters.hpp>
#include <hpx/util/lightweight_test.hpp>

#include <algorithm>
#include <cstddef>
#include <cstdlib>
#include <ctime>
#include <iostream>
#include <string>
#include <type_traits>
#include <utility>
#include <vector>
20
21 ///////////////////////////////////////////////////////////////////////////////
// Number of doubles in the payload vector handed to test1_action.
const std::size_t vsize_default = 1024;
// Number of parcels created and sent by each individual test.
const std::size_t numparcels_default = 10;
24
25 ///////////////////////////////////////////////////////////////////////////////
26 template <typename Action, typename T>
27 hpx::parcelset::parcel
generate_parcel(hpx::id_type const & dest_id,hpx::id_type const & cont,T && data)28 generate_parcel(hpx::id_type const& dest_id, hpx::id_type const& cont, T && data)
29 {
30 hpx::naming::address addr;
31 hpx::naming::gid_type dest = dest_id.get_gid();
32 hpx::naming::detail::strip_credits_from_gid(dest);
33 hpx::parcelset::parcel p(hpx::parcelset::detail::create_parcel::call(
34 std::true_type(), std::move(dest), std::move(addr),
35 hpx::actions::typed_continuation<hpx::id_type>(cont),
36 Action(), hpx::threads::thread_priority_normal,
37 std::forward<T>(data)));
38
39 p.set_source_id(hpx::find_here());
40 p.size() = 4096;
41
42 return p;
43 }
44
45 ///////////////////////////////////////////////////////////////////////////////
46 struct test_server : hpx::components::component_base<test_server>
47 {
48 typedef hpx::components::component_base<test_server> base_type;
49
test1test_server50 hpx::id_type test1(std::vector<double> const& data)
51 {
52 return hpx::find_here();
53 }
54
55 HPX_DEFINE_COMPONENT_ACTION(test_server, test1, test1_action);
56 };
57
58 typedef hpx::components::component<test_server> server_type;
59 HPX_REGISTER_COMPONENT(server_type, test_server);
60
61 typedef test_server::test1_action test1_action;
62
63 HPX_REGISTER_ACTION_DECLARATION(test1_action);
64 HPX_ACTION_USES_MESSAGE_COALESCING(test1_action);
65 HPX_REGISTER_ACTION(test1_action);
66
67 ///////////////////////////////////////////////////////////////////////////////
test_plain_argument(hpx::id_type const & id)68 void test_plain_argument(hpx::id_type const& id)
69 {
70 std::vector<double> data(vsize_default);
71 std::generate(data.begin(), data.end(), std::rand);
72
73 std::vector<hpx::future<hpx::id_type> > results;
74 results.reserve(numparcels_default);
75
76 hpx::components::client<test_server> c = hpx::new_<test_server>(id);
77
78 // create parcels
79 std::vector<hpx::parcelset::parcel> parcels;
80 for (std::size_t i = 0; i != numparcels_default; ++i)
81 {
82 hpx::lcos::promise<hpx::id_type> p;
83 auto f = p.get_future();
84
85 parcels.push_back(
86 generate_parcel<test1_action>(c.get_id(), p.get_id(), data)
87 );
88
89 results.push_back(std::move(f));
90 }
91
92 // send parcels
93 hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
94
95 // verify all messages got actually sent to the correct locality
96 hpx::wait_all(results);
97
98 for (hpx::future<hpx::id_type>& f : results)
99 {
100 HPX_TEST(f.get() == id);
101 }
102 }
103
104 ///////////////////////////////////////////////////////////////////////////////
test2(hpx::future<double> const & data)105 hpx::id_type test2(hpx::future<double> const& data)
106 {
107 return hpx::find_here();
108 }
109 HPX_DECLARE_PLAIN_ACTION(test2, test2_action);
110 HPX_ACTION_USES_MESSAGE_COALESCING(test2_action);
111 HPX_PLAIN_ACTION(test2, test2_action);
112
test_future_argument(hpx::id_type const & id)113 void test_future_argument(hpx::id_type const& id)
114 {
115 std::vector<hpx::lcos::local::promise<double> > args;
116 args.reserve(numparcels_default);
117
118 std::vector<hpx::future<hpx::id_type> > results;
119 results.reserve(numparcels_default);
120
121 // create parcels
122 std::vector<hpx::parcelset::parcel> parcels;
123 for (std::size_t i = 0; i != numparcels_default; ++i)
124 {
125 hpx::lcos::local::promise<double> p_arg;
126 hpx::lcos::promise<hpx::id_type> p_cont;
127 auto f_cont = p_cont.get_future();
128
129 parcels.push_back(
130 generate_parcel<test2_action>(id, p_cont.get_id(),
131 p_arg.get_future())
132 );
133
134 args.push_back(std::move(p_arg));
135 results.push_back(std::move(f_cont));
136 }
137
138 // send parcels
139 hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
140
141 // now make the futures ready
142 for (hpx::lcos::local::promise<double>& arg : args)
143 {
144 arg.set_value(42.0);
145 }
146
147 // verify all messages got actually sent to the correct locality
148 hpx::wait_all(results);
149
150 for (hpx::future<hpx::id_type>& f : results)
151 {
152 HPX_TEST(f.get() == id);
153 }
154 }
155
test_mixed_arguments(hpx::id_type const & id)156 void test_mixed_arguments(hpx::id_type const& id)
157 {
158 std::vector<double> data(vsize_default);
159 std::generate(data.begin(), data.end(), std::rand);
160
161 std::vector<hpx::lcos::local::promise<double> > args;
162 args.reserve(numparcels_default);
163
164 std::vector<hpx::future<hpx::id_type> > results;
165 results.reserve(numparcels_default);
166
167 hpx::components::client<test_server> c = hpx::new_<test_server>(id);
168
169 // create parcels
170 std::vector<hpx::parcelset::parcel> parcels;
171 for (std::size_t i = 0; i != numparcels_default; ++i)
172 {
173 hpx::lcos::promise<hpx::id_type> p_cont;
174 auto f_cont = p_cont.get_future();
175
176 if (std::rand() % 2)
177 {
178 parcels.push_back(
179 generate_parcel<test1_action>(c.get_id(), p_cont.get_id(), data)
180 );
181 }
182 else
183 {
184 hpx::lcos::local::promise<double> p_arg;
185
186 parcels.push_back(
187 generate_parcel<test2_action>(id, p_cont.get_id(),
188 p_arg.get_future())
189 );
190
191 args.push_back(std::move(p_arg));
192 }
193
194 results.push_back(std::move(f_cont));
195 }
196
197 // send parcels
198 hpx::get_runtime().get_parcel_handler().put_parcels(std::move(parcels));
199
200 // now make the futures ready
201 for (hpx::lcos::local::promise<double>& arg : args)
202 {
203 arg.set_value(42.0);
204 }
205
206 // verify all messages got actually sent to the correct locality
207 hpx::wait_all(results);
208
209 for (hpx::future<hpx::id_type>& f : results)
210 {
211 HPX_TEST(f.get() == id);
212 }
213 }
214
215 ///////////////////////////////////////////////////////////////////////////////
print_counters(char const * name)216 void print_counters(char const* name)
217 {
218 using namespace hpx::performance_counters;
219
220 std::vector<performance_counter> counters = discover_counters(name);
221
222 for (performance_counter const& c : counters)
223 {
224 counter_value value = c.get_counter_value(hpx::launch::sync);
225 HPX_TEST_NEQ(value.get_value<double>(), 0.0);
226
227 hpx::cout
228 << "counter: " << c.get_name(hpx::launch::sync)
229 << ", value: " << value.get_value<double>()
230 << std::endl;
231 }
232 }
233
234 ///////////////////////////////////////////////////////////////////////////////
hpx_main(boost::program_options::variables_map & vm)235 int hpx_main(boost::program_options::variables_map& vm)
236 {
237 unsigned int seed = (unsigned int)std::time(nullptr);
238 if (vm.count("seed"))
239 seed = vm["seed"].as<unsigned int>();
240
241 std::cout << "using seed: " << seed << std::endl;
242 std::srand(seed);
243
244 for (hpx::id_type const& id : hpx::find_remote_localities())
245 {
246 test_plain_argument(id);
247 test_future_argument(id);
248 test_mixed_arguments(id);
249 }
250
251 // make sure coalescing was actually invoked
252 print_counters("/coalescing{locality#0/total}/count/parcels@test1_action");
253 print_counters("/coalescing{locality#0/total}/count/parcels@test2_action");
254 print_counters("/coalescing{locality#0/total}/count/messages@test1_action");
255 print_counters("/coalescing{locality#0/total}/count/messages@test2_action");
256
257 return hpx::finalize();
258 }
259
260 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])261 int main(int argc, char* argv[])
262 {
263 // add command line option which controls the random number generator seed
264 using namespace boost::program_options;
265 options_description desc_commandline(
266 "Usage: " HPX_APPLICATION_STRING " [options]");
267
268 desc_commandline.add_options()
269 ("seed,s", value<unsigned int>(),
270 "the random number generator seed to use for this run")
271 ;
272
273 // explicitly enable message handlers (parcel coalescing)
274 std::vector<std::string> const cfg = {
275 "hpx.parcel.message_handlers=1"
276 };
277
278 // Initialize and run HPX
279 HPX_TEST_EQ_MSG(hpx::init(desc_commandline, argc, argv, cfg), 0,
280 "HPX main exited with non-zero status");
281
282 return hpx::util::report_errors();
283 }
284