1 use park::DefaultPark;
2 use worker::WorkerId;
3 
4 use std::cell::UnsafeCell;
5 use std::fmt;
6 use std::sync::atomic::AtomicUsize;
7 use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed};
8 use std::time::{Duration, Instant};
9 
10 /// State associated with a thread in the thread pool.
11 ///
12 /// The pool manages a number of threads. Some of those threads are considered
13 /// "primary" threads and process the work queue. When a task being run on a
14 /// primary thread enters a blocking context, the responsibility of processing
15 /// the work queue must be handed off to another thread. This is done by first
16 /// checking for idle threads on the backup stack. If one is found, the worker
17 /// token (`WorkerId`) is handed off to that running thread. If none are found,
18 /// a new thread is spawned.
19 ///
20 /// This state manages the exchange. A thread that is idle, not assigned to a
21 /// work queue, sits around for a specified amount of time. When the worker
22 /// token is handed off, it is first stored in `handoff`. The backup thread is
23 /// then signaled. At this point, the backup thread wakes up from sleep and
24 /// reads `handoff`. At that point, it has been promoted to a primary thread and
25 /// will begin processing inbound work on the work queue.
26 ///
27 /// The name `Backup` isn't really great for what the type does, but I have not
28 /// come up with a better name... Maybe it should just be named `Thread`.
29 #[derive(Debug)]
30 pub(crate) struct Backup {
31     /// Worker ID that is being handed to this thread.
32     handoff: UnsafeCell<Option<WorkerId>>,
33 
34     /// Thread state.
35     ///
36     /// This tracks:
37     ///
38     /// * Is queued flag
39     /// * If the pool is shutting down.
40     /// * If the thread is running
41     state: AtomicUsize,
42 
43     /// Next entry in the Treiber stack.
44     next_sleeper: UnsafeCell<BackupId>,
45 
46     /// Used to put the thread to sleep
47     park: DefaultPark,
48 }
49 
/// Identifier of a `Backup` entry, wrapping a plain `usize`.
///
/// Used as the link type of the sleeper stack (see `Backup::next_sleeper`).
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub(crate) struct BackupId(pub(crate) usize);
52 
53 #[derive(Debug)]
54 pub(crate) enum Handoff {
55     Worker(WorkerId),
56     Idle,
57     Terminated,
58 }
59 
/// Snapshot of a backup thread's packed state flags.
///
/// The wrapped `usize` is a bit set built from `PUSHED`, `RUNNING` and
/// `TERMINATED`.
#[derive(Copy, Clone, PartialEq, Eq)]
struct State(usize);
63 
/// Flag set while the worker sits on the scheduler's stack of sleeping
/// threads.
///
/// The flag doubles as a "notification" bit: when another thread is in the
/// middle of handing a worker off to the backup thread, the pushed bit is no
/// longer set by the time the thread tries to shut down.
pub const PUSHED: usize = 0x1;

/// Flag set while the thread is running.
pub const RUNNING: usize = 0x2;

/// Flag set once the thread pool has terminated.
pub const TERMINATED: usize = 0x4;
77 
78 // ===== impl Backup =====
79 
80 impl Backup {
new() -> Backup81     pub fn new() -> Backup {
82         Backup {
83             handoff: UnsafeCell::new(None),
84             state: AtomicUsize::new(State::new().into()),
85             next_sleeper: UnsafeCell::new(BackupId(0)),
86             park: DefaultPark::new(),
87         }
88     }
89 
90     /// Called when the thread is starting
start(&self, worker_id: &WorkerId)91     pub fn start(&self, worker_id: &WorkerId) {
92         debug_assert!({
93             let state: State = self.state.load(Relaxed).into();
94 
95             debug_assert!(!state.is_pushed());
96             debug_assert!(state.is_running());
97             debug_assert!(!state.is_terminated());
98 
99             true
100         });
101 
102         // The handoff value is equal to `worker_id`
103         debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id));
104 
105         unsafe {
106             *self.handoff.get() = None;
107         }
108     }
109 
is_running(&self) -> bool110     pub fn is_running(&self) -> bool {
111         let state: State = self.state.load(Relaxed).into();
112         state.is_running()
113     }
114 
115     /// Hands off the worker to a thread.
116     ///
117     /// Returns `true` if the thread needs to be spawned.
worker_handoff(&self, worker_id: WorkerId) -> bool118     pub fn worker_handoff(&self, worker_id: WorkerId) -> bool {
119         unsafe {
120             // The backup worker should not already have been handoff a worker.
121             debug_assert!((*self.handoff.get()).is_none());
122 
123             // Set the handoff
124             *self.handoff.get() = Some(worker_id);
125         }
126 
127         // This *probably* can just be `Release`... memory orderings, how do
128         // they work?
129         let prev = State::worker_handoff(&self.state);
130         debug_assert!(prev.is_pushed());
131 
132         if prev.is_running() {
133             // Wakeup the backup thread
134             self.park.notify();
135             false
136         } else {
137             true
138         }
139     }
140 
141     /// Terminate the worker
signal_stop(&self)142     pub fn signal_stop(&self) {
143         let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into();
144 
145         debug_assert!(!prev.is_terminated());
146         debug_assert!(prev.is_pushed());
147 
148         if prev.is_running() {
149             self.park.notify();
150         }
151     }
152 
153     /// Release the worker
release(&self)154     pub fn release(&self) {
155         let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into();
156 
157         debug_assert!(prev.is_running());
158     }
159 
160     /// Wait for a worker handoff
wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff161     pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff {
162         let sleep_until = timeout.map(|dur| Instant::now() + dur);
163         let mut state: State = self.state.load(Acquire).into();
164 
165         // Run in a loop since there can be spurious wakeups
166         loop {
167             if !state.is_pushed() {
168                 if state.is_terminated() {
169                     return Handoff::Terminated;
170                 }
171 
172                 let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") };
173                 return Handoff::Worker(worker_id);
174             }
175 
176             match sleep_until {
177                 None => {
178                     self.park.park_sync(None);
179                     state = self.state.load(Acquire).into();
180                 }
181                 Some(when) => {
182                     let now = Instant::now();
183 
184                     if now < when {
185                         self.park.park_sync(Some(when - now));
186                         state = self.state.load(Acquire).into();
187                     } else {
188                         debug_assert!(state.is_running());
189 
190                         // Transition out of running
191                         let mut next = state;
192                         next.unset_running();
193 
194                         let actual = self
195                             .state
196                             .compare_and_swap(state.into(), next.into(), AcqRel)
197                             .into();
198 
199                         if actual == state {
200                             debug_assert!(!next.is_running());
201                             return Handoff::Idle;
202                         }
203 
204                         state = actual;
205                     }
206                 }
207             }
208         }
209     }
210 
is_pushed(&self) -> bool211     pub fn is_pushed(&self) -> bool {
212         let state: State = self.state.load(Relaxed).into();
213         state.is_pushed()
214     }
215 
set_pushed(&self, ordering: Ordering)216     pub fn set_pushed(&self, ordering: Ordering) {
217         let prev: State = self.state.fetch_or(PUSHED, ordering).into();
218         debug_assert!(!prev.is_pushed());
219     }
220 
221     #[inline]
next_sleeper(&self) -> BackupId222     pub fn next_sleeper(&self) -> BackupId {
223         unsafe { *self.next_sleeper.get() }
224     }
225 
226     #[inline]
set_next_sleeper(&self, val: BackupId)227     pub fn set_next_sleeper(&self, val: BackupId) {
228         unsafe {
229             *self.next_sleeper.get() = val;
230         }
231     }
232 }
233 
234 // ===== impl State =====
235 
236 impl State {
237     /// Returns a new, default, thread `State`
new() -> State238     pub fn new() -> State {
239         State(0)
240     }
241 
242     /// Returns true if the thread entry is pushed in the sleeper stack
is_pushed(&self) -> bool243     pub fn is_pushed(&self) -> bool {
244         self.0 & PUSHED == PUSHED
245     }
246 
unset_pushed(&mut self)247     fn unset_pushed(&mut self) {
248         self.0 &= !PUSHED;
249     }
250 
is_running(&self) -> bool251     pub fn is_running(&self) -> bool {
252         self.0 & RUNNING == RUNNING
253     }
254 
set_running(&mut self)255     pub fn set_running(&mut self) {
256         self.0 |= RUNNING;
257     }
258 
unset_running(&mut self)259     pub fn unset_running(&mut self) {
260         self.0 &= !RUNNING;
261     }
262 
is_terminated(&self) -> bool263     pub fn is_terminated(&self) -> bool {
264         self.0 & TERMINATED == TERMINATED
265     }
266 
worker_handoff(state: &AtomicUsize) -> State267     fn worker_handoff(state: &AtomicUsize) -> State {
268         let mut curr: State = state.load(Acquire).into();
269 
270         loop {
271             let mut next = curr;
272             next.set_running();
273             next.unset_pushed();
274 
275             let actual = state
276                 .compare_and_swap(curr.into(), next.into(), AcqRel)
277                 .into();
278 
279             if actual == curr {
280                 return curr;
281             }
282 
283             curr = actual;
284         }
285     }
286 }
287 
288 impl From<usize> for State {
from(src: usize) -> State289     fn from(src: usize) -> State {
290         State(src)
291     }
292 }
293 
294 impl From<State> for usize {
from(src: State) -> usize295     fn from(src: State) -> usize {
296         src.0
297     }
298 }
299 
300 impl fmt::Debug for State {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result301     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
302         fmt.debug_struct("backup::State")
303             .field("is_pushed", &self.is_pushed())
304             .field("is_running", &self.is_running())
305             .field("is_terminated", &self.is_terminated())
306             .finish()
307     }
308 }
309