1 // auto_reset_event_futex.hpp, futex-based event 2 // 3 // Copyright (C) 2013 Tim Blechmann 4 // Copyright (C) 2013 Andrey Semashev 5 // 6 // Distributed under the Boost Software License, Version 1.0. (See 7 // accompanying file LICENSE_1_0.txt or copy at 8 // http://www.boost.org/LICENSE_1_0.txt) 9 10 #ifndef BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_ 11 #define BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_ 12 13 #include <errno.h> 14 #include <sys/time.h> 15 16 #include <boost/assert.hpp> 17 #include <boost/static_assert.hpp> 18 #include <boost/utility/enable_if.hpp> 19 #include <boost/sync/detail/config.hpp> 20 #include <boost/sync/detail/atomic.hpp> 21 #include <boost/sync/detail/pause.hpp> 22 #include <boost/sync/detail/futex.hpp> 23 #include <boost/sync/detail/time_traits.hpp> 24 #include <boost/sync/detail/time_units.hpp> 25 #include <boost/sync/detail/header.hpp> 26 27 #ifdef BOOST_HAS_PRAGMA_ONCE 28 #pragma once 29 #endif 30 31 namespace boost { 32 33 namespace sync { 34 35 BOOST_SYNC_DETAIL_OPEN_ABI_NAMESPACE { 36 37 class auto_reset_event 38 { 39 BOOST_DELETED_FUNCTION(auto_reset_event(auto_reset_event const&)); 40 BOOST_DELETED_FUNCTION(auto_reset_event& operator= (auto_reset_event const&)); 41 42 private: 43 // State bits are divided into post count and waiter count. Post counter is needed to wake 44 // the correct number of threads blocked on the event in case if multiple concurrent posts are made. 45 enum 46 { 47 post_count_lowest_bit = 22u, 48 post_count_one = 1u << post_count_lowest_bit, 49 post_count_mask = 0u - post_count_one, 50 wait_count_mask = (~0u) ^ post_count_mask 51 }; 52 53 public: 54 auto_reset_event() BOOST_NOEXCEPT : m_state(0) 55 { 56 } 57 58 void post() BOOST_NOEXCEPT 59 { 60 unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 61 unsigned int waiters, posts; 62 while (true) 63 { 64 waiters = old_state & wait_count_mask; 65 posts = old_state >> post_count_lowest_bit; 66 if (waiters >= posts) 67 { 68 if (m_state.compare_exchange_weak(old_state, old_state + post_count_one, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 69 break; 70 71 detail::pause(); 72 } 73 else 74 return; // the event is already set (enough times so that all waiters are released and the event is still left signalled) 75 } 76 77 if (waiters > 0) 78 sync::detail::linux_::futex_signal(reinterpret_cast< int* >(&m_state)); 79 } 80 81 void wait() BOOST_NOEXCEPT 82 { 83 // Try the fast path first 84 if (this->try_wait()) 85 return; 86 87 // Add one waiter 88 unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel); 89 while (true) 90 { 91 unsigned int posts = old_state >> post_count_lowest_bit; 92 if (posts == 0) 93 { 94 again: 95 const int status = sync::detail::linux_::futex_wait(reinterpret_cast< int* >(&m_state), old_state); 96 if (status != 0) 97 { 98 const int err = errno; 99 switch (err) 100 { 101 case EINTR: // signal received 102 goto again; 103 104 case EWOULDBLOCK: // another thread changed the state 105 break; 106 107 default: 108 BOOST_ASSERT(false); 109 } 110 } 111 112 old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 113 posts = old_state >> post_count_lowest_bit; 114 if (posts == 0) 115 goto again; 116 } 117 118 // Remove one post and one waiter from the counters 119 if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 120 break; 121 } 122 } 123 124 bool try_wait() BOOST_NOEXCEPT 125 { 126 unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 127 128 for (unsigned int posts = old_state >> post_count_lowest_bit; posts > 0; posts = old_state >> post_count_lowest_bit) 129 { 130 if (m_state.compare_exchange_weak(old_state, old_state - post_count_one, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 131 return true; 132 133 detail::pause(); 134 } 135 136 return false; 137 } 138 139 template< typename Time > 140 typename enable_if_c< sync::detail::time_traits< Time >::is_specialized, bool >::type timed_wait(Time const& timeout) 141 { 142 return priv_timed_wait(sync::detail::time_traits< Time >::to_sync_unit(timeout)); 143 } 144 145 template< typename Duration > 146 typename enable_if< detail::is_time_tag_of< Duration, detail::time_duration_tag >, bool >::type wait_for(Duration const& duration) 147 { 148 return priv_timed_wait(sync::detail::time_traits< Duration >::to_sync_unit(duration)); 149 } 150 151 template< typename TimePoint > 152 typename enable_if< detail::is_time_tag_of< TimePoint, detail::time_point_tag >, bool >::type wait_until(TimePoint const& abs_time) 153 { 154 return priv_timed_wait(sync::detail::time_traits< TimePoint >::to_sync_unit(abs_time)); 155 } 156 157 private: 158 bool priv_timed_wait(sync::detail::system_time_point const& abs_timeout) 159 { 160 // Try the fast path first 161 if (this->try_wait()) 162 return true; 163 164 // Add one waiter 165 unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel); 166 while (true) 167 { 168 unsigned int posts = old_state >> post_count_lowest_bit; 169 if (posts == 0) 170 { 171 again: 172 sync::detail::system_duration::native_type time_left = (abs_timeout - sync::detail::system_time_point::now()).get(); 173 if (time_left <= 0) 174 return on_wait_timed_out(); 175 const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), old_state, time_left); 176 if (status != 0) 177 { 178 const int err = errno; 179 switch (err) 180 { 181 case ETIMEDOUT: 182 return on_wait_timed_out(); 183 184 case EINTR: // signal received 185 goto again; 186 187 case EWOULDBLOCK: // another thread changed the state 188 break; 189 190 default: 191 BOOST_ASSERT(false); 192 } 193 } 194 195 old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 196 posts = old_state >> post_count_lowest_bit; 197 if (posts == 0) 198 goto again; 199 } 200 201 // Remove one post and one waiter from the counters 202 if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 203 break; 204 } 205 206 return true; 207 } 208 209 bool priv_timed_wait(sync::detail::system_duration dur) 210 { 211 // Try the fast path first 212 if (this->try_wait()) 213 return true; 214 215 sync::detail::system_duration::native_type time_left = dur.get(); 216 if (time_left <= 0) 217 return false; 218 219 // Add one waiter 220 unsigned int old_state = m_state.fetch_add(1, detail::atomic_ns::memory_order_acq_rel); 221 while (true) 222 { 223 unsigned int posts = old_state >> post_count_lowest_bit; 224 if (posts == 0) 225 { 226 again: 227 const int status = sync::detail::linux_::futex_timedwait(reinterpret_cast< int* >(&m_state), old_state, time_left); 228 if (status != 0) 229 { 230 const int err = errno; 231 switch (err) 232 { 233 case ETIMEDOUT: 234 return on_wait_timed_out(); 235 236 case EINTR: // signal received 237 goto again; 238 239 case EWOULDBLOCK: // another thread changed the state 240 break; 241 242 default: 243 BOOST_ASSERT(false); 244 } 245 } 246 247 old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 248 posts = old_state >> post_count_lowest_bit; 249 if (posts == 0) 250 goto again; 251 } 252 253 // Remove one post and one waiter from the counters 254 if (m_state.compare_exchange_strong(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 255 break; 256 } 257 258 return true; 259 } 260 261 template< typename TimePoint > 262 bool priv_timed_wait(sync::detail::chrono_time_point< TimePoint > const& t) 263 { 264 typedef TimePoint time_point; 265 typedef typename time_point::clock clock; 266 typedef typename time_point::duration duration; 267 time_point now = clock::now(); 268 do 269 { 270 if (timed_wait(sync::detail::time_traits< duration >::to_sync_unit(t.get() - now))) 271 return true; 272 now = clock::now(); 273 } 274 while (now < t.get()); 275 276 return false; 277 } 278 279 bool on_wait_timed_out() 280 { 281 unsigned int old_state = m_state.load(detail::atomic_ns::memory_order_acquire); 282 while (true) 283 { 284 unsigned int posts = old_state >> post_count_lowest_bit; 285 if (posts == 0) 286 { 287 // Remove one waiter 288 if (m_state.compare_exchange_weak(old_state, old_state - 1u, detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 289 return false; 290 } 291 else 292 { 293 // Remove one post and one waiter from the counters 294 if (m_state.compare_exchange_weak(old_state, old_state - (post_count_one + 1u), detail::atomic_ns::memory_order_acquire, detail::atomic_ns::memory_order_release)) 295 return true; 296 } 297 298 detail::pause(); 299 } 300 } 301 302 private: 303 BOOST_STATIC_ASSERT_MSG(sizeof(detail::atomic_ns::atomic< unsigned int >) == sizeof(int), "Boost.Sync: unexpected size of atomic< unsigned int >"); 304 detail::atomic_ns::atomic< unsigned int > m_state; 305 }; 306 307 } // namespace abi 308 309 } // namespace sync 310 311 } // namespace boost 312 313 #include <boost/sync/detail/footer.hpp> 314 315 #endif // BOOST_SYNC_DETAIL_EVENTS_AUTO_RESET_EVENT_FUTEX_HPP_INCLUDED_ 316