1 // Copyright (c) 2007-2015 Hartmut Kaiser
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 #include <hpx/hpx_init.hpp>
7 #include <hpx/include/traits.hpp>
8 #include <hpx/include/lcos.hpp>
9 #include <hpx/include/components.hpp>
10 #include <hpx/include/async.hpp>
11 #include <hpx/util/lightweight_test.hpp>
12
13 #include <atomic>
14 #include <cstdint>
15 #include <utility>
16 #include <vector>
17
18 ///////////////////////////////////////////////////////////////////////////////
/// Plain free function used as an action payload in this test: yields the
/// value that follows \a i.
std::int32_t increment(std::int32_t i)
{
    std::int32_t const successor = i + 1;
    return successor;
}
// Wrap increment() as an HPX plain action; the tests further down in this
// file refer to the resulting action type as increment_action.
HPX_PLAIN_ACTION(increment);
24
increment_with_future(hpx::shared_future<std::int32_t> fi)25 std::int32_t increment_with_future(hpx::shared_future<std::int32_t> fi)
26 {
27 return fi.get() + 1;
28 }
// Wrap increment_with_future() as an HPX plain action; the tests further down
// in this file refer to the resulting type as increment_with_future_action.
HPX_PLAIN_ACTION(increment_with_future);
30
31 ///////////////////////////////////////////////////////////////////////////////
32 struct decrement_server
33 : hpx::components::managed_component_base<decrement_server>
34 {
calldecrement_server35 std::int32_t call(std::int32_t i) const
36 {
37 return i - 1;
38 }
39
40 HPX_DEFINE_COMPONENT_ACTION(decrement_server, call);
41 };
42
43 typedef hpx::components::managed_component<decrement_server> server_type;
44 HPX_REGISTER_COMPONENT(server_type, decrement_server);
45
46 typedef decrement_server::call_action call_action;
47 HPX_REGISTER_ACTION_DECLARATION(call_action);
48 HPX_REGISTER_ACTION(call_action);
49
50 ///////////////////////////////////////////////////////////////////////////////
51 struct test_server : hpx::components::simple_component_base<test_server>
52 {
53 };
54
55 typedef hpx::components::simple_component<test_server> test_server_type;
56 HPX_REGISTER_COMPONENT(test_server_type, test_server);
57
58 struct test_client : hpx::components::client_base<test_client, test_server>
59 {
60 typedef hpx::components::client_base<test_client, test_server> base_type;
61
test_clienttest_client62 test_client(hpx::id_type const& id)
63 : base_type(id)
64 {}
test_clienttest_client65 test_client(hpx::future<hpx::id_type> && id)
66 : base_type(std::move(id))
67 {}
68 };
69
70 ///////////////////////////////////////////////////////////////////////////////
71 std::atomic<int> callback_called(0);
72
cb(boost::system::error_code const & ec,hpx::parcelset::parcel const & p)73 void cb(boost::system::error_code const& ec,
74 hpx::parcelset::parcel const& p)
75 {
76 ++callback_called;
77 }
78
79 ///////////////////////////////////////////////////////////////////////////////
test_remote_async_cb_colocated(test_client const & target)80 void test_remote_async_cb_colocated(test_client const& target)
81 {
82 {
83 increment_action inc;
84
85 callback_called.store(0);
86 hpx::future<std::int32_t> f1 =
87 hpx::async_cb(inc, hpx::colocated(target), &cb, 42);
88 HPX_TEST_EQ(f1.get(), 43);
89 HPX_TEST_EQ(callback_called.load(), 1);
90
91 callback_called.store(0);
92 hpx::future<std::int32_t> f2 =
93 hpx::async_cb(hpx::launch::all, inc, hpx::colocated(target), &cb, 42);
94 HPX_TEST_EQ(f2.get(), 43);
95 HPX_TEST_EQ(callback_called.load(), 1);
96 }
97
98 {
99 increment_with_future_action inc;
100
101 hpx::promise<std::int32_t> p;
102 hpx::shared_future<std::int32_t> f = p.get_future();
103
104 callback_called.store(0);
105 hpx::future<std::int32_t> f1 =
106 hpx::async_cb(inc, hpx::colocated(target), &cb, f);
107 hpx::future<std::int32_t> f2 =
108 hpx::async_cb(hpx::launch::all, inc, hpx::colocated(target), &cb, f);
109
110 p.set_value(42);
111 HPX_TEST_EQ(f1.get(), 43);
112 HPX_TEST_EQ(f2.get(), 43);
113 HPX_TEST_EQ(callback_called.load(), 2);
114 }
115
116 {
117 callback_called.store(0);
118 hpx::future<std::int32_t> f1 =
119 hpx::async_cb<increment_action>(hpx::colocated(target), &cb, 42);
120 HPX_TEST_EQ(f1.get(), 43);
121 HPX_TEST_EQ(callback_called.load(), 1);
122
123 callback_called.store(0);
124 hpx::future<std::int32_t> f2 = hpx::async_cb<increment_action>(
125 hpx::launch::all, hpx::colocated(target), &cb, 42);
126 HPX_TEST_EQ(f2.get(), 43);
127 HPX_TEST_EQ(callback_called.load(), 1);
128 }
129
130 {
131 hpx::future<hpx::id_type> dec_f =
132 hpx::components::new_<decrement_server>(hpx::colocated(target));
133 hpx::id_type dec = dec_f.get();
134
135 call_action call;
136
137 callback_called.store(0);
138 hpx::future<std::int32_t> f1 = hpx::async_cb(call, dec, &cb, 42);
139 HPX_TEST_EQ(f1.get(), 41);
140 HPX_TEST_EQ(callback_called.load(), 1);
141
142 callback_called.store(0);
143 hpx::future<std::int32_t> f2 =
144 hpx::async_cb(hpx::launch::all, call, dec, &cb, 42);
145 HPX_TEST_EQ(f2.get(), 41);
146 HPX_TEST_EQ(callback_called.load(), 1);
147 }
148
149 {
150 hpx::future<hpx::id_type> dec_f =
151 hpx::components::new_<decrement_server>(hpx::colocated(target));
152 hpx::id_type dec = dec_f.get();
153
154 callback_called.store(0);
155 hpx::future<std::int32_t> f1 =
156 hpx::async_cb<call_action>(dec, &cb, 42);
157 HPX_TEST_EQ(f1.get(), 41);
158 HPX_TEST_EQ(callback_called.load(), 1);
159
160 callback_called.store(0);
161 hpx::future<std::int32_t> f2 =
162 hpx::async_cb<call_action>(hpx::launch::all, dec, &cb, 42);
163 HPX_TEST_EQ(f2.get(), 41);
164 HPX_TEST_EQ(callback_called.load(), 1);
165 }
166
167 {
168 increment_with_future_action inc;
169 hpx::shared_future<std::int32_t> f =
170 hpx::async(hpx::launch::deferred, hpx::util::bind(&increment, 42));
171
172 callback_called.store(0);
173 hpx::future<std::int32_t> f1 = hpx::async_cb(
174 inc, hpx::colocated(target), &cb, f);
175 hpx::future<std::int32_t> f2 = hpx::async_cb(
176 hpx::launch::all, inc, hpx::colocated(target), &cb, f);
177
178 HPX_TEST_EQ(f1.get(), 44);
179 HPX_TEST_EQ(f2.get(), 44);
180 HPX_TEST_EQ(callback_called.load(), 2);
181 }
182 }
183
hpx_main()184 int hpx_main()
185 {
186 std::vector<hpx::id_type> localities = hpx::find_all_localities();
187 for (hpx::id_type const& id : localities)
188 {
189 test_client client(hpx::new_<test_client>(id));
190 test_remote_async_cb_colocated(client);
191 }
192 return hpx::finalize();
193 }
194
main(int argc,char * argv[])195 int main(int argc, char* argv[])
196 {
197 // Initialize and run HPX
198 HPX_TEST_EQ_MSG(hpx::init(argc, argv), 0,
199 "HPX main exited with non-zero status");
200
201 return hpx::util::report_errors();
202 }
203
204