1 // Copyright (c) 2007-2013 Hartmut Kaiser 2 // Copyright (c) 2013-2015 Agustin Berge 3 // 4 // Distributed under the Boost Software License, Version 1.0. (See accompanying 5 // file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 6 7 #ifndef HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP 8 #define HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP 9 10 #include <hpx/config.hpp> 11 #include <hpx/error_code.hpp> 12 #include <hpx/lcos/local/spinlock.hpp> 13 #include <hpx/runtime/threads/thread_data_fwd.hpp> 14 #include <hpx/runtime/threads/thread_enums.hpp> 15 #include <hpx/util/steady_clock.hpp> 16 17 #include <boost/intrusive/slist.hpp> 18 19 #include <cstddef> 20 #include <mutex> 21 #include <utility> 22 23 /////////////////////////////////////////////////////////////////////////////// 24 namespace hpx { namespace lcos { namespace local { namespace detail 25 { 26 class condition_variable 27 { 28 public: 29 HPX_NON_COPYABLE(condition_variable); 30 31 private: 32 typedef lcos::local::spinlock mutex_type; 33 34 private: 35 // define data structures needed for intrusive slist container used for 36 // the queues 37 struct queue_entry 38 { 39 typedef boost::intrusive::slist_member_hook< 40 boost::intrusive::link_mode<boost::intrusive::normal_link> 41 > hook_type; 42 queue_entryhpx::lcos::local::detail::condition_variable::queue_entry43 queue_entry(threads::thread_id_type const& id, void* q) 44 : id_(id), q_(q) 45 {} 46 47 threads::thread_id_type id_; 48 void* q_; 49 hook_type slist_hook_; 50 }; 51 52 typedef boost::intrusive::member_hook< 53 queue_entry, queue_entry::hook_type, 54 &queue_entry::slist_hook_ 55 > slist_option_type; 56 57 typedef boost::intrusive::slist< 58 queue_entry, slist_option_type, 59 boost::intrusive::cache_last<true>, 60 boost::intrusive::constant_time_size<true> 61 > queue_type; 62 63 struct reset_queue_entry 64 { reset_queue_entryhpx::lcos::local::detail::condition_variable::reset_queue_entry65 reset_queue_entry(queue_entry& e, queue_type& q) 66 : e_(e), last_(q.last()) 67 {} 68 ~reset_queue_entryhpx::lcos::local::detail::condition_variable::reset_queue_entry69 ~reset_queue_entry() 70 { 71 if (e_.id_ != threads::invalid_thread_id) 72 { 73 queue_type* q = static_cast<queue_type*>(e_.q_); 74 q->erase(last_); // remove entry from queue 75 } 76 } 77 78 queue_entry& e_; 79 queue_type::const_iterator last_; 80 }; 81 82 public: 83 HPX_EXPORT condition_variable(); 84 85 HPX_EXPORT ~condition_variable(); 86 87 HPX_EXPORT bool empty( 88 std::unique_lock<mutex_type> const& lock) const; 89 90 HPX_EXPORT std::size_t size( 91 std::unique_lock<mutex_type> const& lock) const; 92 93 // Return false if no more threads are waiting (returns true if queue 94 // is non-empty). 95 HPX_EXPORT bool notify_one(std::unique_lock<mutex_type> lock, 96 threads::thread_priority priority, error_code& ec = throws); 97 98 HPX_EXPORT void notify_all(std::unique_lock<mutex_type> lock, 99 threads::thread_priority priority, error_code& ec = throws); 100 notify_one(std::unique_lock<mutex_type> lock,error_code & ec=throws)101 bool notify_one(std::unique_lock<mutex_type> lock, 102 error_code& ec = throws) 103 { 104 return notify_one(std::move(lock), 105 threads::thread_priority_default, ec); 106 } 107 notify_all(std::unique_lock<mutex_type> lock,error_code & ec=throws)108 void notify_all(std::unique_lock<mutex_type> lock, 109 error_code& ec = throws) 110 { 111 return notify_all(std::move(lock), 112 threads::thread_priority_default, ec); 113 } 114 115 HPX_EXPORT void abort_all( 116 std::unique_lock<mutex_type> lock); 117 118 HPX_EXPORT threads::thread_state_ex_enum wait( 119 std::unique_lock<mutex_type>& lock, 120 char const* description, error_code& ec = throws); 121 wait(std::unique_lock<mutex_type> & lock,error_code & ec=throws)122 threads::thread_state_ex_enum wait( 123 std::unique_lock<mutex_type>& lock, 124 error_code& ec = throws) 125 { 126 return wait(lock, "condition_variable::wait", ec); 127 } 128 129 HPX_EXPORT threads::thread_state_ex_enum wait_until( 130 std::unique_lock<mutex_type>& lock, 131 util::steady_time_point const& abs_time, 132 char const* description, error_code& ec = throws); 133 wait_until(std::unique_lock<mutex_type> & lock,util::steady_time_point const & abs_time,error_code & ec=throws)134 threads::thread_state_ex_enum wait_until( 135 std::unique_lock<mutex_type>& lock, 136 util::steady_time_point const& abs_time, 137 error_code& ec = throws) 138 { 139 return wait_until(lock, abs_time, 140 "condition_variable::wait_until", ec); 141 } 142 wait_for(std::unique_lock<mutex_type> & lock,util::steady_duration const & rel_time,char const * description,error_code & ec=throws)143 threads::thread_state_ex_enum wait_for( 144 std::unique_lock<mutex_type>& lock, 145 util::steady_duration const& rel_time, 146 char const* description, error_code& ec = throws) 147 { 148 return wait_until(lock, rel_time.from_now(), description, ec); 149 } 150 wait_for(std::unique_lock<mutex_type> & lock,util::steady_duration const & rel_time,error_code & ec=throws)151 threads::thread_state_ex_enum wait_for( 152 std::unique_lock<mutex_type>& lock, 153 util::steady_duration const& rel_time, 154 error_code& ec = throws) 155 { 156 return wait_until(lock, rel_time.from_now(), 157 "condition_variable::wait_for", ec); 158 } 159 160 private: 161 template <typename Mutex> 162 void abort_all(std::unique_lock<Mutex> lock); 163 164 // re-add the remaining items to the original queue 165 HPX_EXPORT void prepend_entries( 166 std::unique_lock<mutex_type>& lock, queue_type& queue); 167 168 private: 169 queue_type queue_; 170 }; 171 172 }}}} 173 174 #endif /*HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP*/ 175