1 2 // Copyright Oliver Kowalke 2013. 3 // Distributed under the Boost Software License, Version 1.0. 4 // (See accompanying file LICENSE_1_0.txt or copy at 5 // http://www.boost.org/LICENSE_1_0.txt) 6 // 7 8 #ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H 9 #define BOOST_FIBERS_BOUNDED_CHANNEL_H 10 11 #warn "template bounded_channel is deprecated" 12 13 #include <algorithm> 14 #include <atomic> 15 #include <chrono> 16 #include <cstddef> 17 #include <memory> 18 #include <mutex> 19 #include <system_error> 20 #include <utility> 21 22 #include <boost/config.hpp> 23 #include <boost/intrusive_ptr.hpp> 24 25 #include <boost/fiber/detail/config.hpp> 26 #include <boost/fiber/exceptions.hpp> 27 #include <boost/fiber/exceptions.hpp> 28 #include <boost/fiber/condition_variable.hpp> 29 #include <boost/fiber/mutex.hpp> 30 #include <boost/fiber/channel_op_status.hpp> 31 32 #ifdef BOOST_HAS_ABI_HEADERS 33 # include BOOST_ABI_PREFIX 34 #endif 35 36 namespace boost { 37 namespace fibers { 38 39 template< typename T, 40 typename Allocator = std::allocator< T > 41 > 42 class bounded_channel { 43 public: 44 typedef T value_type; 45 46 private: 47 struct node { 48 typedef intrusive_ptr< node > ptr_t; 49 typedef typename std::allocator_traits< Allocator >::template rebind_alloc< 50 node 51 > allocator_t; 52 typedef std::allocator_traits< allocator_t > allocator_traits_t; 53 54 #if ! defined(BOOST_FIBERS_NO_ATOMICS) 55 std::atomic< std::size_t > use_count{ 0 }; 56 #else 57 std::size_t use_count{ 0 }; 58 #endif 59 allocator_t alloc; 60 T va; 61 ptr_t nxt{}; 62 nodeboost::fibers::bounded_channel::node63 node( T const& t, allocator_t const& alloc_) noexcept : 64 alloc{ alloc_ }, 65 va{ t } { 66 } 67 nodeboost::fibers::bounded_channel::node68 node( T && t, allocator_t & alloc_) noexcept : 69 alloc{ alloc_ }, 70 va{ std::move( t) } { 71 } 72 73 friend intrusive_ptr_add_ref(node * p)74 void intrusive_ptr_add_ref( node * p) noexcept { 75 ++p->use_count; 76 } 77 78 friend intrusive_ptr_release(node * p)79 void intrusive_ptr_release( node * p) noexcept { 80 if ( 0 == --p->use_count) { 81 allocator_t alloc( p->alloc); 82 allocator_traits_t::destroy( alloc, p); 83 allocator_traits_t::deallocate( alloc, p, 1); 84 } 85 } 86 }; 87 88 using ptr_t = typename node::ptr_t; 89 using allocator_t = typename node::allocator_t; 90 using allocator_traits_t = typename node::allocator_traits_t; 91 92 enum class queue_status { 93 open = 0, 94 closed 95 }; 96 97 allocator_t alloc_; 98 queue_status state_{ queue_status::open }; 99 std::size_t count_{ 0 }; 100 ptr_t head_{}; 101 ptr_t * tail_; 102 mutable mutex mtx_{}; 103 condition_variable not_empty_cond_{}; 104 condition_variable not_full_cond_{}; 105 std::size_t hwm_; 106 std::size_t lwm_; 107 is_closed_() const108 bool is_closed_() const noexcept { 109 return queue_status::closed == state_; 110 } 111 close_(std::unique_lock<boost::fibers::mutex> & lk)112 void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept { 113 state_ = queue_status::closed; 114 lk.unlock(); 115 not_empty_cond_.notify_all(); 116 not_full_cond_.notify_all(); 117 } 118 size_() const119 std::size_t size_() const noexcept { 120 return count_; 121 } 122 is_empty_() const123 bool is_empty_() const noexcept { 124 return ! head_; 125 } 126 is_full_() const127 bool is_full_() const noexcept { 128 return count_ >= hwm_; 129 } 130 push_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)131 channel_op_status push_( ptr_t new_node, 132 std::unique_lock< boost::fibers::mutex > & lk) { 133 if ( is_closed_() ) { 134 return channel_op_status::closed; 135 } 136 not_full_cond_.wait( lk, 137 [this](){ 138 return ! is_full_(); 139 }); 140 return push_and_notify_( new_node, lk); 141 } 142 try_push_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)143 channel_op_status try_push_( ptr_t new_node, 144 std::unique_lock< boost::fibers::mutex > & lk) noexcept { 145 if ( is_closed_() ) { 146 return channel_op_status::closed; 147 } 148 if ( is_full_() ) { 149 return channel_op_status::full; 150 } 151 return push_and_notify_( new_node, lk); 152 } 153 154 template< typename Clock, typename Duration > push_wait_until_(ptr_t new_node,std::chrono::time_point<Clock,Duration> const & timeout_time,std::unique_lock<boost::fibers::mutex> & lk)155 channel_op_status push_wait_until_( ptr_t new_node, 156 std::chrono::time_point< Clock, Duration > const& timeout_time, 157 std::unique_lock< boost::fibers::mutex > & lk) { 158 if ( is_closed_() ) { 159 return channel_op_status::closed; 160 } 161 if ( ! not_full_cond_.wait_until( lk, timeout_time, 162 [this](){ 163 return ! is_full_(); 164 })) { 165 return channel_op_status::timeout; 166 } 167 return push_and_notify_( new_node, lk); 168 } 169 push_and_notify_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)170 channel_op_status push_and_notify_( ptr_t new_node, 171 std::unique_lock< boost::fibers::mutex > & lk) noexcept { 172 push_tail_( new_node); 173 lk.unlock(); 174 not_empty_cond_.notify_one(); 175 return channel_op_status::success; 176 } 177 push_tail_(ptr_t new_node)178 void push_tail_( ptr_t new_node) noexcept { 179 * tail_ = new_node; 180 tail_ = & new_node->nxt; 181 ++count_; 182 } 183 value_pop_(std::unique_lock<boost::fibers::mutex> & lk)184 value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) { 185 BOOST_ASSERT( ! is_empty_() ); 186 auto old_head = pop_head_(); 187 if ( size_() <= lwm_) { 188 if ( lwm_ == hwm_) { 189 lk.unlock(); 190 not_full_cond_.notify_one(); 191 } else { 192 lk.unlock(); 193 // more than one producer could be waiting 194 // to push a value 195 not_full_cond_.notify_all(); 196 } 197 } 198 return std::move( old_head->va); 199 } 200 pop_head_()201 ptr_t pop_head_() noexcept { 202 auto old_head = head_; 203 head_ = old_head->nxt; 204 if ( ! head_) { 205 tail_ = & head_; 206 } 207 old_head->nxt.reset(); 208 --count_; 209 return old_head; 210 } 211 212 public: bounded_channel(std::size_t hwm,std::size_t lwm,Allocator const & alloc=Allocator ())213 bounded_channel( std::size_t hwm, std::size_t lwm, 214 Allocator const& alloc = Allocator() ) : 215 alloc_{ alloc }, 216 tail_{ & head_ }, 217 hwm_{ hwm }, 218 lwm_{ lwm } { 219 if ( hwm_ <= lwm_) { 220 throw fiber_error( std::make_error_code( std::errc::invalid_argument), 221 "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel"); 222 } 223 if ( 0 == hwm) { 224 throw fiber_error( std::make_error_code( std::errc::invalid_argument), 225 "boost fiber: high-watermark is zero"); 226 } 227 } 228 bounded_channel(std::size_t wm,Allocator const & alloc=Allocator ())229 bounded_channel( std::size_t wm, 230 Allocator const& alloc = Allocator() ) : 231 alloc_{ alloc }, 232 tail_{ & head_ }, 233 hwm_{ wm }, 234 lwm_{ wm - 1 } { 235 if ( 0 == wm) { 236 throw fiber_error( std::make_error_code( std::errc::invalid_argument), 237 "boost fiber: watermark is zero"); 238 } 239 } 240 241 bounded_channel( bounded_channel const&) = delete; 242 bounded_channel & operator=( bounded_channel const&) = delete; 243 upper_bound() const244 std::size_t upper_bound() const noexcept { 245 return hwm_; 246 } 247 lower_bound() const248 std::size_t lower_bound() const noexcept { 249 return lwm_; 250 } 251 close()252 void close() noexcept { 253 std::unique_lock< mutex > lk( mtx_); 254 close_( lk); 255 } 256 push(value_type const & va)257 channel_op_status push( value_type const& va) { 258 typename allocator_traits_t::pointer ptr{ 259 allocator_traits_t::allocate( alloc_, 1) }; 260 try { 261 allocator_traits_t::construct( alloc_, ptr, va, alloc_); 262 } catch (...) { 263 allocator_traits_t::deallocate( alloc_, ptr, 1); 264 throw; 265 } 266 std::unique_lock< mutex > lk( mtx_); 267 return push_( { detail::convert( ptr) }, lk); 268 } 269 push(value_type && va)270 channel_op_status push( value_type && va) { 271 typename allocator_traits_t::pointer ptr{ 272 allocator_traits_t::allocate( alloc_, 1) }; 273 try { 274 allocator_traits_t::construct( 275 alloc_, ptr, std::move( va), alloc_); 276 } catch (...) { 277 allocator_traits_t::deallocate( alloc_, ptr, 1); 278 throw; 279 } 280 std::unique_lock< mutex > lk( mtx_); 281 return push_( { detail::convert( ptr) }, lk); 282 } 283 284 template< typename Rep, typename Period > push_wait_for(value_type const & va,std::chrono::duration<Rep,Period> const & timeout_duration)285 channel_op_status push_wait_for( value_type const& va, 286 std::chrono::duration< Rep, Period > const& timeout_duration) { 287 return push_wait_until( va, 288 std::chrono::steady_clock::now() + timeout_duration); 289 } 290 291 template< typename Rep, typename Period > push_wait_for(value_type && va,std::chrono::duration<Rep,Period> const & timeout_duration)292 channel_op_status push_wait_for( value_type && va, 293 std::chrono::duration< Rep, Period > const& timeout_duration) { 294 return push_wait_until( std::forward< value_type >( va), 295 std::chrono::steady_clock::now() + timeout_duration); 296 } 297 298 template< typename Clock, typename Duration > push_wait_until(value_type const & va,std::chrono::time_point<Clock,Duration> const & timeout_time)299 channel_op_status push_wait_until( value_type const& va, 300 std::chrono::time_point< Clock, Duration > const& timeout_time) { 301 typename allocator_traits_t::pointer ptr{ 302 allocator_traits_t::allocate( alloc_, 1) }; 303 try { 304 allocator_traits_t::construct( alloc_, ptr, va, alloc_); 305 } catch (...) { 306 allocator_traits_t::deallocate( alloc_, ptr, 1); 307 throw; 308 } 309 std::unique_lock< mutex > lk( mtx_); 310 return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); 311 } 312 313 template< typename Clock, typename Duration > push_wait_until(value_type && va,std::chrono::time_point<Clock,Duration> const & timeout_time)314 channel_op_status push_wait_until( value_type && va, 315 std::chrono::time_point< Clock, Duration > const& timeout_time) { 316 typename allocator_traits_t::pointer ptr{ 317 allocator_traits_t::allocate( alloc_, 1) }; 318 try { 319 allocator_traits_t::construct( 320 alloc_, ptr, std::move( va), alloc_); 321 } catch (...) { 322 allocator_traits_t::deallocate( alloc_, ptr, 1); 323 throw; 324 } 325 std::unique_lock< mutex > lk( mtx_); 326 return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk); 327 } 328 try_push(value_type const & va)329 channel_op_status try_push( value_type const& va) { 330 typename allocator_traits_t::pointer ptr{ 331 allocator_traits_t::allocate( alloc_, 1) }; 332 try { 333 allocator_traits_t::construct( alloc_, ptr, va, alloc_); 334 } catch (...) { 335 allocator_traits_t::deallocate( alloc_, ptr, 1); 336 throw; 337 } 338 std::unique_lock< mutex > lk( mtx_); 339 return try_push_( { detail::convert( ptr) }, lk); 340 } 341 try_push(value_type && va)342 channel_op_status try_push( value_type && va) { 343 typename allocator_traits_t::pointer ptr{ 344 allocator_traits_t::allocate( alloc_, 1) }; 345 try { 346 allocator_traits_t::construct( 347 alloc_, ptr, std::move( va), alloc_); 348 } catch (...) { 349 allocator_traits_t::deallocate( alloc_, ptr, 1); 350 throw; 351 } 352 std::unique_lock< mutex > lk( mtx_); 353 return try_push_( { detail::convert( ptr) }, lk); 354 } 355 pop(value_type & va)356 channel_op_status pop( value_type & va) { 357 std::unique_lock< mutex > lk( mtx_); 358 not_empty_cond_.wait( lk, 359 [this](){ 360 return is_closed_() || ! is_empty_(); 361 }); 362 if ( is_closed_() && is_empty_() ) { 363 return channel_op_status::closed; 364 } 365 va = value_pop_( lk); 366 return channel_op_status::success; 367 } 368 value_pop()369 value_type value_pop() { 370 std::unique_lock< mutex > lk( mtx_); 371 not_empty_cond_.wait( lk, 372 [this](){ 373 return is_closed_() || ! is_empty_(); 374 }); 375 if ( is_closed_() && is_empty_() ) { 376 throw fiber_error( 377 std::make_error_code( std::errc::operation_not_permitted), 378 "boost fiber: queue is closed"); 379 } 380 return value_pop_( lk); 381 } 382 try_pop(value_type & va)383 channel_op_status try_pop( value_type & va) { 384 std::unique_lock< mutex > lk( mtx_); 385 if ( is_closed_() && is_empty_() ) { 386 // let other fibers run 387 lk.unlock(); 388 this_fiber::yield(); 389 return channel_op_status::closed; 390 } 391 if ( is_empty_() ) { 392 // let other fibers run 393 lk.unlock(); 394 this_fiber::yield(); 395 return channel_op_status::empty; 396 } 397 va = value_pop_( lk); 398 return channel_op_status::success; 399 } 400 401 template< typename Rep, typename Period > pop_wait_for(value_type & va,std::chrono::duration<Rep,Period> const & timeout_duration)402 channel_op_status pop_wait_for( value_type & va, 403 std::chrono::duration< Rep, Period > const& timeout_duration) { 404 return pop_wait_until( va, 405 std::chrono::steady_clock::now() + timeout_duration); 406 } 407 408 template< typename Clock, typename Duration > pop_wait_until(value_type & va,std::chrono::time_point<Clock,Duration> const & timeout_time)409 channel_op_status pop_wait_until( value_type & va, 410 std::chrono::time_point< Clock, Duration > const& timeout_time) { 411 std::unique_lock< mutex > lk( mtx_); 412 if ( ! not_empty_cond_.wait_until( lk, 413 timeout_time, 414 [this](){ 415 return is_closed_() || ! is_empty_(); 416 })) { 417 return channel_op_status::timeout; 418 } 419 if ( is_closed_() && is_empty_() ) { 420 return channel_op_status::closed; 421 } 422 va = value_pop_( lk); 423 return channel_op_status::success; 424 } 425 }; 426 427 }} 428 429 #ifdef BOOST_HAS_ABI_HEADERS 430 # include BOOST_ABI_SUFFIX 431 #endif 432 433 #endif // BOOST_FIBERS_BOUNDED_CHANNEL_H 434