1 
2 //          Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #ifndef BOOST_FIBERS_BOUNDED_CHANNEL_H
9 #define BOOST_FIBERS_BOUNDED_CHANNEL_H
10 
11 #warn "template bounded_channel is deprecated"
12 
13 #include <algorithm>
14 #include <atomic>
15 #include <chrono>
16 #include <cstddef>
17 #include <memory>
18 #include <mutex>
19 #include <system_error>
20 #include <utility>
21 
22 #include <boost/config.hpp>
23 #include <boost/intrusive_ptr.hpp>
24 
25 #include <boost/fiber/detail/config.hpp>
26 #include <boost/fiber/exceptions.hpp>
27 #include <boost/fiber/exceptions.hpp>
28 #include <boost/fiber/condition_variable.hpp>
29 #include <boost/fiber/mutex.hpp>
30 #include <boost/fiber/channel_op_status.hpp>
31 
32 #ifdef BOOST_HAS_ABI_HEADERS
33 #  include BOOST_ABI_PREFIX
34 #endif
35 
36 namespace boost {
37 namespace fibers {
38 
39 template< typename T,
40           typename Allocator = std::allocator< T >
41 >
42 class bounded_channel {
43 public:
44     typedef T   value_type;
45 
46 private:
47     struct node {
48         typedef intrusive_ptr< node >                   ptr_t;
49         typedef typename std::allocator_traits< Allocator >::template rebind_alloc<
50             node
51         >                                               allocator_t;
52         typedef std::allocator_traits< allocator_t >    allocator_traits_t;
53 
54 #if ! defined(BOOST_FIBERS_NO_ATOMICS)
55         std::atomic< std::size_t >  use_count{ 0 };
56 #else
57         std::size_t                 use_count{ 0 };
58 #endif
59         allocator_t                 alloc;
60         T                           va;
61         ptr_t                       nxt{};
62 
nodeboost::fibers::bounded_channel::node63         node( T const& t, allocator_t const& alloc_) noexcept :
64             alloc{ alloc_ },
65             va{ t } {
66         }
67 
nodeboost::fibers::bounded_channel::node68         node( T && t, allocator_t & alloc_) noexcept :
69             alloc{ alloc_ },
70             va{ std::move( t) } {
71         }
72 
73         friend
intrusive_ptr_add_ref(node * p)74         void intrusive_ptr_add_ref( node * p) noexcept {
75             ++p->use_count;
76         }
77 
78         friend
intrusive_ptr_release(node * p)79         void intrusive_ptr_release( node * p) noexcept {
80             if ( 0 == --p->use_count) {
81                 allocator_t alloc( p->alloc);
82                 allocator_traits_t::destroy( alloc, p);
83                 allocator_traits_t::deallocate( alloc, p, 1);
84             }
85         }
86     };
87 
88     using ptr_t = typename node::ptr_t;
89     using allocator_t = typename node::allocator_t;
90     using allocator_traits_t = typename node::allocator_traits_t;
91 
92     enum class queue_status {
93         open = 0,
94         closed
95     };
96 
97     allocator_t         alloc_;
98     queue_status        state_{ queue_status::open };
99     std::size_t         count_{ 0 };
100     ptr_t               head_{};
101     ptr_t           *   tail_;
102     mutable mutex       mtx_{};
103     condition_variable  not_empty_cond_{};
104     condition_variable  not_full_cond_{};
105     std::size_t         hwm_;
106     std::size_t         lwm_;
107 
is_closed_() const108     bool is_closed_() const noexcept {
109         return queue_status::closed == state_;
110     }
111 
close_(std::unique_lock<boost::fibers::mutex> & lk)112     void close_( std::unique_lock< boost::fibers::mutex > & lk) noexcept {
113         state_ = queue_status::closed;
114         lk.unlock();
115         not_empty_cond_.notify_all();
116         not_full_cond_.notify_all();
117     }
118 
size_() const119     std::size_t size_() const noexcept {
120         return count_;
121     }
122 
is_empty_() const123     bool is_empty_() const noexcept {
124         return ! head_;
125     }
126 
is_full_() const127     bool is_full_() const noexcept {
128         return count_ >= hwm_;
129     }
130 
push_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)131     channel_op_status push_( ptr_t new_node,
132                              std::unique_lock< boost::fibers::mutex > & lk) {
133         if ( is_closed_() ) {
134             return channel_op_status::closed;
135         }
136         not_full_cond_.wait( lk,
137                              [this](){
138                                 return ! is_full_();
139                              });
140         return push_and_notify_( new_node, lk);
141     }
142 
try_push_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)143     channel_op_status try_push_( ptr_t new_node,
144                                  std::unique_lock< boost::fibers::mutex > & lk) noexcept {
145         if ( is_closed_() ) {
146             return channel_op_status::closed;
147         }
148         if ( is_full_() ) {
149             return channel_op_status::full;
150         }
151         return push_and_notify_( new_node, lk);
152     }
153 
154     template< typename Clock, typename Duration >
push_wait_until_(ptr_t new_node,std::chrono::time_point<Clock,Duration> const & timeout_time,std::unique_lock<boost::fibers::mutex> & lk)155     channel_op_status push_wait_until_( ptr_t new_node,
156                                         std::chrono::time_point< Clock, Duration > const& timeout_time,
157                                         std::unique_lock< boost::fibers::mutex > & lk) {
158         if ( is_closed_() ) {
159             return channel_op_status::closed;
160         }
161         if ( ! not_full_cond_.wait_until( lk, timeout_time,
162                                           [this](){
163                                                return ! is_full_();
164                                           })) {
165             return channel_op_status::timeout;
166         }
167         return push_and_notify_( new_node, lk);
168     }
169 
push_and_notify_(ptr_t new_node,std::unique_lock<boost::fibers::mutex> & lk)170     channel_op_status push_and_notify_( ptr_t new_node,
171                                         std::unique_lock< boost::fibers::mutex > & lk) noexcept {
172         push_tail_( new_node);
173         lk.unlock();
174         not_empty_cond_.notify_one();
175         return channel_op_status::success;
176     }
177 
push_tail_(ptr_t new_node)178     void push_tail_( ptr_t new_node) noexcept {
179         * tail_ = new_node;
180         tail_ = & new_node->nxt;
181         ++count_;
182     }
183 
value_pop_(std::unique_lock<boost::fibers::mutex> & lk)184     value_type value_pop_( std::unique_lock< boost::fibers::mutex > & lk) {
185         BOOST_ASSERT( ! is_empty_() );
186         auto old_head = pop_head_();
187         if ( size_() <= lwm_) {
188             if ( lwm_ == hwm_) {
189                 lk.unlock();
190                 not_full_cond_.notify_one();
191             } else {
192                 lk.unlock();
193                 // more than one producer could be waiting
194                 // to push a value
195                 not_full_cond_.notify_all();
196             }
197         }
198         return std::move( old_head->va);
199     }
200 
pop_head_()201     ptr_t pop_head_() noexcept {
202         auto old_head = head_;
203         head_ = old_head->nxt;
204         if ( ! head_) {
205             tail_ = & head_;
206         }
207         old_head->nxt.reset();
208         --count_;
209         return old_head;
210     }
211 
212 public:
bounded_channel(std::size_t hwm,std::size_t lwm,Allocator const & alloc=Allocator ())213     bounded_channel( std::size_t hwm, std::size_t lwm,
214                      Allocator const& alloc = Allocator() ) :
215         alloc_{ alloc },
216         tail_{ & head_ },
217         hwm_{ hwm },
218         lwm_{ lwm } {
219         if ( hwm_ <= lwm_) {
220             throw fiber_error( std::make_error_code( std::errc::invalid_argument),
221                                "boost fiber: high-watermark is less than or equal to low-watermark for bounded_channel");
222         }
223         if ( 0 == hwm) {
224             throw fiber_error( std::make_error_code( std::errc::invalid_argument),
225                                "boost fiber: high-watermark is zero");
226         }
227     }
228 
bounded_channel(std::size_t wm,Allocator const & alloc=Allocator ())229     bounded_channel( std::size_t wm,
230                      Allocator const& alloc = Allocator() ) :
231         alloc_{ alloc },
232         tail_{ & head_ },
233         hwm_{ wm },
234         lwm_{ wm - 1 } {
235         if ( 0 == wm) {
236             throw fiber_error( std::make_error_code( std::errc::invalid_argument),
237                                "boost fiber: watermark is zero");
238         }
239     }
240 
241     bounded_channel( bounded_channel const&) = delete;
242     bounded_channel & operator=( bounded_channel const&) = delete;
243 
upper_bound() const244     std::size_t upper_bound() const noexcept {
245         return hwm_;
246     }
247 
lower_bound() const248     std::size_t lower_bound() const noexcept {
249         return lwm_;
250     }
251 
close()252     void close() noexcept {
253         std::unique_lock< mutex > lk( mtx_);
254         close_( lk);
255     }
256 
push(value_type const & va)257     channel_op_status push( value_type const& va) {
258         typename allocator_traits_t::pointer ptr{
259             allocator_traits_t::allocate( alloc_, 1) };
260         try {
261             allocator_traits_t::construct( alloc_, ptr, va, alloc_);
262         } catch (...) {
263             allocator_traits_t::deallocate( alloc_, ptr, 1);
264             throw;
265         }
266         std::unique_lock< mutex > lk( mtx_);
267         return push_( { detail::convert( ptr) }, lk);
268     }
269 
push(value_type && va)270     channel_op_status push( value_type && va) {
271         typename allocator_traits_t::pointer ptr{
272             allocator_traits_t::allocate( alloc_, 1) };
273         try {
274             allocator_traits_t::construct(
275                     alloc_, ptr, std::move( va), alloc_);
276         } catch (...) {
277             allocator_traits_t::deallocate( alloc_, ptr, 1);
278             throw;
279         }
280         std::unique_lock< mutex > lk( mtx_);
281         return push_( { detail::convert( ptr) }, lk);
282     }
283 
284     template< typename Rep, typename Period >
push_wait_for(value_type const & va,std::chrono::duration<Rep,Period> const & timeout_duration)285     channel_op_status push_wait_for( value_type const& va,
286                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
287         return push_wait_until( va,
288                                 std::chrono::steady_clock::now() + timeout_duration);
289     }
290 
291     template< typename Rep, typename Period >
push_wait_for(value_type && va,std::chrono::duration<Rep,Period> const & timeout_duration)292     channel_op_status push_wait_for( value_type && va,
293                                      std::chrono::duration< Rep, Period > const& timeout_duration) {
294         return push_wait_until( std::forward< value_type >( va),
295                                 std::chrono::steady_clock::now() + timeout_duration);
296     }
297 
298     template< typename Clock, typename Duration >
push_wait_until(value_type const & va,std::chrono::time_point<Clock,Duration> const & timeout_time)299     channel_op_status push_wait_until( value_type const& va,
300                                        std::chrono::time_point< Clock, Duration > const& timeout_time) {
301         typename allocator_traits_t::pointer ptr{
302             allocator_traits_t::allocate( alloc_, 1) };
303         try {
304             allocator_traits_t::construct( alloc_, ptr, va, alloc_);
305         } catch (...) {
306             allocator_traits_t::deallocate( alloc_, ptr, 1);
307             throw;
308         }
309         std::unique_lock< mutex > lk( mtx_);
310         return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
311     }
312 
313     template< typename Clock, typename Duration >
push_wait_until(value_type && va,std::chrono::time_point<Clock,Duration> const & timeout_time)314     channel_op_status push_wait_until( value_type && va,
315                                        std::chrono::time_point< Clock, Duration > const& timeout_time) {
316         typename allocator_traits_t::pointer ptr{
317             allocator_traits_t::allocate( alloc_, 1) };
318         try {
319             allocator_traits_t::construct(
320                     alloc_, ptr, std::move( va), alloc_);
321         } catch (...) {
322             allocator_traits_t::deallocate( alloc_, ptr, 1);
323             throw;
324         }
325         std::unique_lock< mutex > lk( mtx_);
326         return push_wait_until_( { detail::convert( ptr) }, timeout_time, lk);
327     }
328 
try_push(value_type const & va)329     channel_op_status try_push( value_type const& va) {
330         typename allocator_traits_t::pointer ptr{
331             allocator_traits_t::allocate( alloc_, 1) };
332         try {
333             allocator_traits_t::construct( alloc_, ptr, va, alloc_);
334         } catch (...) {
335             allocator_traits_t::deallocate( alloc_, ptr, 1);
336             throw;
337         }
338         std::unique_lock< mutex > lk( mtx_);
339         return try_push_( { detail::convert( ptr) }, lk);
340     }
341 
try_push(value_type && va)342     channel_op_status try_push( value_type && va) {
343         typename allocator_traits_t::pointer ptr{
344             allocator_traits_t::allocate( alloc_, 1) };
345         try {
346             allocator_traits_t::construct(
347                     alloc_, ptr, std::move( va), alloc_);
348         } catch (...) {
349             allocator_traits_t::deallocate( alloc_, ptr, 1);
350             throw;
351         }
352         std::unique_lock< mutex > lk( mtx_);
353         return try_push_( { detail::convert( ptr) }, lk);
354     }
355 
pop(value_type & va)356     channel_op_status pop( value_type & va) {
357         std::unique_lock< mutex > lk( mtx_);
358         not_empty_cond_.wait( lk,
359                               [this](){
360                                 return is_closed_() || ! is_empty_();
361                               });
362         if ( is_closed_() && is_empty_() ) {
363             return channel_op_status::closed;
364         }
365         va = value_pop_( lk);
366         return channel_op_status::success;
367     }
368 
value_pop()369     value_type value_pop() {
370         std::unique_lock< mutex > lk( mtx_);
371         not_empty_cond_.wait( lk,
372                               [this](){
373                                 return is_closed_() || ! is_empty_();
374                               });
375         if ( is_closed_() && is_empty_() ) {
376             throw fiber_error(
377                     std::make_error_code( std::errc::operation_not_permitted),
378                     "boost fiber: queue is closed");
379         }
380         return value_pop_( lk);
381     }
382 
try_pop(value_type & va)383     channel_op_status try_pop( value_type & va) {
384         std::unique_lock< mutex > lk( mtx_);
385         if ( is_closed_() && is_empty_() ) {
386             // let other fibers run
387             lk.unlock();
388             this_fiber::yield();
389             return channel_op_status::closed;
390         }
391         if ( is_empty_() ) {
392             // let other fibers run
393             lk.unlock();
394             this_fiber::yield();
395             return channel_op_status::empty;
396         }
397         va = value_pop_( lk);
398         return channel_op_status::success;
399     }
400 
401     template< typename Rep, typename Period >
pop_wait_for(value_type & va,std::chrono::duration<Rep,Period> const & timeout_duration)402     channel_op_status pop_wait_for( value_type & va,
403                                     std::chrono::duration< Rep, Period > const& timeout_duration) {
404         return pop_wait_until( va,
405                                std::chrono::steady_clock::now() + timeout_duration);
406     }
407 
408     template< typename Clock, typename Duration >
pop_wait_until(value_type & va,std::chrono::time_point<Clock,Duration> const & timeout_time)409     channel_op_status pop_wait_until( value_type & va,
410                                       std::chrono::time_point< Clock, Duration > const& timeout_time) {
411         std::unique_lock< mutex > lk( mtx_);
412         if ( ! not_empty_cond_.wait_until( lk,
413                                            timeout_time,
414                                            [this](){
415                                                 return is_closed_() || ! is_empty_();
416                                            })) {
417             return channel_op_status::timeout;
418         }
419         if ( is_closed_() && is_empty_() ) {
420             return channel_op_status::closed;
421         }
422         va = value_pop_( lk);
423         return channel_op_status::success;
424     }
425 };
426 
427 }}
428 
429 #ifdef BOOST_HAS_ABI_HEADERS
430 #  include BOOST_ABI_SUFFIX
431 #endif
432 
433 #endif // BOOST_FIBERS_BOUNDED_CHANNEL_H
434