// auto_reset_event_futex.hpp, futex-based event
//
// Copyright (C) 2013 Tim Blechmann
// Copyright (C) 2013 Andrey Semashev
//
// Distributed under the Boost Software License, Version 1.0. (See
// accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

#ifndef BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_
#define BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_

#include <errno.h>
#include <sys/time.h>

#include <boost/assert.hpp>
#include <boost/static_assert.hpp>
#include <boost/utility/enable_if.hpp>
#include <boost/sync/detail/config.hpp>
#include <boost/sync/detail/atomic.hpp>
#include <boost/sync/detail/pause.hpp>
#include <boost/sync/detail/futex.hpp>
#include <boost/sync/detail/time_traits.hpp>
#include <boost/sync/detail/time_units.hpp>
#include <boost/sync/detail/header.hpp>

#ifdef BOOST_HAS_PRAGMA_ONCE
#pragma once
#endif

namespace boost {

namespace sync {

BOOST_SYNC_DETAIL_OPEN_ABI_NAMESPACE {

class auto_reset_event
{
    BOOST_DELETED_FUNCTION(auto_reset_event(auto_reset_event const&));
    BOOST_DELETED_FUNCTION(auto_reset_event& operator= (auto_reset_event const&));

private:
    // State bits are divided into the post count and the waiter count. The post counter is needed to wake
    // the correct number of threads blocked on the event when multiple concurrent posts are made.
    enum
    {
        post_count_lowest_bit = 22u,
        post_count_one = 1u << post_count_lowest_bit,
        post_count_mask = 0u - post_count_one,
        wait_count_mask = (~0u) ^ post_count_mask
    };
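    // Layout illustration (derived from the constants above): bits [0, 22) hold the number of
    // blocked waiters and bits [22, 32) hold the number of pending posts. For example, a state
    // value of 0x00400003 encodes 1 pending post and 3 waiters.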

public:
    auto_reset_event() BOOST_NOEXCEPT : m_state(0)
    {
    }

    void post() BOOST_NOEXCEPT
    {
        unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
        unsigned int waiters, posts;
        while (true)
        {
            waiters = old_state & wait_count_mask;
            posts = old_state >> post_count_lowest_bit;
            if (waiters >= posts)
            {
                // Publish the post with release semantics so that a released waiter observes writes made before post()
                if (m_state.compare_exchange_weak(old_state, old_state + post_count_one, detail::atomic_ns::memory_order_release, detail::atomic_ns::memory_order_relaxed))
                    break;

                detail::pause();
            }
            else
                return; // the event is already set (enough times so that all waiters are released and the event is still left signalled)
        }

        if (waiters > 0)
            sync::detail::linux_::futex_signal(reinterpret_cast< int* >(&m_state));
    }

    void wait() BOOST_NOEXCEPT
    {
        // Try the fast path first
        if (this->try_wait())
            return;

        // Add one waiter
        unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel);
        while (true)
        {
            unsigned int posts = old_state >> post_count_lowest_bit;
            if (posts == 0)
            {
            again:
                const int status = sync::detail::linux_::futex_wait(reinterpret_cast< int* >(&m_state), old_state);
                if (status != 0)
                {
                    const int err = errno;
                    switch (err)
                    {
                    case EINTR:       // signal received
                        goto again;

                    case EWOULDBLOCK: // another thread changed the state
                        break;

                    default:
                        BOOST_ASSERT(false);
                    }
                }

                old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
                posts = old_state >> post_count_lowest_bit;
                if (posts == 0)
                    goto again;
            }

            // Remove one post and one waiter from the counters
            if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                break;
        }
    }

    bool try_wait() BOOST_NOEXCEPT
    {
        unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire);

        for (unsigned int posts = old_state >> post_count_lowest_bit; posts > 0; posts = old_state >> post_count_lowest_bit)
        {
            if (m_state.compare_exchange_weak(old_state, old_state - post_count_one, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                return true;

            detail::pause();
        }

        return false;
    }

    template< typename Time >
    typename enable_if_c< sync::detail::time_traits< Time >::is_specialized, bool >::type timed_wait(Time const& timeout)
    {
        return priv_timed_wait(sync::detail::time_traits< Time >::to_sync_unit(timeout));
    }

    template< typename Duration >
    typename enable_if< detail::is_time_tag_of< Duration, detail::time_duration_tag >, bool >::type wait_for(Duration const& duration)
    {
        return priv_timed_wait(sync::detail::time_traits< Duration >::to_sync_unit(duration));
    }

    template< typename TimePoint >
    typename enable_if< detail::is_time_tag_of< TimePoint, detail::time_point_tag >, bool >::type wait_until(TimePoint const& abs_time)
    {
        return priv_timed_wait(sync::detail::time_traits< TimePoint >::to_sync_unit(abs_time));
    }

private:
    bool priv_timed_wait(sync::detail::system_time_point const& abs_timeout)
    {
        // Try the fast path first
        if (this->try_wait())
            return true;

        // Add one waiter
        unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel);
        while (true)
        {
            unsigned int posts = old_state >> post_count_lowest_bit;
            if (posts == 0)
            {
            again:
                sync::detail::system_duration::native_type time_left = (abs_timeout - sync::detail::system_time_point::now()).get();
                if (time_left <= 0)
                    return on_wait_timed_out();
                const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), old_state, time_left);
                if (status != 0)
                {
                    const int err = errno;
                    switch (err)
                    {
                    case ETIMEDOUT:
                        return on_wait_timed_out();

                    case EINTR:       // signal received
                        goto again;

                    case EWOULDBLOCK: // another thread changed the state
                        break;

                    default:
                        BOOST_ASSERT(false);
                    }
                }

                old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
                posts = old_state >> post_count_lowest_bit;
                if (posts == 0)
                    goto again;
            }

            // Remove one post and one waiter from the counters
            if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                break;
        }

        return true;
    }

    bool priv_timed_wait(sync::detail::system_duration dur)
    {
        // Try the fast path first
        if (this->try_wait())
            return true;

        sync::detail::system_duration::native_type time_left = dur.get();
        if (time_left <= 0)
            return false;

        // Add one waiter
        unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel);
        while (true)
        {
            unsigned int posts = old_state >> post_count_lowest_bit;
            if (posts == 0)
            {
            again:
                const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), old_state, time_left);
                if (status != 0)
                {
                    const int err = errno;
                    switch (err)
                    {
                    case ETIMEDOUT:
                        return on_wait_timed_out();

                    case EINTR:       // signal received
                        goto again;

                    case EWOULDBLOCK: // another thread changed the state
                        break;

                    default:
                        BOOST_ASSERT(false);
                    }
                }

                old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
                posts = old_state >> post_count_lowest_bit;
                if (posts == 0)
                    goto again;
            }

            // Remove one post and one waiter from the counters
            if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                break;
        }

        return true;
    }

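    // Handles chrono time points: repeatedly converts the time remaining until the deadline to the
    // native duration unit and re-waits, so an early return from the duration-based wait does not
    // report a timeout before the deadline has actually passed.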
    template< typename TimePoint >
    bool priv_timed_wait(sync::detail::chrono_time_point< TimePoint > const& t)
    {
        typedef TimePoint time_point;
        typedef typename time_point::clock clock;
        typedef typename time_point::duration duration;
        time_point now = clock::now();
        do
        {
            if (timed_wait(sync::detail::time_traits< duration >::to_sync_unit(t.get() - now)))
                return true;
            now = clock::now();
        }
        while (now < t.get());

        return false;
    }

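    // Called when a timed wait expires. If a post raced with the timeout, it is consumed and the
    // wait is reported as successful; otherwise this waiter is removed from the waiter count and
    // the wait is reported as timed out.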
    bool on_wait_timed_out()
    {
        unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire);
        while (true)
        {
            unsigned int posts = old_state >> post_count_lowest_bit;
            if (posts == 0)
            {
                // Remove one waiter
                if (m_state.compare_exchange_weak(old_state, old_state - 1u, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                    return false;
            }
            else
            {
                // Remove one post and one waiter from the counters
                if (m_state.compare_exchange_weak(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_relaxed))
                    return true;
            }

            detail::pause();
        }
    }

private:
    BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic< unsigned int >) == sizeof(int), "Boost.Sync: unexpected size of atomic< unsigned int >");
    detail::atomic_ns::atomic< unsigned int > m_state;
};
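
// Usage sketch (illustrative only; assumes the event is reached through the public
// boost::sync::auto_reset_event name exported from this ABI namespace):
//
//   boost::sync::auto_reset_event ev;
//
//   // Consumer thread: blocks until the event is posted, then automatically resets it.
//   //     ev.wait();
//
//   // Producer thread: releases one waiter, or leaves the event signalled if none is waiting.
//   //     ev.post();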

} // namespace abi

} // namespace sync

} // namespace boost

#include <boost/sync/detail/footer.hpp>

#endif // BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_