1 
2 //          Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #include "boost/fiber/algo/work_stealing.hpp"
9 
10 #include <random>
11 
12 #include <boost/assert.hpp>
13 #include <boost/context/detail/prefetch.hpp>
14 
15 #include "boost/fiber/detail/thread_barrier.hpp"
16 #include "boost/fiber/type.hpp"
17 
18 #ifdef BOOST_HAS_ABI_HEADERS
19 #  include BOOST_ABI_PREFIX
20 #endif
21 
22 namespace boost {
23 namespace fibers {
24 namespace algo {
25 
// Monotonically increasing id source: each work_stealing instance claims
// counter_++ as its own id_, which doubles as its index into schedulers_.
std::atomic< std::uint32_t > work_stealing::counter_{ 0 };
// Shared registry of all participating schedulers, indexed by id_; sized
// once in init_() and populated by each thread's constructor.
std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
28 
29 void
init_(std::uint32_t thread_count,std::vector<intrusive_ptr<work_stealing>> & schedulers)30 work_stealing::init_( std::uint32_t thread_count,
31                       std::vector< intrusive_ptr< work_stealing > > & schedulers) {
32     // resize array of schedulers to thread_count, initilized with nullptr
33     std::vector< intrusive_ptr< work_stealing > >{ thread_count, nullptr }.swap( schedulers);
34 }
35 
work_stealing::work_stealing( std::uint32_t thread_count, bool suspend) :
        id_{ counter_++ },
        thread_count_{ thread_count },
        suspend_{ suspend } {
    // Function-local static: the barrier is sized by the FIRST thread to
    // reach this constructor; all participating threads must pass the same
    // thread_count for the rendezvous below to be correct.
    static boost::fibers::detail::thread_barrier b{ thread_count };
    // initialize the array of schedulers exactly once across all threads
    static std::once_flag flag;
    std::call_once( flag, & work_stealing::init_, thread_count_, std::ref( schedulers_) );
    // register pointer of this scheduler under its unique id
    schedulers_[id_] = this;
    // wait until every thread has registered itself: no scheduler may start
    // (and potentially steal) before schedulers_ is fully populated
    b.wait();
}
48 
49 void
awakened(context * ctx)50 work_stealing::awakened( context * ctx) noexcept {
51     if ( ! ctx->is_context( type::pinned_context) ) {
52         ctx->detach();
53     }
54     rqueue_.push( ctx);
55 }
56 
57 context *
pick_next()58 work_stealing::pick_next() noexcept {
59     context * victim = rqueue_.pop();
60     if ( nullptr != victim) {
61         boost::context::detail::prefetch_range( victim, sizeof( context) );
62         if ( ! victim->is_context( type::pinned_context) ) {
63             context::active()->attach( victim);
64         }
65     } else {
66         std::uint32_t id = 0;
67         std::size_t count = 0, size = schedulers_.size();
68         static thread_local std::minstd_rand generator{ std::random_device{}() };
69         std::uniform_int_distribution< std::uint32_t > distribution{
70             0, static_cast< std::uint32_t >( thread_count_ - 1) };
71         do {
72             do {
73                 ++count;
74                 // random selection of one logical cpu
75                 // that belongs to the local NUMA node
76                 id = distribution( generator);
77                 // prevent stealing from own scheduler
78             } while ( id == id_);
79             // steal context from other scheduler
80             victim = schedulers_[id]->steal();
81         } while ( nullptr == victim && count < size);
82         if ( nullptr != victim) {
83             boost::context::detail::prefetch_range( victim, sizeof( context) );
84             BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
85             context::active()->attach( victim);
86         }
87     }
88     return victim;
89 }
90 
91 void
suspend_until(std::chrono::steady_clock::time_point const & time_point)92 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
93     if ( suspend_) {
94         if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
95             std::unique_lock< std::mutex > lk{ mtx_ };
96             cnd_.wait( lk, [this](){ return flag_; });
97             flag_ = false;
98         } else {
99             std::unique_lock< std::mutex > lk{ mtx_ };
100             cnd_.wait_until( lk, time_point, [this](){ return flag_; });
101             flag_ = false;
102         }
103     }
104 }
105 
106 void
notify()107 work_stealing::notify() noexcept {
108     if ( suspend_) {
109         std::unique_lock< std::mutex > lk{ mtx_ };
110         flag_ = true;
111         lk.unlock();
112         cnd_.notify_all();
113     }
114 }
115 
116 }}}
117 
118 #ifdef BOOST_HAS_ABI_HEADERS
119 #  include BOOST_ABI_SUFFIX
120 #endif
121