1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
2 #define BOOST_THREAD_CONCURRENT_QUEUES_SYNC_BOUNDED_QUEUE_HPP
3 
4 //////////////////////////////////////////////////////////////////////////////
5 //
6 // (C) Copyright Vicente J. Botet Escriba 2013-2014. Distributed under the Boost
7 // Software License, Version 1.0. (See accompanying file
8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt)
9 //
10 // See http://www.boost.org/libs/thread for documentation.
11 //
12 //////////////////////////////////////////////////////////////////////////////
13 
14 #include <boost/thread/detail/config.hpp>
15 #include <boost/thread/condition_variable.hpp>
16 #include <boost/thread/mutex.hpp>
17 #include <boost/thread/detail/move.hpp>
18 #include <boost/throw_exception.hpp>
19 #include <boost/thread/concurrent_queues/queue_op_status.hpp>
20 
21 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
22 #include <boost/smart_ptr/shared_ptr.hpp>
23 #include <boost/smart_ptr/make_shared.hpp>
24 #endif
25 #include <boost/config/abi_prefix.hpp>
26 
27 namespace boost
28 {
29 namespace concurrent
30 {
31   template <typename ValueType>
32   class sync_bounded_queue
33   {
34   public:
35     typedef ValueType value_type;
36     typedef std::size_t size_type;
37 
38     // Constructors/Assignment/Destructors
39     BOOST_THREAD_NO_COPYABLE(sync_bounded_queue)
40     explicit sync_bounded_queue(size_type max_elems);
41     template <typename Range>
42     sync_bounded_queue(size_type max_elems, Range range);
43     ~sync_bounded_queue();
44 
45     // Observers
46     inline bool empty() const;
47     inline bool full() const;
48     inline size_type capacity() const;
49     inline size_type size() const;
50     inline bool closed() const;
51 
52     // Modifiers
53     inline void close();
54 
55 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
56     inline void push(const value_type& x);
57     inline void push(BOOST_THREAD_RV_REF(value_type) x);
58     inline bool try_push(const value_type& x);
59     inline bool try_push(BOOST_THREAD_RV_REF(value_type) x);
60     inline bool try_push(no_block_tag, const value_type& x);
61     inline bool try_push(no_block_tag, BOOST_THREAD_RV_REF(value_type) x);
62 #endif
63     inline void push_back(const value_type& x);
64     inline void push_back(BOOST_THREAD_RV_REF(value_type) x);
65     inline queue_op_status try_push_back(const value_type& x);
66     inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x);
67     inline queue_op_status nonblocking_push_back(const value_type& x);
68     inline queue_op_status nonblocking_push_back(BOOST_THREAD_RV_REF(value_type) x);
69     inline queue_op_status wait_push_back(const value_type& x);
70     inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x);
71 
72     // Observers/Modifiers
73 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
74     inline void pull(value_type&);
75     // enable_if is_nothrow_copy_movable<value_type>
76     inline value_type pull();
77     inline shared_ptr<ValueType> ptr_pull();
78     inline bool try_pull(value_type&);
79     inline bool try_pull(no_block_tag,value_type&);
80     inline shared_ptr<ValueType> try_pull();
81 #endif
82     inline void pull_front(value_type&);
83     // enable_if is_nothrow_copy_movable<value_type>
84     inline value_type pull_front();
85     inline queue_op_status try_pull_front(value_type&);
86     inline queue_op_status nonblocking_pull_front(value_type&);
87 
88     inline queue_op_status wait_pull_front(ValueType& elem);
89 
90   private:
91     mutable mutex mtx_;
92     condition_variable not_empty_;
93     condition_variable not_full_;
94     size_type waiting_full_;
95     size_type waiting_empty_;
96     value_type* data_;
97     size_type in_;
98     size_type out_;
99     size_type capacity_;
100     bool closed_;
101 
inc(size_type idx) const102     inline size_type inc(size_type idx) const BOOST_NOEXCEPT
103     {
104       return (idx + 1) % capacity_;
105     }
106 
empty(unique_lock<mutex> &) const107     inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT
108     {
109       return in_ == out_;
110     }
empty(lock_guard<mutex> &) const111     inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT
112     {
113       return in_ == out_;
114     }
full(unique_lock<mutex> &) const115     inline bool full(unique_lock<mutex>& ) const BOOST_NOEXCEPT
116     {
117       return (inc(in_) == out_);
118     }
full(lock_guard<mutex> &) const119     inline bool full(lock_guard<mutex>& ) const BOOST_NOEXCEPT
120     {
121       return (inc(in_) == out_);
122     }
capacity(lock_guard<mutex> &) const123     inline size_type capacity(lock_guard<mutex>& ) const BOOST_NOEXCEPT
124     {
125       return capacity_-1;
126     }
size(lock_guard<mutex> & lk) const127     inline size_type size(lock_guard<mutex>& lk) const BOOST_NOEXCEPT
128     {
129       if (full(lk)) return capacity(lk);
130       return ((in_+capacity(lk)-out_) % capacity(lk));
131     }
132 
133     inline void throw_if_closed(unique_lock<mutex>&);
134     inline bool closed(unique_lock<mutex>&) const;
135 
136 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
137     inline bool try_pull(value_type& x, unique_lock<mutex>& lk);
138     inline shared_ptr<value_type> try_pull(unique_lock<mutex>& lk);
139     inline bool try_push(const value_type& x, unique_lock<mutex>& lk);
140     inline bool try_push(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
141 #endif
142     inline queue_op_status try_pull_front(value_type& x, unique_lock<mutex>& lk);
143     inline queue_op_status try_push_back(const value_type& x, unique_lock<mutex>& lk);
144     inline queue_op_status try_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
145 
146     inline queue_op_status wait_pull_front(value_type& x, unique_lock<mutex>& lk);
147     inline queue_op_status wait_push_back(const value_type& x, unique_lock<mutex>& lk);
148     inline queue_op_status wait_push_back(BOOST_THREAD_RV_REF(value_type) x, unique_lock<mutex>& lk);
149 
150     inline void wait_until_not_empty(unique_lock<mutex>& lk);
151     inline void wait_until_not_empty(unique_lock<mutex>& lk, bool&);
152     inline size_type wait_until_not_full(unique_lock<mutex>& lk);
153     inline size_type wait_until_not_full(unique_lock<mutex>& lk, bool&);
154 
155 
notify_not_empty_if_needed(unique_lock<mutex> & lk)156     inline void notify_not_empty_if_needed(unique_lock<mutex>& lk)
157     {
158       if (waiting_empty_ > 0)
159       {
160         --waiting_empty_;
161         lk.unlock();
162         not_empty_.notify_one();
163       }
164     }
notify_not_full_if_needed(unique_lock<mutex> & lk)165     inline void notify_not_full_if_needed(unique_lock<mutex>& lk)
166     {
167       if (waiting_full_ > 0)
168       {
169         --waiting_full_;
170         lk.unlock();
171         not_full_.notify_one();
172       }
173     }
174 
175 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
pull(value_type & elem,unique_lock<mutex> & lk)176     inline void pull(value_type& elem, unique_lock<mutex>& lk)
177     {
178       elem = boost::move(data_[out_]);
179       out_ = inc(out_);
180       notify_not_full_if_needed(lk);
181     }
pull(unique_lock<mutex> & lk)182     inline value_type pull(unique_lock<mutex>& lk)
183     {
184       value_type elem = boost::move(data_[out_]);
185       out_ = inc(out_);
186       notify_not_full_if_needed(lk);
187       return boost::move(elem);
188     }
ptr_pull(unique_lock<mutex> & lk)189     inline boost::shared_ptr<value_type> ptr_pull(unique_lock<mutex>& lk)
190     {
191       shared_ptr<value_type> res = make_shared<value_type>(boost::move(data_[out_]));
192       out_ = inc(out_);
193       notify_not_full_if_needed(lk);
194       return res;
195     }
196 #endif
pull_front(value_type & elem,unique_lock<mutex> & lk)197     inline void pull_front(value_type& elem, unique_lock<mutex>& lk)
198     {
199       elem = boost::move(data_[out_]);
200       out_ = inc(out_);
201       notify_not_full_if_needed(lk);
202     }
pull_front(unique_lock<mutex> & lk)203     inline value_type pull_front(unique_lock<mutex>& lk)
204     {
205       value_type elem = boost::move(data_[out_]);
206       out_ = inc(out_);
207       notify_not_full_if_needed(lk);
208       return boost::move(elem);
209     }
210 
set_in(size_type in,unique_lock<mutex> & lk)211     inline void set_in(size_type in, unique_lock<mutex>& lk)
212     {
213       in_ = in;
214       notify_not_empty_if_needed(lk);
215     }
216 
push_at(const value_type & elem,size_type in_p_1,unique_lock<mutex> & lk)217     inline void push_at(const value_type& elem, size_type in_p_1, unique_lock<mutex>& lk)
218     {
219       data_[in_] = elem;
220       set_in(in_p_1, lk);
221     }
222 
push_at(BOOST_THREAD_RV_REF (value_type)elem,size_type in_p_1,unique_lock<mutex> & lk)223     inline void push_at(BOOST_THREAD_RV_REF(value_type) elem, size_type in_p_1, unique_lock<mutex>& lk)
224     {
225       data_[in_] = boost::move(elem);
226       set_in(in_p_1, lk);
227     }
228   };
229 
230   template <typename ValueType>
sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems)231   sync_bounded_queue<ValueType>::sync_bounded_queue(typename sync_bounded_queue<ValueType>::size_type max_elems) :
232     waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
233         closed_(false)
234   {
235     BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
236   }
237 
238 //  template <typename ValueType>
239 //  template <typename Range>
240 //  sync_bounded_queue<ValueType>::sync_bounded_queue(size_type max_elems, Range range) :
241 //    waiting_full_(0), waiting_empty_(0), data_(new value_type[max_elems + 1]), in_(0), out_(0), capacity_(max_elems + 1),
242 //        closed_(false)
243 //  {
244 //    BOOST_ASSERT_MSG(max_elems >= 1, "number of elements must be > 1");
245 //    BOOST_ASSERT_MSG(max_elems == size(range), "number of elements must match range's size");
246 //    try
247 //    {
248 //      typedef typename Range::iterator iterator_t;
249 //      iterator_t first = boost::begin(range);
250 //      iterator_t end = boost::end(range);
251 //      size_type in = 0;
252 //      for (iterator_t cur = first; cur != end; ++cur, ++in)
253 //      {
254 //        data_[in] = *cur;
255 //      }
256 //      set_in(in);
257 //    }
258 //    catch (...)
259 //    {
260 //      delete[] data_;
261 //    }
262 //  }
263 
264   template <typename ValueType>
~sync_bounded_queue()265   sync_bounded_queue<ValueType>::~sync_bounded_queue()
266   {
267     delete[] data_;
268   }
269 
270   template <typename ValueType>
close()271   void sync_bounded_queue<ValueType>::close()
272   {
273     {
274       lock_guard<mutex> lk(mtx_);
275       closed_ = true;
276     }
277     not_empty_.notify_all();
278     not_full_.notify_all();
279   }
280 
281   template <typename ValueType>
closed() const282   bool sync_bounded_queue<ValueType>::closed() const
283   {
284     lock_guard<mutex> lk(mtx_);
285     return closed_;
286   }
287   template <typename ValueType>
closed(unique_lock<mutex> &) const288   bool sync_bounded_queue<ValueType>::closed(unique_lock<mutex>& ) const
289   {
290     return closed_;
291   }
292 
293   template <typename ValueType>
empty() const294   bool sync_bounded_queue<ValueType>::empty() const
295   {
296     lock_guard<mutex> lk(mtx_);
297     return empty(lk);
298   }
299   template <typename ValueType>
full() const300   bool sync_bounded_queue<ValueType>::full() const
301   {
302     lock_guard<mutex> lk(mtx_);
303     return full(lk);
304   }
305 
306   template <typename ValueType>
capacity() const307   typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::capacity() const
308   {
309     lock_guard<mutex> lk(mtx_);
310     return capacity(lk);
311   }
312 
313   template <typename ValueType>
size() const314   typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::size() const
315   {
316     lock_guard<mutex> lk(mtx_);
317     return size(lk);
318   }
319 
320 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
321   template <typename ValueType>
try_pull(ValueType & elem,unique_lock<mutex> & lk)322   bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem, unique_lock<mutex>& lk)
323   {
324     if (empty(lk))
325     {
326       throw_if_closed(lk);
327       return false;
328     }
329     pull(elem, lk);
330     return true;
331   }
332   template <typename ValueType>
try_pull(unique_lock<mutex> & lk)333   shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull(unique_lock<mutex>& lk)
334   {
335     if (empty(lk))
336     {
337       throw_if_closed(lk);
338       return shared_ptr<ValueType>();
339     }
340     return ptr_pull(lk);
341   }
342   template <typename ValueType>
try_pull(ValueType & elem)343   bool sync_bounded_queue<ValueType>::try_pull(ValueType& elem)
344   {
345       unique_lock<mutex> lk(mtx_);
346       return try_pull(elem, lk);
347   }
348 #endif
349 
350   template <typename ValueType>
try_pull_front(ValueType & elem,unique_lock<mutex> & lk)351   queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem, unique_lock<mutex>& lk)
352   {
353     if (empty(lk))
354     {
355       if (closed(lk)) return queue_op_status::closed;
356       return queue_op_status::empty;
357     }
358     pull_front(elem, lk);
359     return queue_op_status::success;
360   }
361 
362   template <typename ValueType>
try_pull_front(ValueType & elem)363   queue_op_status sync_bounded_queue<ValueType>::try_pull_front(ValueType& elem)
364   {
365       unique_lock<mutex> lk(mtx_);
366       return try_pull_front(elem, lk);
367   }
368 
369 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
370   template <typename ValueType>
try_pull(no_block_tag,ValueType & elem)371   bool sync_bounded_queue<ValueType>::try_pull(no_block_tag,ValueType& elem)
372   {
373       unique_lock<mutex> lk(mtx_, try_to_lock);
374       if (!lk.owns_lock())
375       {
376         return false;
377       }
378       return try_pull(elem, lk);
379   }
380   template <typename ValueType>
try_pull()381   boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::try_pull()
382   {
383       unique_lock<mutex> lk(mtx_);
384       return try_pull(lk);
385   }
386 #endif
387 
388   template <typename ValueType>
nonblocking_pull_front(ValueType & elem)389   queue_op_status sync_bounded_queue<ValueType>::nonblocking_pull_front(ValueType& elem)
390   {
391       unique_lock<mutex> lk(mtx_, try_to_lock);
392       if (!lk.owns_lock())
393       {
394         return queue_op_status::busy;
395       }
396       return try_pull_front(elem, lk);
397   }
398 
399   template <typename ValueType>
throw_if_closed(unique_lock<mutex> &)400   void sync_bounded_queue<ValueType>::throw_if_closed(unique_lock<mutex>&)
401   {
402     if (closed_)
403     {
404       BOOST_THROW_EXCEPTION( sync_queue_is_closed() );
405     }
406   }
407 
408   template <typename ValueType>
wait_until_not_empty(unique_lock<mutex> & lk)409   void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk)
410   {
411     for (;;)
412     {
413       if (out_ != in_) break;
414       throw_if_closed(lk);
415       ++waiting_empty_;
416       not_empty_.wait(lk);
417     }
418   }
419   template <typename ValueType>
wait_until_not_empty(unique_lock<mutex> & lk,bool & closed)420   void sync_bounded_queue<ValueType>::wait_until_not_empty(unique_lock<mutex>& lk, bool & closed)
421   {
422     for (;;)
423     {
424       if (out_ != in_) break;
425       if (closed_) {closed=true; return;}
426       ++waiting_empty_;
427       not_empty_.wait(lk);
428     }
429   }
430 
431 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
432   template <typename ValueType>
pull(ValueType & elem)433   void sync_bounded_queue<ValueType>::pull(ValueType& elem)
434   {
435       unique_lock<mutex> lk(mtx_);
436       wait_until_not_empty(lk);
437       pull(elem, lk);
438   }
439 //  template <typename ValueType>
440 //  void sync_bounded_queue<ValueType>::pull(ValueType& elem, bool & closed)
441 //  {
442 //      unique_lock<mutex> lk(mtx_);
443 //      wait_until_not_empty(lk, closed);
444 //      if (closed) {return;}
445 //      pull(elem, lk);
446 //  }
447 
448   // enable if ValueType is nothrow movable
449   template <typename ValueType>
pull()450   ValueType sync_bounded_queue<ValueType>::pull()
451   {
452       unique_lock<mutex> lk(mtx_);
453       wait_until_not_empty(lk);
454       return pull(lk);
455   }
456   template <typename ValueType>
ptr_pull()457   boost::shared_ptr<ValueType> sync_bounded_queue<ValueType>::ptr_pull()
458   {
459       unique_lock<mutex> lk(mtx_);
460       wait_until_not_empty(lk);
461       return ptr_pull(lk);
462   }
463 
464 #endif
465 
466   template <typename ValueType>
pull_front(ValueType & elem)467   void sync_bounded_queue<ValueType>::pull_front(ValueType& elem)
468   {
469       unique_lock<mutex> lk(mtx_);
470       wait_until_not_empty(lk);
471       pull_front(elem, lk);
472   }
473 
474   // enable if ValueType is nothrow movable
475   template <typename ValueType>
pull_front()476   ValueType sync_bounded_queue<ValueType>::pull_front()
477   {
478       unique_lock<mutex> lk(mtx_);
479       wait_until_not_empty(lk);
480       return pull_front(lk);
481   }
482 
483   template <typename ValueType>
wait_pull_front(ValueType & elem,unique_lock<mutex> & lk)484   queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem, unique_lock<mutex>& lk)
485   {
486       if (empty(lk) && closed(lk)) {return queue_op_status::closed;}
487       bool is_closed = false;
488       wait_until_not_empty(lk, is_closed);
489       if (is_closed) {return queue_op_status::closed;}
490       pull_front(elem, lk);
491       return queue_op_status::success;
492   }
493   template <typename ValueType>
wait_pull_front(ValueType & elem)494   queue_op_status sync_bounded_queue<ValueType>::wait_pull_front(ValueType& elem)
495   {
496     unique_lock<mutex> lk(mtx_);
497     return wait_pull_front(elem, lk);
498   }
499 
500 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
501   template <typename ValueType>
try_push(const ValueType & elem,unique_lock<mutex> & lk)502   bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem, unique_lock<mutex>& lk)
503   {
504     throw_if_closed(lk);
505     size_type in_p_1 = inc(in_);
506     if (in_p_1 == out_)  // full()
507     {
508       return false;
509     }
510     push_at(elem, in_p_1, lk);
511     return true;
512   }
513   template <typename ValueType>
try_push(const ValueType & elem)514   bool sync_bounded_queue<ValueType>::try_push(const ValueType& elem)
515   {
516       unique_lock<mutex> lk(mtx_);
517       return try_push(elem, lk);
518   }
519 
520 #endif
521 
522   template <typename ValueType>
try_push_back(const ValueType & elem,unique_lock<mutex> & lk)523   queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem, unique_lock<mutex>& lk)
524   {
525     if (closed(lk)) return queue_op_status::closed;
526     size_type in_p_1 = inc(in_);
527     if (in_p_1 == out_)  // full()
528     {
529       return queue_op_status::full;
530     }
531     push_at(elem, in_p_1, lk);
532     return queue_op_status::success;
533   }
534 
535   template <typename ValueType>
try_push_back(const ValueType & elem)536   queue_op_status sync_bounded_queue<ValueType>::try_push_back(const ValueType& elem)
537   {
538     unique_lock<mutex> lk(mtx_);
539     return try_push_back(elem, lk);
540   }
541 
542   template <typename ValueType>
wait_push_back(const ValueType & elem,unique_lock<mutex> & lk)543   queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem, unique_lock<mutex>& lk)
544   {
545     if (closed(lk)) return queue_op_status::closed;
546     push_at(elem, wait_until_not_full(lk), lk);
547     return queue_op_status::success;
548   }
549   template <typename ValueType>
wait_push_back(const ValueType & elem)550   queue_op_status sync_bounded_queue<ValueType>::wait_push_back(const ValueType& elem)
551   {
552     unique_lock<mutex> lk(mtx_);
553     return wait_push_back(elem, lk);
554   }
555 
556 
557 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
558   template <typename ValueType>
try_push(no_block_tag,const ValueType & elem)559   bool sync_bounded_queue<ValueType>::try_push(no_block_tag, const ValueType& elem)
560   {
561       unique_lock<mutex> lk(mtx_, try_to_lock);
562       if (!lk.owns_lock()) return false;
563       return try_push(elem, lk);
564   }
565 #endif
566 
567   template <typename ValueType>
nonblocking_push_back(const ValueType & elem)568   queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(const ValueType& elem)
569   {
570     unique_lock<mutex> lk(mtx_, try_to_lock);
571     if (!lk.owns_lock()) return queue_op_status::busy;
572     return try_push_back(elem, lk);
573   }
574 
575   template <typename ValueType>
wait_until_not_full(unique_lock<mutex> & lk)576   typename sync_bounded_queue<ValueType>::size_type sync_bounded_queue<ValueType>::wait_until_not_full(unique_lock<mutex>& lk)
577   {
578     for (;;)
579     {
580       throw_if_closed(lk);
581       size_type in_p_1 = inc(in_);
582       if (in_p_1 != out_) // ! full()
583       {
584         return in_p_1;
585       }
586       ++waiting_full_;
587       not_full_.wait(lk);
588     }
589   }
590 
591 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
592   template <typename ValueType>
push(const ValueType & elem)593   void sync_bounded_queue<ValueType>::push(const ValueType& elem)
594   {
595       unique_lock<mutex> lk(mtx_);
596       push_at(elem, wait_until_not_full(lk), lk);
597   }
598 #endif
599   template <typename ValueType>
push_back(const ValueType & elem)600   void sync_bounded_queue<ValueType>::push_back(const ValueType& elem)
601   {
602       unique_lock<mutex> lk(mtx_);
603       push_at(elem, wait_until_not_full(lk), lk);
604   }
605 
606 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
607   template <typename ValueType>
try_push(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)608   bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
609   {
610     throw_if_closed(lk);
611     size_type in_p_1 = inc(in_);
612     if (in_p_1 == out_) // full()
613     {
614       return false;
615     }
616     push_at(boost::move(elem), in_p_1, lk);
617     return true;
618   }
619 
620   template <typename ValueType>
try_push(BOOST_THREAD_RV_REF (ValueType)elem)621   bool sync_bounded_queue<ValueType>::try_push(BOOST_THREAD_RV_REF(ValueType) elem)
622   {
623       unique_lock<mutex> lk(mtx_);
624       return try_push(boost::move(elem), lk);
625   }
626 #endif
627 
628   template <typename ValueType>
try_push_back(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)629   queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
630   {
631     if (closed(lk)) return queue_op_status::closed;
632     size_type in_p_1 = inc(in_);
633     if (in_p_1 == out_) // full()
634     {
635       return queue_op_status::full;
636     }
637     push_at(boost::move(elem), in_p_1, lk);
638     return queue_op_status::success;
639   }
640   template <typename ValueType>
try_push_back(BOOST_THREAD_RV_REF (ValueType)elem)641   queue_op_status sync_bounded_queue<ValueType>::try_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
642   {
643       unique_lock<mutex> lk(mtx_);
644       return try_push_back(boost::move(elem), lk);
645   }
646 
647   template <typename ValueType>
wait_push_back(BOOST_THREAD_RV_REF (ValueType)elem,unique_lock<mutex> & lk)648   queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem, unique_lock<mutex>& lk)
649   {
650     if (closed(lk)) return queue_op_status::closed;
651     push_at(boost::move(elem), wait_until_not_full(lk), lk);
652     return queue_op_status::success;
653   }
654   template <typename ValueType>
wait_push_back(BOOST_THREAD_RV_REF (ValueType)elem)655   queue_op_status sync_bounded_queue<ValueType>::wait_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
656   {
657       unique_lock<mutex> lk(mtx_);
658       return try_push_back(boost::move(elem), lk);
659   }
660 
661 
662 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
663   template <typename ValueType>
try_push(no_block_tag,BOOST_THREAD_RV_REF (ValueType)elem)664   bool sync_bounded_queue<ValueType>::try_push(no_block_tag, BOOST_THREAD_RV_REF(ValueType) elem)
665   {
666       unique_lock<mutex> lk(mtx_, try_to_lock);
667       if (!lk.owns_lock())
668       {
669         return false;
670       }
671       return try_push(boost::move(elem), lk);
672   }
673 #endif
674   template <typename ValueType>
nonblocking_push_back(BOOST_THREAD_RV_REF (ValueType)elem)675   queue_op_status sync_bounded_queue<ValueType>::nonblocking_push_back(BOOST_THREAD_RV_REF(ValueType) elem)
676   {
677       unique_lock<mutex> lk(mtx_, try_to_lock);
678       if (!lk.owns_lock())
679       {
680         return queue_op_status::busy;
681       }
682       return try_push_back(boost::move(elem), lk);
683   }
684 
685 #ifndef BOOST_THREAD_QUEUE_DEPRECATE_OLD
686   template <typename ValueType>
push(BOOST_THREAD_RV_REF (ValueType)elem)687   void sync_bounded_queue<ValueType>::push(BOOST_THREAD_RV_REF(ValueType) elem)
688   {
689       unique_lock<mutex> lk(mtx_);
690       push_at(boost::move(elem), wait_until_not_full(lk), lk);
691   }
692 #endif
693   template <typename ValueType>
push_back(BOOST_THREAD_RV_REF (ValueType)elem)694   void sync_bounded_queue<ValueType>::push_back(BOOST_THREAD_RV_REF(ValueType) elem)
695   {
696       unique_lock<mutex> lk(mtx_);
697       push_at(boost::move(elem), wait_until_not_full(lk), lk);
698   }
699 
700   template <typename ValueType>
operator <<(sync_bounded_queue<ValueType> & sbq,BOOST_THREAD_RV_REF (ValueType)elem)701   sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, BOOST_THREAD_RV_REF(ValueType) elem)
702   {
703     sbq.push_back(boost::move(elem));
704     return sbq;
705   }
706 
707   template <typename ValueType>
operator <<(sync_bounded_queue<ValueType> & sbq,ValueType const & elem)708   sync_bounded_queue<ValueType>& operator<<(sync_bounded_queue<ValueType>& sbq, ValueType const&elem)
709   {
710     sbq.push_back(elem);
711     return sbq;
712   }
713 
714   template <typename ValueType>
operator >>(sync_bounded_queue<ValueType> & sbq,ValueType & elem)715   sync_bounded_queue<ValueType>& operator>>(sync_bounded_queue<ValueType>& sbq, ValueType &elem)
716   {
717     sbq.pull_front(elem);
718     return sbq;
719   }
720 }
721 using concurrent::sync_bounded_queue;
722 
723 }
724 
725 #include <boost/config/abi_suffix.hpp>
726 
727 #endif
728