1 
2 //          Copyright Oliver Kowalke 2013.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 
7 #include "boost/fiber/algo/shared_work.hpp"
8 
9 #include <boost/assert.hpp>
10 
11 #include "boost/fiber/type.hpp"
12 
13 #ifdef BOOST_HAS_ABI_HEADERS
14 #  include BOOST_ABI_PREFIX
15 #endif
16 
17 namespace boost {
18 namespace fibers {
19 namespace algo {
20 
21 //[awakened_ws
22 void
awakened(context * ctx)23 shared_work::awakened( context * ctx) noexcept {
24     if ( ctx->is_context( type::pinned_context) ) { /*<
25             recognize when we're passed this thread's main fiber (or an
26             implicit library helper fiber): never put those on the shared
27             queue
28         >*/
29         lqueue_.push_back( * ctx);
30     } else {
31         ctx->detach();
32         std::unique_lock< std::mutex > lk{ rqueue_mtx_ }; /*<
33                 worker fiber, enqueue on shared queue
34             >*/
35         rqueue_.push_back( ctx);
36     }
37 }
38 //]
39 
40 //[pick_next_ws
41 context *
pick_next()42 shared_work::pick_next() noexcept {
43     context * ctx = nullptr;
44     std::unique_lock< std::mutex > lk{ rqueue_mtx_ };
45     if ( ! rqueue_.empty() ) { /*<
46             pop an item from the ready queue
47         >*/
48         ctx = rqueue_.front();
49         rqueue_.pop_front();
50         lk.unlock();
51         BOOST_ASSERT( nullptr != ctx);
52         context::active()->attach( ctx); /*<
53             attach context to current scheduler via the active fiber
54             of this thread
55         >*/
56     } else {
57         lk.unlock();
58         if ( ! lqueue_.empty() ) { /*<
59                 nothing in the ready queue, return main or dispatcher fiber
60             >*/
61             ctx = & lqueue_.front();
62             lqueue_.pop_front();
63         }
64     }
65     return ctx;
66 }
67 //]
68 
69 void
suspend_until(std::chrono::steady_clock::time_point const & time_point)70 shared_work::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
71     if ( suspend_) {
72         if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
73             std::unique_lock< std::mutex > lk{ mtx_ };
74             cnd_.wait( lk, [this](){ return flag_; });
75             flag_ = false;
76         } else {
77             std::unique_lock< std::mutex > lk{ mtx_ };
78             cnd_.wait_until( lk, time_point, [this](){ return flag_; });
79             flag_ = false;
80         }
81     }
82 }
83 
84 void
notify()85 shared_work::notify() noexcept {
86     if ( suspend_) {
87         std::unique_lock< std::mutex > lk{ mtx_ };
88         flag_ = true;
89         lk.unlock();
90         cnd_.notify_all();
91     }
92 }
93 
94 shared_work::rqueue_type shared_work::rqueue_{};
95 std::mutex shared_work::rqueue_mtx_{};
96 
97 }}}
98 
99 #ifdef BOOST_HAS_ABI_HEADERS
100 #  include BOOST_ABI_SUFFIX
101 #endif
102