1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP
3 
4 //////////////////////////////////////////////////////////////////////////////
5 //
6 // (C) Copyright Vicente J. Botet Escriba 2013-2017. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 // See http://www.boost.org/libs/thread for documentation.
11 //
12 //////////////////////////////////////////////////////////////////////////////
13 
14 #include <boost/thread/detail/config.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/detail/move.hpp>
17 #include <boost/thread/mutex.hpp>
18 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
19 
20 #include <boost/chrono/time_point.hpp>
21 #include <boost/throw_exception.hpp>
22 
23 #include <boost/config/abi_prefix.hpp>
24 
25 namespace boost
26 {
27 namespace concurrent
28 {
29 namespace detail
30 {
31 
32   template <class ValueType, class Queue>
33   class sync_queue_base
34   {
35   public:
36     typedef ValueType value_type;
37     typedef Queue underlying_queue_type;
38     typedef typename Queue::size_type size_type;
39     typedef queue_op_status op_status;
40 
41     // Constructors/Assignment/Destructors
42     BOOST_THREAD_NO_COPYABLE(sync_queue_base)
43     inline sync_queue_base();
44     //template <typename Range>
45     //inline explicit sync_queue(Range range);
46     inline ~sync_queue_base();
47 
48     // Observers
49     inline bool empty() const;
50     inline bool full() const;
51     inline size_type size() const;
52     inline bool closed() const;
53 
54     // Modifiers
55     inline void close();
56 
underlying_queue()57     inline underlying_queue_type underlying_queue() {
58       lock_guard<mutex> lk(mtx_);
59       return boost::move(data_);
60     }
61 
62   protected:
63     mutable mutex mtx_;
64     condition_variable not_empty_;
65     underlying_queue_type data_;
66     bool closed_;
67 
empty(unique_lock<mutex> &) const68     inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
69     {
70       return data_.empty();
71     }
empty(lock_guard<mutex> &) const72     inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
73     {
74       return data_.empty();
75     }
76 
size(lock_guard<mutex> &) const77     inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT
78     {
79       return data_.size();
80     }
81     inline bool closed(unique_lock<mutex>& lk) const;
82     inline bool closed(lock_guard<mutex>& lk) const;
83 
84     inline void throw_if_closed(unique_lock<mutex>&);
85     inline void throw_if_closed(lock_guard<mutex>&);
86 
87     inline void wait_until_not_empty(unique_lock<mutex>& lk);
88     inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk);
89     template <class WClock, class Duration>
90     queue_op_status wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&);
91 
notify_not_empty_if_needed(unique_lock<mutex> &)92     inline void notify_not_empty_if_needed(unique_lock<mutex>& )
93     {
94       not_empty_.notify_one();
95     }
notify_not_empty_if_needed(lock_guard<mutex> &)96     inline void notify_not_empty_if_needed(lock_guard<mutex>& )
97     {
98       not_empty_.notify_one();
99     }
100 
101   };
102 
103   template <class ValueType, class Queue>
sync_queue_base()104   sync_queue_base<ValueType, Queue>::sync_queue_base() :
105     data_(), closed_(false)
106   {
107     BOOST_ASSERT(data_.empty());
108   }
109 
110   template <class ValueType, class Queue>
~sync_queue_base()111   sync_queue_base<ValueType, Queue>::~sync_queue_base()
112   {
113   }
114 
115   template <class ValueType, class Queue>
close()116   void sync_queue_base<ValueType, Queue>::close()
117   {
118     {
119       lock_guard<mutex> lk(mtx_);
120       closed_ = true;
121     }
122     not_empty_.notify_all();
123   }
124 
125   template <class ValueType, class Queue>
closed() const126   bool sync_queue_base<ValueType, Queue>::closed() const
127   {
128     lock_guard<mutex> lk(mtx_);
129     return closed(lk);
130   }
131   template <class ValueType, class Queue>
closed(unique_lock<mutex> &) const132   bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const
133   {
134     return closed_;
135   }
136   template <class ValueType, class Queue>
closed(lock_guard<mutex> &) const137   bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const
138   {
139     return closed_;
140   }
141 
142   template <class ValueType, class Queue>
empty() const143   bool sync_queue_base<ValueType, Queue>::empty() const
144   {
145     lock_guard<mutex> lk(mtx_);
146     return empty(lk);
147   }
148   template <class ValueType, class Queue>
full() const149   bool sync_queue_base<ValueType, Queue>::full() const
150   {
151     return false;
152   }
153 
154   template <class ValueType, class Queue>
size() const155   typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const
156   {
157     lock_guard<mutex> lk(mtx_);
158     return size(lk);
159   }
160 
161   template <class ValueType, class Queue>
throw_if_closed(unique_lock<mutex> & lk)162   void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk)
163   {
164     if (closed(lk))
165     {
166       BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
167     }
168   }
169   template <class ValueType, class Queue>
throw_if_closed(lock_guard<mutex> & lk)170   void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk)
171   {
172     if (closed(lk))
173     {
174       BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
175     }
176   }
177 
178   template <class ValueType, class Queue>
wait_until_not_empty(unique_lock<mutex> & lk)179   void sync_queue_base<ValueType, Queue>::wait_until_not_empty(unique_lock<mutex>& lk)
180   {
181     for (;;)
182     {
183       if (! empty(lk)) break;
184       throw_if_closed(lk);
185       not_empty_.wait(lk);
186     }
187   }
188   template <class ValueType, class Queue>
wait_until_not_empty_or_closed(unique_lock<mutex> & lk)189   bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk)
190   {
191     for (;;)
192     {
193       if (! empty(lk)) break;
194       if (closed(lk)) return true;
195       not_empty_.wait(lk);
196     }
197      return false;
198   }
199 
200   template <class ValueType, class Queue>
201   template <class WClock, class Duration>
wait_until_not_empty_until(unique_lock<mutex> & lk,chrono::time_point<WClock,Duration> const & tp)202   queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp)
203   {
204     for (;;)
205     {
206       if (! empty(lk)) return queue_op_status::success;
207       throw_if_closed(lk);
208       if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout;
209     }
210   }
211 
212 
213 } // detail
214 } // concurrent
215 } // boost
216 
217 #include <boost/config/abi_suffix.hpp>
218 
219 #endif
220