1 //  Copyright (c) 2020 Andrey Semashev
2 //
3 //  Distributed under the Boost Software License, Version 1.0.
4 //  See accompanying file LICENSE_1_0.txt or copy at
5 //  http://www.boost.org/LICENSE_1_0.txt)
6 
7 #ifndef BOOST_ATOMIC_TEST_IPC_WAIT_TEST_HELPERS_HPP_INCLUDED_
8 #define BOOST_ATOMIC_TEST_IPC_WAIT_TEST_HELPERS_HPP_INCLUDED_
9 
10 #include <boost/memory_order.hpp>
11 #include <boost/atomic/ipc_atomic_flag.hpp>
12 
13 #include <cstdlib>
14 #include <cstring>
15 #include <iostream>
16 #include <algorithm>
17 #include <boost/config.hpp>
18 #include <boost/chrono/chrono.hpp>
19 #include <boost/bind/bind.hpp>
20 #include <boost/thread/thread.hpp>
21 #include <boost/thread/barrier.hpp>
22 #include <boost/atomic/capabilities.hpp>
23 #include <boost/atomic/ipc_atomic_flag.hpp>
24 #include <boost/type_traits/integral_constant.hpp>
25 #include "atomic_wrapper.hpp"
26 #include "lightweight_test_stream.hpp"
27 #include "test_clock.hpp"
28 
29 //! Since some of the tests below are allowed to fail, we retry up to this many times to pass the test
30 BOOST_CONSTEXPR_OR_CONST unsigned int test_retry_count = 5u;
31 
32 //! The test verifies that the wait operation returns immediately if the passed value does not match the atomic value
33 template< template< typename > class Wrapper, typename T >
test_wait_value_mismatch(T value1,T value2)34 inline void test_wait_value_mismatch(T value1, T value2)
35 {
36     Wrapper< T > m_wrapper(value1);
37 
38     T received_value = m_wrapper.a.wait(value2);
39     BOOST_TEST(received_value == value1);
40 }
41 
42 /*!
43  * The test verifies that notify_one releases one blocked thread and that the released thread receives the modified atomic value.
44  *
45  * Technically, this test is allowed to fail since wait() is allowed to return spuriously. However, normally this should not happen.
46  */
47 template< template< typename > class Wrapper, typename T >
48 class notify_one_test
49 {
50 private:
51     struct thread_state
52     {
53         T m_received_value;
54         test_clock::time_point m_wakeup_time;
55 
thread_statenotify_one_test::thread_state56         explicit thread_state(T value) : m_received_value(value)
57         {
58         }
59     };
60 
61 private:
62     Wrapper< T > m_wrapper;
63 
64     char m_padding[1024];
65 
66     T m_value1, m_value2, m_value3;
67 
68     boost::barrier m_barrier;
69 
70     thread_state m_thread1_state;
71     thread_state m_thread2_state;
72 
73 public:
notify_one_test(T value1,T value2,T value3)74     explicit notify_one_test(T value1, T value2, T value3) :
75         m_wrapper(value1),
76         m_value1(value1),
77         m_value2(value2),
78         m_value3(value3),
79         m_barrier(3),
80         m_thread1_state(value1),
81         m_thread2_state(value1)
82     {
83     }
84 
run()85     bool run()
86     {
87         boost::thread thread1(&notify_one_test::thread_func, this, &m_thread1_state);
88         boost::thread thread2(&notify_one_test::thread_func, this, &m_thread2_state);
89 
90         m_barrier.wait();
91 
92         test_clock::time_point start_time = test_clock::now();
93 
94         boost::this_thread::sleep_for(chrono::milliseconds(200));
95 
96         m_wrapper.a.store(m_value2, boost::memory_order_release);
97         m_wrapper.a.notify_one();
98 
99         boost::this_thread::sleep_for(chrono::milliseconds(200));
100 
101         m_wrapper.a.store(m_value3, boost::memory_order_release);
102         m_wrapper.a.notify_one();
103 
104         if (!thread1.try_join_for(chrono::seconds(3)))
105         {
106             BOOST_ERROR("Thread 1 failed to join");
107             std::abort();
108         }
109         if (!thread2.try_join_for(chrono::seconds(3)))
110         {
111             BOOST_ERROR("Thread 2 failed to join");
112             std::abort();
113         }
114 
115         thread_state* first_state = &m_thread1_state;
116         thread_state* second_state = &m_thread2_state;
117         if (second_state->m_wakeup_time < first_state->m_wakeup_time)
118             std::swap(first_state, second_state);
119 
120         if (m_wrapper.a.has_native_wait_notify())
121         {
122             if ((first_state->m_wakeup_time - start_time) < chrono::milliseconds(200))
123             {
124                 std::cout << "notify_one_test: first thread woke up too soon: " << chrono::duration_cast< chrono::milliseconds >(first_state->m_wakeup_time - start_time).count() << " ms" << std::endl;
125                 return false;
126             }
127 
128             if ((first_state->m_wakeup_time - start_time) >= chrono::milliseconds(400))
129             {
130                 std::cout << "notify_one_test: first thread woke up too late: " << chrono::duration_cast< chrono::milliseconds >(first_state->m_wakeup_time - start_time).count() << " ms" << std::endl;
131                 return false;
132             }
133 
134             if ((second_state->m_wakeup_time - start_time) < chrono::milliseconds(400))
135             {
136                 std::cout << "notify_one_test: second thread woke up too soon: " << chrono::duration_cast< chrono::milliseconds >(second_state->m_wakeup_time - start_time).count() << " ms" << std::endl;
137                 return false;
138             }
139 
140             BOOST_TEST_EQ(first_state->m_received_value, m_value2);
141             BOOST_TEST_EQ(second_state->m_received_value, m_value3);
142         }
143         else
144         {
145             // With the emulated wait/notify the threads are most likely to return prior to notify
146             BOOST_TEST(first_state->m_received_value == m_value2 || first_state->m_received_value == m_value3);
147             BOOST_TEST(second_state->m_received_value == m_value2 || second_state->m_received_value == m_value3);
148         }
149 
150         return true;
151     }
152 
153 private:
thread_func(thread_state * state)154     void thread_func(thread_state* state)
155     {
156         m_barrier.wait();
157 
158         state->m_received_value = m_wrapper.a.wait(m_value1);
159         state->m_wakeup_time = test_clock::now();
160     }
161 };
162 
163 template< template< typename > class Wrapper, typename T >
test_notify_one(T value1,T value2,T value3)164 inline void test_notify_one(T value1, T value2, T value3)
165 {
166     for (unsigned int i = 0u; i < test_retry_count; ++i)
167     {
168         notify_one_test< Wrapper, T > test(value1, value2, value3);
169         if (test.run())
170             return;
171     }
172 
173     BOOST_ERROR("notify_one_test could not complete because blocked thread wake up too soon");
174 }
175 
176 /*!
177  * The test verifies that notify_all releases all blocked threads and that the released threads receive the modified atomic value.
178  *
179  * Technically, this test is allowed to fail since wait() is allowed to return spuriously. However, normally this should not happen.
180  */
181 template< template< typename > class Wrapper, typename T >
182 class notify_all_test
183 {
184 private:
185     struct thread_state
186     {
187         T m_received_value;
188         test_clock::time_point m_wakeup_time;
189 
thread_statenotify_all_test::thread_state190         explicit thread_state(T value) : m_received_value(value)
191         {
192         }
193     };
194 
195 private:
196     Wrapper< T > m_wrapper;
197 
198     char m_padding[1024];
199 
200     T m_value1, m_value2;
201 
202     boost::barrier m_barrier;
203 
204     thread_state m_thread1_state;
205     thread_state m_thread2_state;
206 
207 public:
notify_all_test(T value1,T value2)208     explicit notify_all_test(T value1, T value2) :
209         m_wrapper(value1),
210         m_value1(value1),
211         m_value2(value2),
212         m_barrier(3),
213         m_thread1_state(value1),
214         m_thread2_state(value1)
215     {
216     }
217 
run()218     bool run()
219     {
220         boost::thread thread1(&notify_all_test::thread_func, this, &m_thread1_state);
221         boost::thread thread2(&notify_all_test::thread_func, this, &m_thread2_state);
222 
223         m_barrier.wait();
224 
225         test_clock::time_point start_time = test_clock::now();
226 
227         boost::this_thread::sleep_for(chrono::milliseconds(200));
228 
229         m_wrapper.a.store(m_value2, boost::memory_order_release);
230         m_wrapper.a.notify_all();
231 
232         if (!thread1.try_join_for(chrono::seconds(3)))
233         {
234             BOOST_ERROR("Thread 1 failed to join");
235             std::abort();
236         }
237         if (!thread2.try_join_for(chrono::seconds(3)))
238         {
239             BOOST_ERROR("Thread 2 failed to join");
240             std::abort();
241         }
242 
243         if (m_wrapper.a.has_native_wait_notify())
244         {
245             if ((m_thread1_state.m_wakeup_time - start_time) < chrono::milliseconds(200))
246             {
247                 std::cout << "notify_all_test: first thread woke up too soon: " << chrono::duration_cast< chrono::milliseconds >(m_thread1_state.m_wakeup_time - start_time).count() << " ms" << std::endl;
248                 return false;
249             }
250 
251             if ((m_thread2_state.m_wakeup_time - start_time) < chrono::milliseconds(200))
252             {
253                 std::cout << "notify_all_test: second thread woke up too soon: " << chrono::duration_cast< chrono::milliseconds >(m_thread2_state.m_wakeup_time - start_time).count() << " ms" << std::endl;
254                 return false;
255             }
256         }
257 
258         BOOST_TEST_EQ(m_thread1_state.m_received_value, m_value2);
259         BOOST_TEST_EQ(m_thread2_state.m_received_value, m_value2);
260 
261         return true;
262     }
263 
264 private:
thread_func(thread_state * state)265     void thread_func(thread_state* state)
266     {
267         m_barrier.wait();
268 
269         state->m_received_value = m_wrapper.a.wait(m_value1);
270         state->m_wakeup_time = test_clock::now();
271     }
272 };
273 
274 template< template< typename > class Wrapper, typename T >
test_notify_all(T value1,T value2)275 inline void test_notify_all(T value1, T value2)
276 {
277     for (unsigned int i = 0u; i < test_retry_count; ++i)
278     {
279         notify_all_test< Wrapper, T > test(value1, value2);
280         if (test.run())
281             return;
282     }
283 
284     BOOST_ERROR("notify_all_test could not complete because blocked thread wake up too soon");
285 }
286 
287 //! Invokes all wait/notify tests
288 template< template< typename > class Wrapper, typename T >
test_wait_notify_api(T value1,T value2,T value3,boost::true_type)289 void test_wait_notify_api(T value1, T value2, T value3, boost::true_type)
290 {
291     test_wait_value_mismatch< Wrapper >(value1, value2);
292     test_notify_one< Wrapper >(value1, value2, value3);
293     test_notify_all< Wrapper >(value1, value2);
294 }
295 
296 template< template< typename > class Wrapper, typename T >
test_wait_notify_api(T value1,T value2,T value3,boost::false_type)297 inline void test_wait_notify_api(T value1, T value2, T value3, boost::false_type)
298 {
299 }
300 
301 //! Invokes all wait/notify tests, if the atomic type is lock-free
302 template< template< typename > class Wrapper, typename T >
test_wait_notify_api(T value1,T value2,T value3)303 inline void test_wait_notify_api(T value1, T value2, T value3)
304 {
305     test_wait_notify_api< Wrapper >(value1, value2, value3, boost::integral_constant< bool, Wrapper< T >::atomic_type::is_always_lock_free >());
306 }
307 
308 
test_flag_wait_notify_api()309 inline void test_flag_wait_notify_api()
310 {
311 #if BOOST_ATOMIC_FLAG_LOCK_FREE == 2
312 #ifndef BOOST_ATOMIC_NO_ATOMIC_FLAG_INIT
313     boost::ipc_atomic_flag f = BOOST_ATOMIC_FLAG_INIT;
314 #else
315     boost::ipc_atomic_flag f;
316 #endif
317 
318     bool received_value = f.wait(true);
319     BOOST_TEST(!received_value);
320     f.notify_one();
321     f.notify_all();
322 #endif // BOOST_ATOMIC_FLAG_LOCK_FREE == 2
323 }
324 
325 #endif // BOOST_ATOMIC_TEST_IPC_WAIT_TEST_HELPERS_HPP_INCLUDED_
326