1 // Copyright (c) 2016 Hartmut Kaiser
2 //
3 // Distributed under the Boost Software License, Version 1.0. (See accompanying
4 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
5
6 #include <hpx/hpx_main.hpp>
7 #include <hpx/include/apply.hpp>
8 #include <hpx/include/lcos.hpp>
9 #include <hpx/util/lightweight_test.hpp>
10
11 #include <atomic>
12 #include <numeric>
13 #include <string>
14 #include <vector>
15
16 ///////////////////////////////////////////////////////////////////////////////
sum(std::vector<int> const & s,hpx::lcos::local::channel<int> c)17 void sum(std::vector<int> const& s, hpx::lcos::local::channel<int> c)
18 {
19 c.set(std::accumulate(s.begin(), s.end(), 0)); // send sum to channel
20 }
21
calculate_sum()22 void calculate_sum()
23 {
24 std::vector<int> s = { 7, 2, 8, -9, 4, 0 };
25 hpx::lcos::local::channel<int> c;
26
27 hpx::apply(&sum, std::vector<int>(s.begin(), s.begin() + s.size()/2), c);
28 hpx::apply(&sum, std::vector<int>(s.begin() + s.size()/2, s.end()), c);
29
30 int x = c.get(hpx::launch::sync); // receive from c
31 int y = c.get(hpx::launch::sync);
32
33 int expected = std::accumulate(s.begin(), s.end(), 0);
34 HPX_TEST_EQ(expected, x + y);
35 }
36
37 ///////////////////////////////////////////////////////////////////////////////
ping(hpx::lcos::local::send_channel<std::string> pings,std::string const & msg)38 void ping(
39 hpx::lcos::local::send_channel<std::string> pings,
40 std::string const& msg)
41 {
42 pings.set(msg);
43 }
44
pong(hpx::lcos::local::receive_channel<std::string> pings,hpx::lcos::local::send_channel<std::string> pongs)45 void pong(
46 hpx::lcos::local::receive_channel<std::string> pings,
47 hpx::lcos::local::send_channel<std::string> pongs)
48 {
49 std::string msg = pings.get(hpx::launch::sync);
50 pongs.set(msg);
51 }
52
pingpong()53 void pingpong()
54 {
55 hpx::lcos::local::channel<std::string> pings;
56 hpx::lcos::local::channel<std::string> pongs;
57
58 ping(pings, "passed message");
59 pong(pings, pongs);
60
61 std::string result = pongs.get(hpx::launch::sync);
62 HPX_TEST_EQ(std::string("passed message"), result);
63 }
64
pingpong1()65 void pingpong1()
66 {
67 hpx::lcos::local::one_element_channel<std::string> pings;
68 hpx::lcos::local::one_element_channel<std::string> pongs;
69
70 for (int i = 0; i != 10; ++i)
71 {
72 ping(pings, "passed message");
73 pong(pings, pongs);
74
75 std::string result = pongs.get(hpx::launch::sync);
76 HPX_TEST_EQ(std::string("passed message"), result);
77 }
78 }
79
80 ///////////////////////////////////////////////////////////////////////////////
ping_void(hpx::lcos::local::send_channel<> pings)81 void ping_void(hpx::lcos::local::send_channel<> pings)
82 {
83 pings.set();
84 }
85
pong_void(hpx::lcos::local::receive_channel<> pings,hpx::lcos::local::send_channel<> pongs,bool & pingponged)86 void pong_void(
87 hpx::lcos::local::receive_channel<> pings,
88 hpx::lcos::local::send_channel<> pongs,
89 bool& pingponged)
90 {
91 pings.get(hpx::launch::sync);
92 pongs.set();
93
94 HPX_TEST(!pingponged);
95 pingponged = true;
96 }
97
pingpong_void()98 void pingpong_void()
99 {
100 hpx::lcos::local::channel<> pings;
101 hpx::lcos::local::channel<> pongs;
102
103 bool pingponged = false;
104
105 ping_void(pings);
106 pong_void(pings, pongs, pingponged);
107
108 pongs.get(hpx::launch::sync);
109 HPX_TEST(pingponged);
110 }
111
pingpong_void1()112 void pingpong_void1()
113 {
114 hpx::lcos::local::one_element_channel<> pings;
115 hpx::lcos::local::one_element_channel<> pongs;
116
117 for (int i = 0; i != 10; ++i)
118 {
119 bool pingponged = false;
120
121 ping_void(pings);
122 pong_void(pings, pongs, pingponged);
123
124 pongs.get(hpx::launch::sync);
125 HPX_TEST(pingponged);
126 }
127 }
128
129 ///////////////////////////////////////////////////////////////////////////////
dispatch_work()130 void dispatch_work()
131 {
132 hpx::lcos::local::channel<int> jobs;
133 hpx::lcos::local::channel<> done;
134
135 std::atomic<int> received_jobs(0);
136 std::atomic<bool> was_closed(false);
137
138 hpx::apply(
139 [jobs, done, &received_jobs, &was_closed]() mutable
140 {
141 while(true)
142 {
143 hpx::error_code ec(hpx::lightweight);
144 int next = jobs.get(hpx::launch::sync, ec);
145 (void)next;
146 if (!ec)
147 {
148 ++received_jobs;
149 }
150 else
151 {
152 was_closed = true;
153 done.set();
154 break;
155 }
156 }
157 });
158
159 for (int j = 1; j <= 3; ++j)
160 {
161 jobs.set(j);
162 }
163
164 jobs.close();
165 done.get(hpx::launch::sync);
166
167 HPX_TEST_EQ(received_jobs.load(), 3);
168 HPX_TEST(was_closed.load());
169 }
170
171 ///////////////////////////////////////////////////////////////////////////////
channel_range()172 void channel_range()
173 {
174 std::atomic<int> received_elements(0);
175
176 hpx::lcos::local::channel<std::string> queue;
177 queue.set("one");
178 queue.set("two");
179 queue.set("three");
180 queue.close();
181
182 for (auto const& elem : queue)
183 {
184 (void)elem;
185 ++received_elements;
186 }
187
188 HPX_TEST_EQ(received_elements.load(), 3);
189 }
190
channel_range_void()191 void channel_range_void()
192 {
193 std::atomic<int> received_elements(0);
194
195 hpx::lcos::local::channel<> queue;
196 queue.set();
197 queue.set();
198 queue.set();
199 queue.close();
200
201 for (auto const& elem : queue)
202 {
203 (void)elem;
204 ++received_elements;
205 }
206
207 HPX_TEST_EQ(received_elements.load(), 3);
208 }
209
210 ///////////////////////////////////////////////////////////////////////////////
deadlock_test()211 void deadlock_test()
212 {
213 bool caught_exception = false;
214 try {
215 hpx::lcos::local::channel<int> c;
216 int value = c.get(hpx::launch::sync);
217 HPX_TEST(false);
218 (void)value;
219 }
220 catch(hpx::exception const&) {
221 caught_exception = true;
222 }
223 HPX_TEST(caught_exception);
224 }
225
closed_channel_get()226 void closed_channel_get()
227 {
228 bool caught_exception = false;
229 try {
230 hpx::lcos::local::channel<int> c;
231 c.close();
232
233 int value = c.get(hpx::launch::sync);
234 HPX_TEST(false);
235 (void)value;
236 }
237 catch(hpx::exception const&) {
238 caught_exception = true;
239 }
240 HPX_TEST(caught_exception);
241 }
242
closed_channel_get_generation()243 void closed_channel_get_generation()
244 {
245 bool caught_exception = false;
246 try {
247 hpx::lcos::local::channel<int> c;
248 c.set(42, 122); // setting value for generation 122
249 c.close();
250
251 HPX_TEST_EQ(c.get(hpx::launch::sync, 122), 42);
252
253 int value = c.get(hpx::launch::sync, 123); // asking for generation 123
254 HPX_TEST(false);
255 (void)value;
256 }
257 catch(hpx::exception const&) {
258 caught_exception = true;
259 }
260 HPX_TEST(caught_exception);
261 }
262
closed_channel_set()263 void closed_channel_set()
264 {
265 bool caught_exception = false;
266 try {
267 hpx::lcos::local::channel<int> c;
268 c.close();
269
270 c.set(42);
271 HPX_TEST(false);
272 }
273 catch(hpx::exception const&) {
274 caught_exception = true;
275 }
276 HPX_TEST(caught_exception);
277 }
278
279 ///////////////////////////////////////////////////////////////////////////////
deadlock_test1()280 void deadlock_test1()
281 {
282 bool caught_exception = false;
283 try {
284 hpx::lcos::local::one_element_channel<int> c;
285 int value = c.get(hpx::launch::sync);
286 HPX_TEST(false);
287 (void)value;
288 }
289 catch(hpx::exception const&) {
290 caught_exception = true;
291 }
292 HPX_TEST(caught_exception);
293 }
294
closed_channel_get1()295 void closed_channel_get1()
296 {
297 bool caught_exception = false;
298 try {
299 hpx::lcos::local::one_element_channel<int> c;
300 c.close();
301
302 int value = c.get(hpx::launch::sync);
303 HPX_TEST(false);
304 (void)value;
305 }
306 catch(hpx::exception const&) {
307 caught_exception = true;
308 }
309 HPX_TEST(caught_exception);
310 }
311
closed_channel_set1()312 void closed_channel_set1()
313 {
314 bool caught_exception = false;
315 try {
316 hpx::lcos::local::one_element_channel<int> c;
317 c.close();
318
319 c.set(42);
320 HPX_TEST(false);
321 }
322 catch(hpx::exception const&) {
323 caught_exception = true;
324 }
325 HPX_TEST(caught_exception);
326 }
327
328 ///////////////////////////////////////////////////////////////////////////////
main(int argc,char * argv[])329 int main(int argc, char* argv[])
330 {
331 calculate_sum();
332 pingpong();
333 pingpong1();
334 pingpong_void();
335 pingpong_void1();
336 dispatch_work();
337 channel_range();
338 channel_range_void();
339
340 deadlock_test();
341 closed_channel_get();
342 closed_channel_get_generation();
343 closed_channel_set();
344
345 deadlock_test1();
346 closed_channel_get1();
347 closed_channel_set1();
348
349 return hpx::util::report_errors();
350 }
351