1 #ifndef BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP 2 #define BOOST_THREAD_CONCURRENT_QUEUES_DETAIL_SYNC_QUEUE_BASE_HPP 3 4 ////////////////////////////////////////////////////////////////////////////// 5 // 6 // (C) Copyright Vicente J. Botet Escriba 2013-2017. Distributed under the Boost 7 // Software License, Version 1.0. (See accompanying file 8 // LICENSE_1_0.txt or copy at http://www.boost.org/LICENSE_1_0.txt) 9 // 10 // See http://www.boost.org/libs/thread for documentation. 11 // 12 ////////////////////////////////////////////////////////////////////////////// 13 14 #include <boost/thread/detail/config.hpp> 15 #include <boost/thread/condition_variable.hpp> 16 #include <boost/thread/detail/move.hpp> 17 #include <boost/thread/mutex.hpp> 18 #include <boost/thread/concurrent_queues/queue_op_status.hpp> 19 20 #include <boost/chrono/time_point.hpp> 21 #include <boost/throw_exception.hpp> 22 23 #include <boost/config/abi_prefix.hpp> 24 25 namespace boost 26 { 27 namespace concurrent 28 { 29 namespace detail 30 { 31 32 template <class ValueType, class Queue> 33 class sync_queue_base 34 { 35 public: 36 typedef ValueType value_type; 37 typedef Queue underlying_queue_type; 38 typedef typename Queue::size_type size_type; 39 typedef queue_op_status op_status; 40 41 // Constructors/Assignment/Destructors 42 BOOST_THREAD_NO_COPYABLE(sync_queue_base) 43 inline sync_queue_base(); 44 //template <typename Range> 45 //inline explicit sync_queue(Range range); 46 inline ~sync_queue_base(); 47 48 // Observers 49 inline bool empty() const; 50 inline bool full() const; 51 inline size_type size() const; 52 inline bool closed() const; 53 54 // Modifiers 55 inline void close(); 56 underlying_queue()57 inline underlying_queue_type underlying_queue() { 58 lock_guard<mutex> lk(mtx_); 59 return boost::move(data_); 60 } 61 62 protected: 63 mutable mutex mtx_; 64 condition_variable not_empty_; 65 underlying_queue_type data_; 66 bool closed_; 67 empty(unique_lock<mutex> &) const68 inline bool empty(unique_lock<mutex>& ) const BOOST_NOEXCEPT 69 { 70 return data_.empty(); 71 } empty(lock_guard<mutex> &) const72 inline bool empty(lock_guard<mutex>& ) const BOOST_NOEXCEPT 73 { 74 return data_.empty(); 75 } 76 size(lock_guard<mutex> &) const77 inline size_type size(lock_guard<mutex>& ) const BOOST_NOEXCEPT 78 { 79 return data_.size(); 80 } 81 inline bool closed(unique_lock<mutex>& lk) const; 82 inline bool closed(lock_guard<mutex>& lk) const; 83 84 inline void throw_if_closed(unique_lock<mutex>&); 85 inline void throw_if_closed(lock_guard<mutex>&); 86 87 inline void wait_until_not_empty(unique_lock<mutex>& lk); 88 inline bool wait_until_not_empty_or_closed(unique_lock<mutex>& lk); 89 template <class WClock, class Duration> 90 queue_op_status wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&); 91 notify_not_empty_if_needed(unique_lock<mutex> &)92 inline void notify_not_empty_if_needed(unique_lock<mutex>& ) 93 { 94 not_empty_.notify_one(); 95 } notify_not_empty_if_needed(lock_guard<mutex> &)96 inline void notify_not_empty_if_needed(lock_guard<mutex>& ) 97 { 98 not_empty_.notify_one(); 99 } 100 101 }; 102 103 template <class ValueType, class Queue> sync_queue_base()104 sync_queue_base<ValueType, Queue>::sync_queue_base() : 105 data_(), closed_(false) 106 { 107 BOOST_ASSERT(data_.empty()); 108 } 109 110 template <class ValueType, class Queue> ~sync_queue_base()111 sync_queue_base<ValueType, Queue>::~sync_queue_base() 112 { 113 } 114 115 template <class ValueType, class Queue> close()116 void sync_queue_base<ValueType, Queue>::close() 117 { 118 { 119 lock_guard<mutex> lk(mtx_); 120 closed_ = true; 121 } 122 not_empty_.notify_all(); 123 } 124 125 template <class ValueType, class Queue> closed() const126 bool sync_queue_base<ValueType, Queue>::closed() const 127 { 128 lock_guard<mutex> lk(mtx_); 129 return closed(lk); 130 } 131 template <class ValueType, class Queue> closed(unique_lock<mutex> &) const132 bool sync_queue_base<ValueType, Queue>::closed(unique_lock<mutex>&) const 133 { 134 return closed_; 135 } 136 template <class ValueType, class Queue> closed(lock_guard<mutex> &) const137 bool sync_queue_base<ValueType, Queue>::closed(lock_guard<mutex>&) const 138 { 139 return closed_; 140 } 141 142 template <class ValueType, class Queue> empty() const143 bool sync_queue_base<ValueType, Queue>::empty() const 144 { 145 lock_guard<mutex> lk(mtx_); 146 return empty(lk); 147 } 148 template <class ValueType, class Queue> full() const149 bool sync_queue_base<ValueType, Queue>::full() const 150 { 151 return false; 152 } 153 154 template <class ValueType, class Queue> size() const155 typename sync_queue_base<ValueType, Queue>::size_type sync_queue_base<ValueType, Queue>::size() const 156 { 157 lock_guard<mutex> lk(mtx_); 158 return size(lk); 159 } 160 161 template <class ValueType, class Queue> throw_if_closed(unique_lock<mutex> & lk)162 void sync_queue_base<ValueType, Queue>::throw_if_closed(unique_lock<mutex>& lk) 163 { 164 if (closed(lk)) 165 { 166 BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); 167 } 168 } 169 template <class ValueType, class Queue> throw_if_closed(lock_guard<mutex> & lk)170 void sync_queue_base<ValueType, Queue>::throw_if_closed(lock_guard<mutex>& lk) 171 { 172 if (closed(lk)) 173 { 174 BOOST_THROW_EXCEPTION( sync_queue_is_closed() ); 175 } 176 } 177 178 template <class ValueType, class Queue> wait_until_not_empty(unique_lock<mutex> & lk)179 void sync_queue_base<ValueType, Queue>::wait_until_not_empty(unique_lock<mutex>& lk) 180 { 181 for (;;) 182 { 183 if (! empty(lk)) break; 184 throw_if_closed(lk); 185 not_empty_.wait(lk); 186 } 187 } 188 template <class ValueType, class Queue> wait_until_not_empty_or_closed(unique_lock<mutex> & lk)189 bool sync_queue_base<ValueType, Queue>::wait_until_not_empty_or_closed(unique_lock<mutex>& lk) 190 { 191 for (;;) 192 { 193 if (! empty(lk)) break; 194 if (closed(lk)) return true; 195 not_empty_.wait(lk); 196 } 197 return false; 198 } 199 200 template <class ValueType, class Queue> 201 template <class WClock, class Duration> wait_until_not_empty_until(unique_lock<mutex> & lk,chrono::time_point<WClock,Duration> const & tp)202 queue_op_status sync_queue_base<ValueType, Queue>::wait_until_not_empty_until(unique_lock<mutex>& lk, chrono::time_point<WClock,Duration> const&tp) 203 { 204 for (;;) 205 { 206 if (! empty(lk)) return queue_op_status::success; 207 throw_if_closed(lk); 208 if (not_empty_.wait_until(lk, tp) == cv_status::timeout ) return queue_op_status::timeout; 209 } 210 } 211 212 213 } // detail 214 } // concurrent 215 } // boost 216 217 #include <boost/config/abi_suffix.hpp> 218 219 #endif 220