1 //  Copyright (c) 2007-2013 Hartmut Kaiser
2 //  Copyright (c) 2013-2015 Agustin Berge
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 
7 #ifndef HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP
8 #define HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP
9 
10 #include <hpx/config.hpp>
11 #include <hpx/error_code.hpp>
12 #include <hpx/lcos/local/spinlock.hpp>
13 #include <hpx/runtime/threads/thread_data_fwd.hpp>
14 #include <hpx/runtime/threads/thread_enums.hpp>
15 #include <hpx/util/steady_clock.hpp>
16 
17 #include <boost/intrusive/slist.hpp>
18 
19 #include <cstddef>
20 #include <mutex>
21 #include <utility>
22 
23 ///////////////////////////////////////////////////////////////////////////////
24 namespace hpx { namespace lcos { namespace local { namespace detail
25 {
26     class condition_variable
27     {
28     public:
29         HPX_NON_COPYABLE(condition_variable);
30 
31     private:
32         typedef lcos::local::spinlock mutex_type;
33 
34     private:
35         // define data structures needed for intrusive slist container used for
36         // the queues
37         struct queue_entry
38         {
39             typedef boost::intrusive::slist_member_hook<
40                 boost::intrusive::link_mode<boost::intrusive::normal_link>
41             > hook_type;
42 
queue_entryhpx::lcos::local::detail::condition_variable::queue_entry43             queue_entry(threads::thread_id_type const& id, void* q)
44               : id_(id), q_(q)
45             {}
46 
47             threads::thread_id_type id_;
48             void* q_;
49             hook_type slist_hook_;
50         };
51 
52         typedef boost::intrusive::member_hook<
53             queue_entry, queue_entry::hook_type,
54             &queue_entry::slist_hook_
55         > slist_option_type;
56 
57         typedef boost::intrusive::slist<
58             queue_entry, slist_option_type,
59             boost::intrusive::cache_last<true>,
60             boost::intrusive::constant_time_size<true>
61         > queue_type;
62 
63         struct reset_queue_entry
64         {
reset_queue_entryhpx::lcos::local::detail::condition_variable::reset_queue_entry65             reset_queue_entry(queue_entry& e, queue_type& q)
66               : e_(e), last_(q.last())
67             {}
68 
~reset_queue_entryhpx::lcos::local::detail::condition_variable::reset_queue_entry69             ~reset_queue_entry()
70             {
71                 if (e_.id_ != threads::invalid_thread_id)
72                 {
73                     queue_type* q = static_cast<queue_type*>(e_.q_);
74                     q->erase(last_);     // remove entry from queue
75                 }
76             }
77 
78             queue_entry& e_;
79             queue_type::const_iterator last_;
80         };
81 
82     public:
83         HPX_EXPORT condition_variable();
84 
85         HPX_EXPORT ~condition_variable();
86 
87         HPX_EXPORT bool empty(
88             std::unique_lock<mutex_type> const& lock) const;
89 
90         HPX_EXPORT std::size_t size(
91             std::unique_lock<mutex_type> const& lock) const;
92 
93         // Return false if no more threads are waiting (returns true if queue
94         // is non-empty).
95         HPX_EXPORT bool notify_one(std::unique_lock<mutex_type> lock,
96             threads::thread_priority priority, error_code& ec = throws);
97 
98         HPX_EXPORT void notify_all(std::unique_lock<mutex_type> lock,
99             threads::thread_priority priority, error_code& ec = throws);
100 
notify_one(std::unique_lock<mutex_type> lock,error_code & ec=throws)101         bool notify_one(std::unique_lock<mutex_type> lock,
102             error_code& ec = throws)
103         {
104             return notify_one(std::move(lock),
105                 threads::thread_priority_default, ec);
106         }
107 
notify_all(std::unique_lock<mutex_type> lock,error_code & ec=throws)108         void notify_all(std::unique_lock<mutex_type> lock,
109             error_code& ec = throws)
110         {
111             return notify_all(std::move(lock),
112                 threads::thread_priority_default, ec);
113         }
114 
115         HPX_EXPORT void abort_all(
116             std::unique_lock<mutex_type> lock);
117 
118         HPX_EXPORT threads::thread_state_ex_enum wait(
119             std::unique_lock<mutex_type>& lock,
120             char const* description, error_code& ec = throws);
121 
wait(std::unique_lock<mutex_type> & lock,error_code & ec=throws)122         threads::thread_state_ex_enum wait(
123             std::unique_lock<mutex_type>& lock,
124             error_code& ec = throws)
125         {
126             return wait(lock, "condition_variable::wait", ec);
127         }
128 
129         HPX_EXPORT threads::thread_state_ex_enum wait_until(
130             std::unique_lock<mutex_type>& lock,
131             util::steady_time_point const& abs_time,
132             char const* description, error_code& ec = throws);
133 
wait_until(std::unique_lock<mutex_type> & lock,util::steady_time_point const & abs_time,error_code & ec=throws)134         threads::thread_state_ex_enum wait_until(
135             std::unique_lock<mutex_type>& lock,
136             util::steady_time_point const& abs_time,
137             error_code& ec = throws)
138         {
139             return wait_until(lock, abs_time,
140                 "condition_variable::wait_until", ec);
141         }
142 
wait_for(std::unique_lock<mutex_type> & lock,util::steady_duration const & rel_time,char const * description,error_code & ec=throws)143         threads::thread_state_ex_enum wait_for(
144             std::unique_lock<mutex_type>& lock,
145             util::steady_duration const& rel_time,
146             char const* description, error_code& ec = throws)
147         {
148             return wait_until(lock, rel_time.from_now(), description, ec);
149         }
150 
wait_for(std::unique_lock<mutex_type> & lock,util::steady_duration const & rel_time,error_code & ec=throws)151         threads::thread_state_ex_enum wait_for(
152             std::unique_lock<mutex_type>& lock,
153             util::steady_duration const& rel_time,
154             error_code& ec = throws)
155         {
156             return wait_until(lock, rel_time.from_now(),
157                 "condition_variable::wait_for", ec);
158         }
159 
160     private:
161         template <typename Mutex>
162         void abort_all(std::unique_lock<Mutex> lock);
163 
164         // re-add the remaining items to the original queue
165         HPX_EXPORT void prepend_entries(
166             std::unique_lock<mutex_type>& lock, queue_type& queue);
167 
168     private:
169         queue_type queue_;
170     };
171 
172 }}}}
173 
174 #endif /*HPX_LCOS_LOCAL_DETAIL_CONDITION_VARIABLE_HPP*/
175