1 //////////////////////////////////////////////////////////////////////////////
2 //
3 // (C) Copyright Ion Gaztanaga 2005-2012. Distributed under the Boost
4 // Software License, Version 1.0. (See accompanying file
5 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 // See http://www.boost.org/libs/interprocess for documentation.
8 //
9 //////////////////////////////////////////////////////////////////////////////
10 
11 #ifndef BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
12 #define BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
13 
14 #ifndef BOOST_CONFIG_HPP
15 #  include <boost/config.hpp>
16 #endif
17 #
18 #if defined(BOOST_HAS_PRAGMA_ONCE)
19 #  pragma once
20 #endif
21 
22 #include <boost/interprocess/detail/config_begin.hpp>
23 #include <boost/interprocess/detail/workaround.hpp>
24 #include <boost/interprocess/sync/spin/mutex.hpp>
25 #include <boost/interprocess/detail/posix_time_types_wrk.hpp>
26 #include <boost/interprocess/detail/atomic.hpp>
27 #include <boost/interprocess/sync/scoped_lock.hpp>
28 #include <boost/interprocess/exceptions.hpp>
29 #include <boost/interprocess/detail/os_thread_functions.hpp>
30 #include <boost/interprocess/sync/spin/wait.hpp>
31 #include <boost/move/utility_core.hpp>
32 #include <boost/cstdint.hpp>
33 
34 namespace boost {
35 namespace interprocess {
36 namespace ipcdetail {
37 
38 class spin_condition
39 {
40    spin_condition(const spin_condition &);
41    spin_condition &operator=(const spin_condition &);
42    public:
43    spin_condition();
44    ~spin_condition();
45 
46    void notify_one();
47    void notify_all();
48 
49    template <typename L>
timed_wait(L & lock,const boost::posix_time::ptime & abs_time)50    bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time)
51    {
52       if (!lock)
53          throw lock_exception();
54       //Handle infinity absolute time here to avoid complications in do_timed_wait
55       if(abs_time == boost::posix_time::pos_infin){
56          this->wait(lock);
57          return true;
58       }
59       return this->do_timed_wait(abs_time, *lock.mutex());
60    }
61 
62    template <typename L, typename Pr>
timed_wait(L & lock,const boost::posix_time::ptime & abs_time,Pr pred)63    bool timed_wait(L& lock, const boost::posix_time::ptime &abs_time, Pr pred)
64    {
65       if (!lock)
66          throw lock_exception();
67       //Handle infinity absolute time here to avoid complications in do_timed_wait
68       if(abs_time == boost::posix_time::pos_infin){
69          this->wait(lock, pred);
70          return true;
71       }
72       while (!pred()){
73          if (!this->do_timed_wait(abs_time, *lock.mutex()))
74             return pred();
75       }
76       return true;
77    }
78 
79    template <typename L>
wait(L & lock)80    void wait(L& lock)
81    {
82       if (!lock)
83          throw lock_exception();
84       do_wait(*lock.mutex());
85    }
86 
87    template <typename L, typename Pr>
wait(L & lock,Pr pred)88    void wait(L& lock, Pr pred)
89    {
90       if (!lock)
91          throw lock_exception();
92 
93       while (!pred())
94          do_wait(*lock.mutex());
95    }
96 
97    template<class InterprocessMutex>
98    void do_wait(InterprocessMutex &mut);
99 
100    template<class InterprocessMutex>
101    bool do_timed_wait(const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
102 
103    private:
104    template<class InterprocessMutex>
105    bool do_timed_wait(bool tout_enabled, const boost::posix_time::ptime &abs_time, InterprocessMutex &mut);
106 
107    enum { SLEEP = 0, NOTIFY_ONE, NOTIFY_ALL };
108    spin_mutex  m_enter_mut;
109    volatile boost::uint32_t    m_command;
110    volatile boost::uint32_t    m_num_waiters;
111    void notify(boost::uint32_t command);
112 };
113 
spin_condition()114 inline spin_condition::spin_condition()
115 {
116    //Note that this class is initialized to zero.
117    //So zeroed memory can be interpreted as an initialized
118    //condition variable
119    m_command      = SLEEP;
120    m_num_waiters  = 0;
121 }
122 
~spin_condition()123 inline spin_condition::~spin_condition()
124 {
125    //Notify all waiting threads
126    //to allow POSIX semantics on condition destruction
127    this->notify_all();
128 }
129 
notify_one()130 inline void spin_condition::notify_one()
131 {
132    this->notify(NOTIFY_ONE);
133 }
134 
notify_all()135 inline void spin_condition::notify_all()
136 {
137    this->notify(NOTIFY_ALL);
138 }
139 
notify(boost::uint32_t command)140 inline void spin_condition::notify(boost::uint32_t command)
141 {
142    //This mutex guarantees that no other thread can enter to the
143    //do_timed_wait method logic, so that thread count will be
144    //constant until the function writes a NOTIFY_ALL command.
145    //It also guarantees that no other notification can be signaled
146    //on this spin_condition before this one ends
147    m_enter_mut.lock();
148 
149    //Return if there are no waiters
150    if(!atomic_read32(&m_num_waiters)) {
151       m_enter_mut.unlock();
152       return;
153    }
154 
155    //Notify that all threads should execute wait logic
156    spin_wait swait;
157    while(SLEEP != atomic_cas32(const_cast<boost::uint32_t*>(&m_command), command, SLEEP)){
158       swait.yield();
159    }
160    //The enter mutex will rest locked until the last waiting thread unlocks it
161 }
162 
163 template<class InterprocessMutex>
do_wait(InterprocessMutex & mut)164 inline void spin_condition::do_wait(InterprocessMutex &mut)
165 {
166    this->do_timed_wait(false, boost::posix_time::ptime(), mut);
167 }
168 
169 template<class InterprocessMutex>
do_timed_wait(const boost::posix_time::ptime & abs_time,InterprocessMutex & mut)170 inline bool spin_condition::do_timed_wait
171    (const boost::posix_time::ptime &abs_time, InterprocessMutex &mut)
172 {
173    return this->do_timed_wait(true, abs_time, mut);
174 }
175 
176 template<class InterprocessMutex>
do_timed_wait(bool tout_enabled,const boost::posix_time::ptime & abs_time,InterprocessMutex & mut)177 inline bool spin_condition::do_timed_wait(bool tout_enabled,
178                                      const boost::posix_time::ptime &abs_time,
179                                      InterprocessMutex &mut)
180 {
181    boost::posix_time::ptime now = microsec_clock::universal_time();
182 
183    if(tout_enabled){
184       if(now >= abs_time) return false;
185    }
186 
187    typedef boost::interprocess::scoped_lock<spin_mutex> InternalLock;
188    //The enter mutex guarantees that while executing a notification,
189    //no other thread can execute the do_timed_wait method.
190    {
191       //---------------------------------------------------------------
192       InternalLock lock;
193       if(tout_enabled){
194          InternalLock dummy(m_enter_mut, abs_time);
195          lock = boost::move(dummy);
196       }
197       else{
198          InternalLock dummy(m_enter_mut);
199          lock = boost::move(dummy);
200       }
201 
202       if(!lock)
203          return false;
204       //---------------------------------------------------------------
205       //We increment the waiting thread count protected so that it will be
206       //always constant when another thread enters the notification logic.
207       //The increment marks this thread as "waiting on spin_condition"
208       atomic_inc32(const_cast<boost::uint32_t*>(&m_num_waiters));
209 
210       //We unlock the external mutex atomically with the increment
211       mut.unlock();
212    }
213 
214    //By default, we suppose that no timeout has happened
215    bool timed_out  = false, unlock_enter_mut= false;
216 
217    //Loop until a notification indicates that the thread should
218    //exit or timeout occurs
219    while(1){
220       //The thread sleeps/spins until a spin_condition commands a notification
221       //Notification occurred, we will lock the checking mutex so that
222       spin_wait swait;
223       while(atomic_read32(&m_command) == SLEEP){
224          swait.yield();
225 
226          //Check for timeout
227          if(tout_enabled){
228             now = microsec_clock::universal_time();
229 
230             if(now >= abs_time){
231                //If we can lock the mutex it means that no notification
232                //is being executed in this spin_condition variable
233                timed_out = m_enter_mut.try_lock();
234 
235                //If locking fails, indicates that another thread is executing
236                //notification, so we play the notification game
237                if(!timed_out){
238                   //There is an ongoing notification, we will try again later
239                   continue;
240                }
241                //No notification in execution, since enter mutex is locked.
242                //We will execute time-out logic, so we will decrement count,
243                //release the enter mutex and return false.
244                break;
245             }
246          }
247       }
248 
249       //If a timeout occurred, the mutex will not execute checking logic
250       if(tout_enabled && timed_out){
251          //Decrement wait count
252          atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
253          unlock_enter_mut = true;
254          break;
255       }
256       else{
257          boost::uint32_t result = atomic_cas32
258                         (const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ONE);
259          if(result == SLEEP){
260             //Other thread has been notified and since it was a NOTIFY one
261             //command, this thread must sleep again
262             continue;
263          }
264          else if(result == NOTIFY_ONE){
265             //If it was a NOTIFY_ONE command, only this thread should
266             //exit. This thread has atomically marked command as sleep before
267             //so no other thread will exit.
268             //Decrement wait count.
269             unlock_enter_mut = true;
270             atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
271             break;
272          }
273          else{
274             //If it is a NOTIFY_ALL command, all threads should return
275             //from do_timed_wait function. Decrement wait count.
276             unlock_enter_mut = 1 == atomic_dec32(const_cast<boost::uint32_t*>(&m_num_waiters));
277             //Check if this is the last thread of notify_all waiters
278             //Only the last thread will release the mutex
279             if(unlock_enter_mut){
280                atomic_cas32(const_cast<boost::uint32_t*>(&m_command), SLEEP, NOTIFY_ALL);
281             }
282             break;
283          }
284       }
285    }
286 
287    //Unlock the enter mutex if it is a single notification, if this is
288    //the last notified thread in a notify_all or a timeout has occurred
289    if(unlock_enter_mut){
290       m_enter_mut.unlock();
291    }
292 
293    //Lock external again before returning from the method
294    mut.lock();
295    return !timed_out;
296 }
297 
298 }  //namespace ipcdetail
299 }  //namespace interprocess
300 }  //namespace boost
301 
302 #include <boost/interprocess/detail/config_end.hpp>
303 
304 #endif   //BOOST_INTERPROCESS_DETAIL_SPIN_CONDITION_HPP
305