1 // Copyright (C) 2014 Ian Forbed
2 // Copyright (C) 2014-2017 Vicente J. Botet Escriba
3 //
4 //  Distributed under the Boost Software License, Version 1.0. (See accompanying
5 //  file LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #ifndef BOOST_THREAD_SYNC_PRIORITY_QUEUE
9 #define BOOST_THREAD_SYNC_PRIORITY_QUEUE
10 
11 #include <boost/thread/detail/config.hpp>
12 
13 #include <boost/thread/concurrent_queues/detail/sync_queue_base.hpp>
14 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/csbl/vector.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/thread/mutex.hpp>
19 
20 #include <boost/atomic.hpp>
21 #include <boost/chrono/duration.hpp>
22 #include <boost/chrono/time_point.hpp>
23 
24 #include <exception>
25 #include <queue>
26 #include <utility>
27 
28 #include <boost/config/abi_prefix.hpp>
29 
30 namespace boost
31 {
32 namespace detail {
33 
34   template <
35     class Type,
36     class Container = csbl::vector<Type>,
37     class Compare = std::less<Type>
38   >
39   class priority_queue
40   {
41   private:
42       Container _elements;
43       Compare _compare;
44   public:
45       typedef Type value_type;
46       typedef typename Container::size_type size_type;
47 
priority_queue(const Compare & compare=Compare ())48       explicit priority_queue(const Compare& compare = Compare())
49           : _elements(), _compare(compare)
50       { }
51 
size() const52       size_type size() const
53       {
54           return _elements.size();
55       }
56 
empty() const57       bool empty() const
58       {
59           return _elements.empty();
60       }
61 
push(Type const & element)62       void push(Type const& element)
63       {
64           _elements.push_back(element);
65           std::push_heap(_elements.begin(), _elements.end(), _compare);
66       }
push(BOOST_RV_REF (Type)element)67       void push(BOOST_RV_REF(Type) element)
68       {
69           _elements.push_back(boost::move(element));
70           std::push_heap(_elements.begin(), _elements.end(), _compare);
71       }
72 
pop()73       void pop()
74       {
75           std::pop_heap(_elements.begin(), _elements.end(), _compare);
76           _elements.pop_back();
77       }
pull()78       Type pull()
79       {
80           Type result = boost::move(_elements.front());
81           pop();
82           return boost::move(result);
83       }
84 
top() const85       Type const& top() const
86       {
87           return _elements.front();
88       }
89   };
90 }
91 
92 namespace concurrent
93 {
94   template <class ValueType,
95             class Container = csbl::vector<ValueType>,
96             class Compare = std::less<typename Container::value_type> >
97   class sync_priority_queue
98     : public detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >
99   {
100     typedef detail::sync_queue_base<ValueType, boost::detail::priority_queue<ValueType,Container,Compare> >  super;
101 
102   public:
103     typedef ValueType value_type;
104     //typedef typename super::value_type value_type; // fixme
105     typedef typename super::underlying_queue_type underlying_queue_type;
106     typedef typename super::size_type size_type;
107     typedef typename super::op_status op_status;
108 
109     typedef chrono::steady_clock clock;
110   protected:
111 
112   public:
sync_priority_queue()113     sync_priority_queue() {}
114 
~sync_priority_queue()115     ~sync_priority_queue()
116     {
117       if(!super::closed())
118       {
119         super::close();
120       }
121     }
122 
123     void push(const ValueType& elem);
124     void push(BOOST_THREAD_RV_REF(ValueType) elem);
125 
126     queue_op_status try_push(const ValueType& elem);
127     queue_op_status try_push(BOOST_THREAD_RV_REF(ValueType) elem);
128 
129     ValueType pull();
130 
131     void pull(ValueType&);
132 
133     template <class WClock, class Duration>
134     queue_op_status pull_until(const chrono::time_point<WClock,Duration>&, ValueType&);
135     template <class Rep, class Period>
136     queue_op_status pull_for(const chrono::duration<Rep,Period>&, ValueType&);
137 
138     queue_op_status try_pull(ValueType& elem);
139     queue_op_status wait_pull(ValueType& elem);
140     queue_op_status nonblocking_pull(ValueType&);
141 
142   private:
143     void push(unique_lock<mutex>&, const ValueType& elem);
144     void push(lock_guard<mutex>&, const ValueType& elem);
145     void push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
146     void push(lock_guard<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
147 
148     queue_op_status try_push(unique_lock<mutex>&, const ValueType& elem);
149     queue_op_status try_push(unique_lock<mutex>&, BOOST_THREAD_RV_REF(ValueType) elem);
150 
151     ValueType pull(unique_lock<mutex>&);
152     ValueType pull(lock_guard<mutex>&);
153 
154     void pull(unique_lock<mutex>&, ValueType&);
155     void pull(lock_guard<mutex>&, ValueType&);
156 
157     queue_op_status try_pull(lock_guard<mutex>& lk, ValueType& elem);
158     queue_op_status try_pull(unique_lock<mutex>& lk, ValueType& elem);
159 
160     queue_op_status wait_pull(unique_lock<mutex>& lk, ValueType& elem);
161 
162     queue_op_status nonblocking_pull(unique_lock<mutex>& lk, ValueType&);
163 
164     sync_priority_queue(const sync_priority_queue&);
165     sync_priority_queue& operator= (const sync_priority_queue&);
166     sync_priority_queue(BOOST_THREAD_RV_REF(sync_priority_queue));
167     sync_priority_queue& operator= (BOOST_THREAD_RV_REF(sync_priority_queue));
168   }; //end class
169 
170 
171   //////////////////////
172   template <class T, class Container,class Cmp>
push(unique_lock<mutex> & lk,const T & elem)173   void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, const T& elem)
174   {
175     super::throw_if_closed(lk);
176     super::data_.push(elem);
177     super::notify_elem_added(lk);
178   }
179   template <class T, class Container,class Cmp>
push(lock_guard<mutex> & lk,const T & elem)180   void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, const T& elem)
181   {
182     super::throw_if_closed(lk);
183     super::data_.push(elem);
184     super::notify_elem_added(lk);
185   }
186   template <class T, class Container,class Cmp>
push(const T & elem)187   void sync_priority_queue<T,Container,Cmp>::push(const T& elem)
188   {
189     lock_guard<mutex> lk(super::mtx_);
190     push(lk, elem);
191   }
192 
193   //////////////////////
194   template <class T, class Container,class Cmp>
push(unique_lock<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)195   void sync_priority_queue<T,Container,Cmp>::push(unique_lock<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
196   {
197     super::throw_if_closed(lk);
198     super::data_.push(boost::move(elem));
199     super::notify_elem_added(lk);
200   }
201   template <class T, class Container,class Cmp>
push(lock_guard<mutex> & lk,BOOST_THREAD_RV_REF (T)elem)202   void sync_priority_queue<T,Container,Cmp>::push(lock_guard<mutex>& lk, BOOST_THREAD_RV_REF(T) elem)
203   {
204     super::throw_if_closed(lk);
205     super::data_.push(boost::move(elem));
206     super::notify_elem_added(lk);
207   }
208   template <class T, class Container,class Cmp>
push(BOOST_THREAD_RV_REF (T)elem)209   void sync_priority_queue<T,Container,Cmp>::push(BOOST_THREAD_RV_REF(T) elem)
210   {
211     lock_guard<mutex> lk(super::mtx_);
212     push(lk, boost::move(elem));
213   }
214 
215   //////////////////////
216   template <class T, class Container,class Cmp>
try_push(const T & elem)217   queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(const T& elem)
218   {
219     lock_guard<mutex> lk(super::mtx_);
220     if (super::closed(lk)) return queue_op_status::closed;
221     push(lk, elem);
222     return queue_op_status::success;
223   }
224 
225   //////////////////////
226   template <class T, class Container,class Cmp>
try_push(BOOST_THREAD_RV_REF (T)elem)227   queue_op_status sync_priority_queue<T,Container,Cmp>::try_push(BOOST_THREAD_RV_REF(T) elem)
228   {
229     lock_guard<mutex> lk(super::mtx_);
230     if (super::closed(lk)) return queue_op_status::closed;
231     push(lk, boost::move(elem));
232 
233     return queue_op_status::success;
234   }
235 
236   //////////////////////
237   template <class T,class Container, class Cmp>
238   T sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&)
239   {
240     return super::data_.pull();
241   }
242   template <class T,class Container, class Cmp>
243   T sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&)
244   {
245     return super::data_.pull();
246   }
247 
248   template <class T,class Container, class Cmp>
249   T sync_priority_queue<T,Container,Cmp>::pull()
250   {
251     unique_lock<mutex> lk(super::mtx_);
252     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
253     if (has_been_closed) super::throw_if_closed(lk);
254     return pull(lk);
255   }
256 
257   //////////////////////
258   template <class T,class Container, class Cmp>
pull(unique_lock<mutex> &,T & elem)259   void sync_priority_queue<T,Container,Cmp>::pull(unique_lock<mutex>&, T& elem)
260   {
261     elem = super::data_.pull();
262   }
263   template <class T,class Container, class Cmp>
pull(lock_guard<mutex> &,T & elem)264   void sync_priority_queue<T,Container,Cmp>::pull(lock_guard<mutex>&, T& elem)
265   {
266     elem = super::data_.pull();
267   }
268 
269   template <class T,class Container, class Cmp>
pull(T & elem)270   void sync_priority_queue<T,Container,Cmp>::pull(T& elem)
271   {
272     unique_lock<mutex> lk(super::mtx_);
273     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
274     if (has_been_closed) super::throw_if_closed(lk);
275     pull(lk, elem);
276   }
277 
278   //////////////////////
279   template <class T, class Cont,class Cmp>
280   template <class WClock, class Duration>
281   queue_op_status
pull_until(const chrono::time_point<WClock,Duration> & tp,T & elem)282   sync_priority_queue<T,Cont,Cmp>::pull_until(const chrono::time_point<WClock,Duration>& tp, T& elem)
283   {
284     unique_lock<mutex> lk(super::mtx_);
285     const queue_op_status rc = super::wait_until_not_empty_or_closed_until(lk, tp);
286     if (rc == queue_op_status::success) pull(lk, elem);
287     return rc;
288   }
289 
290   //////////////////////
291   template <class T, class Cont,class Cmp>
292   template <class Rep, class Period>
293   queue_op_status
pull_for(const chrono::duration<Rep,Period> & dura,T & elem)294   sync_priority_queue<T,Cont,Cmp>::pull_for(const chrono::duration<Rep,Period>& dura, T& elem)
295   {
296     return pull_until(chrono::steady_clock::now() + dura, elem);
297   }
298 
299   //////////////////////
300   template <class T, class Container,class Cmp>
301   queue_op_status
try_pull(unique_lock<mutex> & lk,T & elem)302   sync_priority_queue<T,Container,Cmp>::try_pull(unique_lock<mutex>& lk, T& elem)
303   {
304     if (super::empty(lk))
305     {
306       if (super::closed(lk)) return queue_op_status::closed;
307       return queue_op_status::empty;
308     }
309     pull(lk, elem);
310     return queue_op_status::success;
311   }
312 
313   template <class T, class Container,class Cmp>
314   queue_op_status
try_pull(lock_guard<mutex> & lk,T & elem)315   sync_priority_queue<T,Container,Cmp>::try_pull(lock_guard<mutex>& lk, T& elem)
316   {
317     if (super::empty(lk))
318     {
319       if (super::closed(lk)) return queue_op_status::closed;
320       return queue_op_status::empty;
321     }
322     pull(lk, elem);
323     return queue_op_status::success;
324   }
325 
326   template <class T, class Container,class Cmp>
327   queue_op_status
try_pull(T & elem)328   sync_priority_queue<T,Container,Cmp>::try_pull(T& elem)
329   {
330     lock_guard<mutex> lk(super::mtx_);
331     return try_pull(lk, elem);
332   }
333 
334   //////////////////////
335   template <class T,class Container, class Cmp>
wait_pull(unique_lock<mutex> & lk,T & elem)336   queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(unique_lock<mutex>& lk, T& elem)
337   {
338     const bool has_been_closed = super::wait_until_not_empty_or_closed(lk);
339     if (has_been_closed) return queue_op_status::closed;
340     pull(lk, elem);
341     return queue_op_status::success;
342   }
343 
344   template <class T,class Container, class Cmp>
wait_pull(T & elem)345   queue_op_status sync_priority_queue<T,Container,Cmp>::wait_pull(T& elem)
346   {
347     unique_lock<mutex> lk(super::mtx_);
348     return wait_pull(lk, elem);
349   }
350 
351   //////////////////////
352   template <class T,class Container, class Cmp>
nonblocking_pull(T & elem)353   queue_op_status sync_priority_queue<T,Container,Cmp>::nonblocking_pull(T& elem)
354   {
355     unique_lock<mutex> lk(super::mtx_, try_to_lock);
356     if (!lk.owns_lock()) return queue_op_status::busy;
357     return try_pull(lk, elem);
358   }
359 
360 
361 
362 } //end concurrent namespace
363 
364 using concurrent::sync_priority_queue;
365 
366 } //end boost namespace
367 #include <boost/config/abi_suffix.hpp>
368 
369 #endif
370