1 use park::DefaultPark; 2 use worker::WorkerId; 3 4 use std::cell::UnsafeCell; 5 use std::fmt; 6 use std::sync::atomic::AtomicUsize; 7 use std::sync::atomic::Ordering::{self, AcqRel, Acquire, Relaxed}; 8 use std::time::{Duration, Instant}; 9 10 /// State associated with a thread in the thread pool. 11 /// 12 /// The pool manages a number of threads. Some of those threads are considered 13 /// "primary" threads and process the work queue. When a task being run on a 14 /// primary thread enters a blocking context, the responsibility of processing 15 /// the work queue must be handed off to another thread. This is done by first 16 /// checking for idle threads on the backup stack. If one is found, the worker 17 /// token (`WorkerId`) is handed off to that running thread. If none are found, 18 /// a new thread is spawned. 19 /// 20 /// This state manages the exchange. A thread that is idle, not assigned to a 21 /// work queue, sits around for a specified amount of time. When the worker 22 /// token is handed off, it is first stored in `handoff`. The backup thread is 23 /// then signaled. At this point, the backup thread wakes up from sleep and 24 /// reads `handoff`. At that point, it has been promoted to a primary thread and 25 /// will begin processing inbound work on the work queue. 26 /// 27 /// The name `Backup` isn't really great for what the type does, but I have not 28 /// come up with a better name... Maybe it should just be named `Thread`. 29 #[derive(Debug)] 30 pub(crate) struct Backup { 31 /// Worker ID that is being handed to this thread. 32 handoff: UnsafeCell<Option<WorkerId>>, 33 34 /// Thread state. 35 /// 36 /// This tracks: 37 /// 38 /// * Is queued flag 39 /// * If the pool is shutting down. 40 /// * If the thread is running 41 state: AtomicUsize, 42 43 /// Next entry in the Treiber stack. 44 next_sleeper: UnsafeCell<BackupId>, 45 46 /// Used to put the thread to sleep 47 park: DefaultPark, 48 } 49 50 #[derive(Debug, Eq, PartialEq, Copy, Clone)] 51 pub(crate) struct BackupId(pub(crate) usize); 52 53 #[derive(Debug)] 54 pub(crate) enum Handoff { 55 Worker(WorkerId), 56 Idle, 57 Terminated, 58 } 59 60 /// Tracks thread state. 61 #[derive(Clone, Copy, Eq, PartialEq)] 62 struct State(usize); 63 64 /// Set when the worker is pushed onto the scheduler's stack of sleeping 65 /// threads. 66 /// 67 /// This flag also serves as a "notification" bit. If another thread is 68 /// attempting to hand off a worker to the backup thread, then the pushed bit 69 /// will not be set when the thread tries to shutdown. 70 pub const PUSHED: usize = 0b001; 71 72 /// Set when the thread is running 73 pub const RUNNING: usize = 0b010; 74 75 /// Set when the thread pool has terminated 76 pub const TERMINATED: usize = 0b100; 77 78 // ===== impl Backup ===== 79 80 impl Backup { new() -> Backup81 pub fn new() -> Backup { 82 Backup { 83 handoff: UnsafeCell::new(None), 84 state: AtomicUsize::new(State::new().into()), 85 next_sleeper: UnsafeCell::new(BackupId(0)), 86 park: DefaultPark::new(), 87 } 88 } 89 90 /// Called when the thread is starting start(&self, worker_id: &WorkerId)91 pub fn start(&self, worker_id: &WorkerId) { 92 debug_assert!({ 93 let state: State = self.state.load(Relaxed).into(); 94 95 debug_assert!(!state.is_pushed()); 96 debug_assert!(state.is_running()); 97 debug_assert!(!state.is_terminated()); 98 99 true 100 }); 101 102 // The handoff value is equal to `worker_id` 103 debug_assert_eq!(unsafe { (*self.handoff.get()).as_ref() }, Some(worker_id)); 104 105 unsafe { 106 *self.handoff.get() = None; 107 } 108 } 109 is_running(&self) -> bool110 pub fn is_running(&self) -> bool { 111 let state: State = self.state.load(Relaxed).into(); 112 state.is_running() 113 } 114 115 /// Hands off the worker to a thread. 116 /// 117 /// Returns `true` if the thread needs to be spawned. worker_handoff(&self, worker_id: WorkerId) -> bool118 pub fn worker_handoff(&self, worker_id: WorkerId) -> bool { 119 unsafe { 120 // The backup worker should not already have been handoff a worker. 121 debug_assert!((*self.handoff.get()).is_none()); 122 123 // Set the handoff 124 *self.handoff.get() = Some(worker_id); 125 } 126 127 // This *probably* can just be `Release`... memory orderings, how do 128 // they work? 129 let prev = State::worker_handoff(&self.state); 130 debug_assert!(prev.is_pushed()); 131 132 if prev.is_running() { 133 // Wakeup the backup thread 134 self.park.notify(); 135 false 136 } else { 137 true 138 } 139 } 140 141 /// Terminate the worker signal_stop(&self)142 pub fn signal_stop(&self) { 143 let prev: State = self.state.fetch_xor(TERMINATED | PUSHED, AcqRel).into(); 144 145 debug_assert!(!prev.is_terminated()); 146 debug_assert!(prev.is_pushed()); 147 148 if prev.is_running() { 149 self.park.notify(); 150 } 151 } 152 153 /// Release the worker release(&self)154 pub fn release(&self) { 155 let prev: State = self.state.fetch_xor(RUNNING, AcqRel).into(); 156 157 debug_assert!(prev.is_running()); 158 } 159 160 /// Wait for a worker handoff wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff161 pub fn wait_for_handoff(&self, timeout: Option<Duration>) -> Handoff { 162 let sleep_until = timeout.map(|dur| Instant::now() + dur); 163 let mut state: State = self.state.load(Acquire).into(); 164 165 // Run in a loop since there can be spurious wakeups 166 loop { 167 if !state.is_pushed() { 168 if state.is_terminated() { 169 return Handoff::Terminated; 170 } 171 172 let worker_id = unsafe { (*self.handoff.get()).take().expect("no worker handoff") }; 173 return Handoff::Worker(worker_id); 174 } 175 176 match sleep_until { 177 None => { 178 self.park.park_sync(None); 179 state = self.state.load(Acquire).into(); 180 } 181 Some(when) => { 182 let now = Instant::now(); 183 184 if now < when { 185 self.park.park_sync(Some(when - now)); 186 state = self.state.load(Acquire).into(); 187 } else { 188 debug_assert!(state.is_running()); 189 190 // Transition out of running 191 let mut next = state; 192 next.unset_running(); 193 194 let actual = self 195 .state 196 .compare_and_swap(state.into(), next.into(), AcqRel) 197 .into(); 198 199 if actual == state { 200 debug_assert!(!next.is_running()); 201 return Handoff::Idle; 202 } 203 204 state = actual; 205 } 206 } 207 } 208 } 209 } 210 is_pushed(&self) -> bool211 pub fn is_pushed(&self) -> bool { 212 let state: State = self.state.load(Relaxed).into(); 213 state.is_pushed() 214 } 215 set_pushed(&self, ordering: Ordering)216 pub fn set_pushed(&self, ordering: Ordering) { 217 let prev: State = self.state.fetch_or(PUSHED, ordering).into(); 218 debug_assert!(!prev.is_pushed()); 219 } 220 221 #[inline] next_sleeper(&self) -> BackupId222 pub fn next_sleeper(&self) -> BackupId { 223 unsafe { *self.next_sleeper.get() } 224 } 225 226 #[inline] set_next_sleeper(&self, val: BackupId)227 pub fn set_next_sleeper(&self, val: BackupId) { 228 unsafe { 229 *self.next_sleeper.get() = val; 230 } 231 } 232 } 233 234 // ===== impl State ===== 235 236 impl State { 237 /// Returns a new, default, thread `State` new() -> State238 pub fn new() -> State { 239 State(0) 240 } 241 242 /// Returns true if the thread entry is pushed in the sleeper stack is_pushed(&self) -> bool243 pub fn is_pushed(&self) -> bool { 244 self.0 & PUSHED == PUSHED 245 } 246 unset_pushed(&mut self)247 fn unset_pushed(&mut self) { 248 self.0 &= !PUSHED; 249 } 250 is_running(&self) -> bool251 pub fn is_running(&self) -> bool { 252 self.0 & RUNNING == RUNNING 253 } 254 set_running(&mut self)255 pub fn set_running(&mut self) { 256 self.0 |= RUNNING; 257 } 258 unset_running(&mut self)259 pub fn unset_running(&mut self) { 260 self.0 &= !RUNNING; 261 } 262 is_terminated(&self) -> bool263 pub fn is_terminated(&self) -> bool { 264 self.0 & TERMINATED == TERMINATED 265 } 266 worker_handoff(state: &AtomicUsize) -> State267 fn worker_handoff(state: &AtomicUsize) -> State { 268 let mut curr: State = state.load(Acquire).into(); 269 270 loop { 271 let mut next = curr; 272 next.set_running(); 273 next.unset_pushed(); 274 275 let actual = state 276 .compare_and_swap(curr.into(), next.into(), AcqRel) 277 .into(); 278 279 if actual == curr { 280 return curr; 281 } 282 283 curr = actual; 284 } 285 } 286 } 287 288 impl From<usize> for State { from(src: usize) -> State289 fn from(src: usize) -> State { 290 State(src) 291 } 292 } 293 294 impl From<State> for usize { from(src: State) -> usize295 fn from(src: State) -> usize { 296 src.0 297 } 298 } 299 300 impl fmt::Debug for State { fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result301 fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { 302 fmt.debug_struct("backup::State") 303 .field("is_pushed", &self.is_pushed()) 304 .field("is_running", &self.is_running()) 305 .field("is_terminated", &self.is_terminated()) 306 .finish() 307 } 308 } 309