1
2 // Copyright Oliver Kowalke 2015.
3 // Distributed under the Boost Software License, Version 1.0.
4 // (See accompanying file LICENSE_1_0.txt or copy at
5 // http://www.boost.org/LICENSE_1_0.txt)
6 //
7
8 #include "boost/fiber/numa/algo/work_stealing.hpp"
9
10 #include <cmath>
11 #include <random>
12
13 #include <boost/assert.hpp>
14 #include <boost/context/detail/prefetch.hpp>
15
16 #include "boost/fiber/detail/thread_barrier.hpp"
17 #include "boost/fiber/type.hpp"
18
19 #ifdef BOOST_HAS_ABI_HEADERS
20 # include BOOST_ABI_PREFIX
21 #endif
22
23 namespace boost {
24 namespace fibers {
25 namespace numa {
26 namespace algo {
27
28 std::vector< intrusive_ptr< work_stealing > > work_stealing::schedulers_{};
29
get_local_cpus(std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo)30 std::vector< std::uint32_t > get_local_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
31 for ( auto & node : topo) {
32 if ( node_id == node.id) {
33 // store IDs of logical cpus that belong to this local NUMA node
34 return std::vector< std::uint32_t >{ node.logical_cpus.begin(), node.logical_cpus.end() };
35 }
36 }
37 return std::vector< std::uint32_t >{};
38 }
39
get_remote_cpus(std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo)40 std::vector< std::uint32_t > get_remote_cpus( std::uint32_t node_id, std::vector< boost::fibers::numa::node > const& topo) {
41 std::vector< std::uint32_t > remote_cpus;
42 for ( auto & node : topo) {
43 if ( node_id != node.id) {
44 // store IDs of logical cpus that belong to a remote NUMA node
45 // no ordering regarding to the NUMA distance
46 remote_cpus.insert( remote_cpus.end(), node.logical_cpus.begin(), node.logical_cpus.end() );
47 }
48 }
49 return remote_cpus;
50 }
51
52 void
init_(std::vector<boost::fibers::numa::node> const & topo,std::vector<intrusive_ptr<work_stealing>> & schedulers)53 work_stealing::init_( std::vector< boost::fibers::numa::node > const& topo,
54 std::vector< intrusive_ptr< work_stealing > > & schedulers) {
55 std::uint32_t max_cpu_id = 0;
56 for ( auto & node : topo) {
57 max_cpu_id = (std::max)( max_cpu_id, * node.logical_cpus.rbegin() );
58 }
59 // resize array of schedulers to max. CPU ID, initilized with nullptr
60 // CPU ID acts as the index in the scheduler array
61 // if a logical cpus is offline, schedulers_ will contain a nullptr
62 // logical cpus index starts at `0` -> add 1
63 std::vector< intrusive_ptr< work_stealing > >{ max_cpu_id + 1, nullptr }.swap( schedulers);
64 }
65
work_stealing(std::uint32_t cpu_id,std::uint32_t node_id,std::vector<boost::fibers::numa::node> const & topo,bool suspend)66 work_stealing::work_stealing(
67 std::uint32_t cpu_id,
68 std::uint32_t node_id,
69 std::vector< boost::fibers::numa::node > const& topo,
70 bool suspend) :
71 cpu_id_{ cpu_id },
72 local_cpus_{ get_local_cpus( node_id, topo) },
73 remote_cpus_{ get_remote_cpus( node_id, topo) },
74 suspend_{ suspend } {
75 // pin current thread to logical cpu
76 boost::fibers::numa::pin_thread( cpu_id_);
77 // calculate thread count
78 std::size_t thread_count = 0;
79 for ( auto & node : topo) {
80 thread_count += node.logical_cpus.size();
81 }
82 static boost::fibers::detail::thread_barrier b{ thread_count };
83 // initialize the array of schedulers
84 static std::once_flag flag;
85 std::call_once( flag, & work_stealing::init_, topo, std::ref( schedulers_) );
86 // register pointer of this scheduler
87 schedulers_[cpu_id_] = this;
88 b.wait();
89 }
90
91 void
awakened(context * ctx)92 work_stealing::awakened( context * ctx) noexcept {
93 if ( ! ctx->is_context( type::pinned_context) ) {
94 ctx->detach();
95 }
96 rqueue_.push( ctx);
97 }
98
// Select the next fiber to run:
//   1. pop from the local ready-queue;
//   2. if empty, repeatedly try to steal from a random scheduler on the
//      local NUMA node (up to local_cpus_.size() attempts);
//   3. if that fails, repeatedly try a random scheduler on a remote NUMA
//      node (up to remote_cpus_.size() attempts).
// Returns nullptr if no fiber could be obtained.
context *
work_stealing::pick_next() noexcept {
    context * victim = rqueue_.pop();
    if ( nullptr != victim) {
        // warm the cache lines of the context we are about to run
        boost::context::detail::prefetch_range( victim, sizeof( context) );
        if ( ! victim->is_context( type::pinned_context) ) {
            // a detached worker fiber must be attached to this thread's
            // active context before it can be resumed here
            context::active()->attach( victim);
        }
    } else {
        std::uint32_t cpu_id = 0;
        std::size_t count = 0, size = local_cpus_.size();
        // per-thread PRNG: cheap LCG, seeded once from random_device
        static thread_local std::minstd_rand generator{ std::random_device{}() };
        // NOTE(review): both distributions compute size - 1 unsigned; if
        // local_cpus_ or remote_cpus_ were empty this underflows -- presumably
        // the topology guarantees a non-empty local partition; verify.
        std::uniform_int_distribution< std::uint32_t > local_distribution{
            0, static_cast< std::uint32_t >( local_cpus_.size() - 1) };
        std::uniform_int_distribution< std::uint32_t > remote_distribution{
            0, static_cast< std::uint32_t >( remote_cpus_.size() - 1) };
        do {
            do {
                ++count;
                // random selection of one logical cpu
                // that belongs to the local NUMA node
                cpu_id = local_cpus_[local_distribution( generator)];
                // prevent stealing from own scheduler
                // NOTE(review): if cpu_id_ is the only entry in local_cpus_,
                // this inner loop cannot terminate -- confirm the topology
                // always provides at least two local cpus here.
            } while ( cpu_id == cpu_id_);
            // steal context from other scheduler
            // schedulers_[cpu_id] should never contain a nullptr
            BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
            victim = schedulers_[cpu_id]->steal();
        } while ( nullptr == victim && count < size);
        if ( nullptr != victim) {
            boost::context::detail::prefetch_range( victim, sizeof( context) );
            // stolen fibers are never pinned (steal() must not hand them out)
            BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
            context::active()->attach( victim);
        } else if ( ! remote_cpus_.empty() ) {
            // local stealing failed: fall back to remote NUMA nodes
            cpu_id = 0;
            count = 0;
            size = remote_cpus_.size();
            do {
                ++count;
                // random selection of one logical cpu
                // that belongs to a remote NUMA node
                cpu_id = remote_cpus_[remote_distribution( generator)];
                // remote cpu ID should never be equal to local cpu ID
                BOOST_ASSERT( cpu_id != cpu_id_);
                // schedulers_[cpu_id] should never contain a nullptr
                BOOST_ASSERT( nullptr != schedulers_[cpu_id]);
                // steal context from other scheduler
                victim = schedulers_[cpu_id]->steal();
            } while ( nullptr == victim && count < size);
            if ( nullptr != victim) {
                boost::context::detail::prefetch_range( victim, sizeof( context) );
                BOOST_ASSERT( ! victim->is_context( type::pinned_context) );
                // move memory from remote NUMA-node to
                // memory of local NUMA-node
                context::active()->attach( victim);
            }
        }
    }
    return victim;
}
159
160 void
suspend_until(std::chrono::steady_clock::time_point const & time_point)161 work_stealing::suspend_until( std::chrono::steady_clock::time_point const& time_point) noexcept {
162 if ( suspend_) {
163 if ( (std::chrono::steady_clock::time_point::max)() == time_point) {
164 std::unique_lock< std::mutex > lk{ mtx_ };
165 cnd_.wait( lk, [this](){ return flag_; });
166 flag_ = false;
167 } else {
168 std::unique_lock< std::mutex > lk{ mtx_ };
169 cnd_.wait_until( lk, time_point, [this](){ return flag_; });
170 flag_ = false;
171 }
172 }
173 }
174
175 void
notify()176 work_stealing::notify() noexcept {
177 if ( suspend_) {
178 std::unique_lock< std::mutex > lk{ mtx_ };
179 flag_ = true;
180 lk.unlock();
181 cnd_.notify_all();
182 }
183 }
184
185 }}}}
186
187 #ifdef BOOST_HAS_ABI_HEADERS
188 # include BOOST_ABI_SUFFIX
189 #endif
190