1 //! Coordinates idling workers
2 
3 use crate::loom::sync::atomic::AtomicUsize;
4 use crate::loom::sync::Mutex;
5 
6 use std::fmt;
7 use std::sync::atomic::Ordering::{self, SeqCst};
8 
9 pub(super) struct Idle {
10     /// Tracks both the number of searching workers and the number of unparked
11     /// workers.
12     ///
13     /// Used as a fast-path to avoid acquiring the lock when needed.
14     state: AtomicUsize,
15 
16     /// Sleeping workers
17     sleepers: Mutex<Vec<usize>>,
18 
19     /// Total number of workers.
20     num_workers: usize,
21 }
22 
23 const UNPARK_SHIFT: usize = 16;
24 const UNPARK_MASK: usize = !SEARCH_MASK;
25 const SEARCH_MASK: usize = (1 << UNPARK_SHIFT) - 1;
26 
27 #[derive(Copy, Clone)]
28 struct State(usize);
29 
30 impl Idle {
new(num_workers: usize) -> Idle31     pub(super) fn new(num_workers: usize) -> Idle {
32         let init = State::new(num_workers);
33 
34         Idle {
35             state: AtomicUsize::new(init.into()),
36             sleepers: Mutex::new(Vec::with_capacity(num_workers)),
37             num_workers,
38         }
39     }
40 
41     /// If there are no workers actively searching, returns the index of a
42     /// worker currently sleeping.
worker_to_notify(&self) -> Option<usize>43     pub(super) fn worker_to_notify(&self) -> Option<usize> {
44         // If at least one worker is spinning, work being notified will
45         // eventually be found. A searching thread will find **some** work and
46         // notify another worker, eventually leading to our work being found.
47         //
48         // For this to happen, this load must happen before the thread
49         // transitioning `num_searching` to zero. Acquire / Release does not
50         // provide sufficient guarantees, so this load is done with `SeqCst` and
51         // will pair with the `fetch_sub(1)` when transitioning out of
52         // searching.
53         if !self.notify_should_wakeup() {
54             return None;
55         }
56 
57         // Acquire the lock
58         let mut sleepers = self.sleepers.lock();
59 
60         // Check again, now that the lock is acquired
61         if !self.notify_should_wakeup() {
62             return None;
63         }
64 
65         // A worker should be woken up, atomically increment the number of
66         // searching workers as well as the number of unparked workers.
67         State::unpark_one(&self.state);
68 
69         // Get the worker to unpark
70         let ret = sleepers.pop();
71         debug_assert!(ret.is_some());
72 
73         ret
74     }
75 
76     /// Returns `true` if the worker needs to do a final check for submitted
77     /// work.
transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool78     pub(super) fn transition_worker_to_parked(&self, worker: usize, is_searching: bool) -> bool {
79         // Acquire the lock
80         let mut sleepers = self.sleepers.lock();
81 
82         // Decrement the number of unparked threads
83         let ret = State::dec_num_unparked(&self.state, is_searching);
84 
85         // Track the sleeping worker
86         sleepers.push(worker);
87 
88         ret
89     }
90 
transition_worker_to_searching(&self) -> bool91     pub(super) fn transition_worker_to_searching(&self) -> bool {
92         let state = State::load(&self.state, SeqCst);
93         if 2 * state.num_searching() >= self.num_workers {
94             return false;
95         }
96 
97         // It is possible for this routine to allow more than 50% of the workers
98         // to search. That is OK. Limiting searchers is only an optimization to
99         // prevent too much contention.
100         State::inc_num_searching(&self.state, SeqCst);
101         true
102     }
103 
104     /// A lightweight transition from searching -> running.
105     ///
106     /// Returns `true` if this is the final searching worker. The caller
107     /// **must** notify a new worker.
transition_worker_from_searching(&self) -> bool108     pub(super) fn transition_worker_from_searching(&self) -> bool {
109         State::dec_num_searching(&self.state)
110     }
111 
112     /// Unpark a specific worker. This happens if tasks are submitted from
113     /// within the worker's park routine.
unpark_worker_by_id(&self, worker_id: usize)114     pub(super) fn unpark_worker_by_id(&self, worker_id: usize) {
115         let mut sleepers = self.sleepers.lock();
116 
117         for index in 0..sleepers.len() {
118             if sleepers[index] == worker_id {
119                 sleepers.swap_remove(index);
120 
121                 // Update the state accordingly while the lock is held.
122                 State::unpark_one(&self.state);
123 
124                 return;
125             }
126         }
127     }
128 
129     /// Returns `true` if `worker_id` is contained in the sleep set.
is_parked(&self, worker_id: usize) -> bool130     pub(super) fn is_parked(&self, worker_id: usize) -> bool {
131         let sleepers = self.sleepers.lock();
132         sleepers.contains(&worker_id)
133     }
134 
notify_should_wakeup(&self) -> bool135     fn notify_should_wakeup(&self) -> bool {
136         let state = State(self.state.fetch_add(0, SeqCst));
137         state.num_searching() == 0 && state.num_unparked() < self.num_workers
138     }
139 }
140 
141 impl State {
new(num_workers: usize) -> State142     fn new(num_workers: usize) -> State {
143         // All workers start in the unparked state
144         let ret = State(num_workers << UNPARK_SHIFT);
145         debug_assert_eq!(num_workers, ret.num_unparked());
146         debug_assert_eq!(0, ret.num_searching());
147         ret
148     }
149 
load(cell: &AtomicUsize, ordering: Ordering) -> State150     fn load(cell: &AtomicUsize, ordering: Ordering) -> State {
151         State(cell.load(ordering))
152     }
153 
unpark_one(cell: &AtomicUsize)154     fn unpark_one(cell: &AtomicUsize) {
155         cell.fetch_add(1 | (1 << UNPARK_SHIFT), SeqCst);
156     }
157 
inc_num_searching(cell: &AtomicUsize, ordering: Ordering)158     fn inc_num_searching(cell: &AtomicUsize, ordering: Ordering) {
159         cell.fetch_add(1, ordering);
160     }
161 
162     /// Returns `true` if this is the final searching worker
dec_num_searching(cell: &AtomicUsize) -> bool163     fn dec_num_searching(cell: &AtomicUsize) -> bool {
164         let state = State(cell.fetch_sub(1, SeqCst));
165         state.num_searching() == 1
166     }
167 
168     /// Track a sleeping worker
169     ///
170     /// Returns `true` if this is the final searching worker.
dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool171     fn dec_num_unparked(cell: &AtomicUsize, is_searching: bool) -> bool {
172         let mut dec = 1 << UNPARK_SHIFT;
173 
174         if is_searching {
175             dec += 1;
176         }
177 
178         let prev = State(cell.fetch_sub(dec, SeqCst));
179         is_searching && prev.num_searching() == 1
180     }
181 
182     /// Number of workers currently searching
num_searching(self) -> usize183     fn num_searching(self) -> usize {
184         self.0 & SEARCH_MASK
185     }
186 
187     /// Number of workers currently unparked
num_unparked(self) -> usize188     fn num_unparked(self) -> usize {
189         (self.0 & UNPARK_MASK) >> UNPARK_SHIFT
190     }
191 }
192 
193 impl From<usize> for State {
from(src: usize) -> State194     fn from(src: usize) -> State {
195         State(src)
196     }
197 }
198 
199 impl From<State> for usize {
from(src: State) -> usize200     fn from(src: State) -> usize {
201         src.0
202     }
203 }
204 
205 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result206     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
207         fmt.debug_struct("worker::State")
208             .field("num_unparked", &self.num_unparked())
209             .field("num_searching", &self.num_searching())
210             .finish()
211     }
212 }
213 
214 #[test]
test_state()215 fn test_state() {
216     assert_eq!(0, UNPARK_MASK & SEARCH_MASK);
217     assert_eq!(0, !(UNPARK_MASK | SEARCH_MASK));
218 
219     let state = State::new(10);
220     assert_eq!(10, state.num_unparked());
221     assert_eq!(0, state.num_searching());
222 }
223