1 
2 //          Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 //    (See accompanying file LICENSE_1_0.txt or copy at
5 //          http://www.boost.org/LICENSE_1_0.txt)
6 //
7 
8 #include "boost/fiber/numa/algo/work_stealing.hpp"
9 
10 #include <cmath>
11 #include <random>
12 
13 #include <boost/assert.hpp>
14 #include <boost/context/detail/prefetch.hpp>
15 
16 #include "boost/fiber/detail/thread_barrier.hpp"
17 #include "boost/fiber/type.hpp"
18 
19 #ifdef BOOST_HAS_ABI_HEADERS
20 #  include BOOST_ABI_PREFIX
21 #endif
22 
23 namespace boost {
24 namespace fibers {
25 namespace numa {
26 namespace algo {
27 
// process-wide array of all work_stealing schedulers, indexed by
// logical-CPU ID; slots belonging to offline CPUs remain nullptr
// (sized/filled by init_() and the constructors)
std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
29 
get_local_cpus(std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo)30 std::vector< std::uint32_t > get_local_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
31     for ( auto & node : topo) {
32         if ( node_id == node.id) {
33             // store IDs of logical cpus that belong to this local NUMA node
34             return std::vector< std::uint32_t >{ node.logical_cpus.begin(), node.logical_cpus.end() };
35         }
36     }
37     return std::vector< std::uint32_t >{};
38 }
39 
get_remote_cpus(std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo)40 std::vector< std::uint32_t > get_remote_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
41     std::vector< std::uint32_t > remote_cpus;
42     for ( auto & node : topo) {
43         if ( node_id != node.id) {
44             // store IDs of logical cpus that belong to a remote NUMA node
45             // no ordering regarding to the NUMA distance
46             remote_cpus.insert( remote_cpus.end(), node.logical_cpus.begin(), node.logical_cpus.end() );
47         }
48     }
49     return remote_cpus;
50 }
51 
52 void
init_(std::vector<boost::fibers::numa::node> const & topo,std::vector<intrusive_ptr<work_stealing>> & schedulers)53 work_stealing::init_( std::vector< boost::fibers::numa::node > const& topo,
54                       std::vector< intrusive_ptr< work_stealing > > & schedulers) {
55     std::uint32_t max_cpu_id = 0;
56     for ( auto & node : topo) {
57         max_cpu_id = (std::max)( max_cpu_id, * node.logical_cpus.rbegin() );
58     }
59     // resize array of schedulers to max. CPU ID, initilized with nullptr
60     // CPU ID acts as the index in the scheduler array
61     // if a logical cpus is offline, schedulers_ will contain a nullptr
62     // logical cpus index starts at `0` -> add 1
63     std::vector< intrusive_ptr< work_stealing > >{ max_cpu_id + 1, nullptr }.swap( schedulers);
64 }
65 
work_stealing(std::uint32_t cpu_id,std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo,bool suspend)66 work_stealing::work_stealing(
67     std::uint32_t cpu_id,
68     std::uint32_t node_id,
69     std::vector< boost::fibers::numa::node > const& topo,
70     bool suspend) :
71         cpu_id_{ cpu_id },
72         local_cpus_{ get_local_cpus( node_id, topo) },
73         remote_cpus_{ get_remote_cpus( node_id, topo) },
74         suspend_{ suspend } {
75     // pin current thread to logical cpu
76     boost::fibers::numa::pin_thread( cpu_id_);
77     // calculate thread count
78     std::size_t thread_count = 0;
79     for ( auto & node : topo) {
80         thread_count += node.logical_cpus.size();
81     }
82     static boost::fibers::detail::thread_barrier b{ thread_count };
83     // initialize the array of schedulers
84     static std::once_flag flag;
85     std::call_once( flag, & work_stealing::init_, topo, std::ref( schedulers_) );
86     // register pointer of this scheduler
87     schedulers_[cpu_id_] = this;
88     b.wait();
89 }
90 
91 void
awakened(context * ctx)92 work_stealing::awakened( context * ctx) noexcept {
93     if ( ! ctx->is_context( type::pinned_context) ) {
94         ctx->detach();
95     }
96     rqueue_.push( ctx);
97 }
98 
99 context *
pick_next()100 work_stealing::pick_next() noexcept {
101     context * victim = rqueue_.pop();
102     if ( nullptr != victim) {
103         boost::context::detail::prefetch_range( victim, sizeof( context) );
104         if ( ! victim->is_context( type::pinned_context) ) {
105             context::active()->attach( victim);
106         }
107     } else {
108         std::uint32_t cpu_id = 0;
109         std::size_t count = 0, size = local_cpus_.size();
110         static thread_local std::minstd_rand generator{ std::random_device{}() };
111         std::uniform_int_distribution< std::uint32_t > local_distribution{
112             0, static_cast< std::uint32_t >( local_cpus_.size() - 1) };
113         std::uniform_int_distribution< std::uint32_t > remote_distribution{
114             0, static_cast< std::uint32_t >( remote_cpus_.size() - 1) };
115         do {
116             do {
117                 ++count;
118                 // random selection of one logical cpu
119                 // that belongs to the local NUMA node
120                 cpu_id = local_cpus_[local_distribution( generator)];
121                 // prevent stealing from own scheduler
122             } while ( cpu_id == cpu_id_);
123             // steal context from other scheduler
124             // schedulers_[cpu_id] should never contain a nullptr
125             BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
126             victim = schedulers_[cpu_id]->steal();
127         } while ( nullptr == victim && count < size);
128         if ( nullptr != victim) {
129             boost::context::detail::prefetch_range( victim, sizeof( context) );
130             BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
131             context::active()->attach( victim);
132         } else if ( ! remote_cpus_.empty() ) {
133             cpu_id = 0;
134             count = 0;
135             size = remote_cpus_.size();
136             do {
137                 ++count;
138                 // random selection of one logical cpu
139                 // that belongs to a remote NUMA node
140                 cpu_id = remote_cpus_[remote_distribution( generator)];
141                 // remote cpu ID should never be equal to local cpu ID
142                 BOOST_ASSERT( cpu_id != cpu_id_);
143                 // schedulers_[cpu_id] should never contain a nullptr
144                 BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
145                 // steal context from other scheduler
146                 victim = schedulers_[cpu_id]->steal();
147             } while ( nullptr == victim && count < size);
148             if ( nullptr != victim) {
149                 boost::context::detail::prefetch_range( victim, sizeof( context) );
150                 BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
151                 // move memory from remote NUMA-node to
152                 // memory of local NUMA-node
153                 context::active()->attach( victim);
154             }
155         }
156     }
157     return victim;
158 }
159 
160 void
suspend_until(std::chrono::steady_clock::time_point const & time_point)161 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
162     if ( suspend_) {
163         if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
164             std::unique_lock< std::mutex > lk{ mtx_ };
165             cnd_.wait( lk, [this](){ return flag_; });
166             flag_ = false;
167         } else {
168             std::unique_lock< std::mutex > lk{ mtx_ };
169             cnd_.wait_until( lk, time_point, [this](){ return flag_; });
170             flag_ = false;
171         }
172     }
173 }
174 
175 void
notify()176 work_stealing::notify() noexcept {
177     if ( suspend_) {
178         std::unique_lock< std::mutex > lk{ mtx_ };
179         flag_ = true;
180         lk.unlock();
181         cnd_.notify_all();
182     }
183 }
184 
185 }}}}
186 
187 #ifdef BOOST_HAS_ABI_HEADERS
188 #  include BOOST_ABI_SUFFIX
189 #endif
190