1 use std::fmt; 2 use std::marker::PhantomData; 3 use std::sync::atomic::AtomicUsize; 4 use std::sync::atomic::Ordering::SeqCst; 5 use std::sync::{Arc, Condvar, Mutex}; 6 use std::time::Duration; 7 8 /// A thread parking primitive. 9 /// 10 /// Conceptually, each `Parker` has an associated token which is initially not present: 11 /// 12 /// * The [`park`] method blocks the current thread unless or until the token is available, at 13 /// which point it automatically consumes the token. It may also return *spuriously*, without 14 /// consuming the token. 15 /// 16 /// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum 17 /// time. 18 /// 19 /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the 20 /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call 21 /// returning immediately. 22 /// 23 /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using 24 /// [`park`] and [`unpark`]. 25 /// 26 /// # Examples 27 /// 28 /// ``` 29 /// use std::thread; 30 /// use std::time::Duration; 31 /// use crossbeam_utils::sync::Parker; 32 /// 33 /// let p = Parker::new(); 34 /// let u = p.unparker().clone(); 35 /// 36 /// // Make the token available. 37 /// u.unpark(); 38 /// // Wakes up immediately and consumes the token. 39 /// p.park(); 40 /// 41 /// thread::spawn(move || { 42 /// thread::sleep(Duration::from_millis(500)); 43 /// u.unpark(); 44 /// }); 45 /// 46 /// // Wakes up when `u.unpark()` provides the token, but may also wake up 47 /// // spuriously before that without consuming the token. 48 /// p.park(); 49 /// ``` 50 /// 51 /// [`park`]: Parker::park 52 /// [`park_timeout`]: Parker::park_timeout 53 /// [`unpark`]: Unparker::unpark 54 pub struct Parker { 55 unparker: Unparker, 56 _marker: PhantomData<*const ()>, 57 } 58 59 unsafe impl Send for Parker {} 60 61 impl Default for Parker { default() -> Self62 fn default() -> Self { 63 Self { 64 unparker: Unparker { 65 inner: Arc::new(Inner { 66 state: AtomicUsize::new(EMPTY), 67 lock: Mutex::new(()), 68 cvar: Condvar::new(), 69 }), 70 }, 71 _marker: PhantomData, 72 } 73 } 74 } 75 76 impl Parker { 77 /// Creates a new `Parker`. 78 /// 79 /// # Examples 80 /// 81 /// ``` 82 /// use crossbeam_utils::sync::Parker; 83 /// 84 /// let p = Parker::new(); 85 /// ``` 86 /// new() -> Parker87 pub fn new() -> Parker { 88 Self::default() 89 } 90 91 /// Blocks the current thread until the token is made available. 92 /// 93 /// A call to `park` may wake up spuriously without consuming the token, and callers should be 94 /// prepared for this possibility. 95 /// 96 /// # Examples 97 /// 98 /// ``` 99 /// use crossbeam_utils::sync::Parker; 100 /// 101 /// let p = Parker::new(); 102 /// let u = p.unparker().clone(); 103 /// 104 /// // Make the token available. 105 /// u.unpark(); 106 /// 107 /// // Wakes up immediately and consumes the token. 108 /// p.park(); 109 /// ``` park(&self)110 pub fn park(&self) { 111 self.unparker.inner.park(None); 112 } 113 114 /// Blocks the current thread until the token is made available, but only for a limited time. 115 /// 116 /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers 117 /// should be prepared for this possibility. 118 /// 119 /// # Examples 120 /// 121 /// ``` 122 /// use std::time::Duration; 123 /// use crossbeam_utils::sync::Parker; 124 /// 125 /// let p = Parker::new(); 126 /// 127 /// // Waits for the token to become available, but will not wait longer than 500 ms. 128 /// p.park_timeout(Duration::from_millis(500)); 129 /// ``` park_timeout(&self, timeout: Duration)130 pub fn park_timeout(&self, timeout: Duration) { 131 self.unparker.inner.park(Some(timeout)); 132 } 133 134 /// Returns a reference to an associated [`Unparker`]. 135 /// 136 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. 137 /// 138 /// # Examples 139 /// 140 /// ``` 141 /// use crossbeam_utils::sync::Parker; 142 /// 143 /// let p = Parker::new(); 144 /// let u = p.unparker().clone(); 145 /// 146 /// // Make the token available. 147 /// u.unpark(); 148 /// // Wakes up immediately and consumes the token. 149 /// p.park(); 150 /// ``` 151 /// 152 /// [`park`]: Parker::park 153 /// [`park_timeout`]: Parker::park_timeout unparker(&self) -> &Unparker154 pub fn unparker(&self) -> &Unparker { 155 &self.unparker 156 } 157 158 /// Converts a `Parker` into a raw pointer. 159 /// 160 /// # Examples 161 /// 162 /// ``` 163 /// use crossbeam_utils::sync::Parker; 164 /// 165 /// let p = Parker::new(); 166 /// let raw = Parker::into_raw(p); 167 /// ``` into_raw(this: Parker) -> *const ()168 pub fn into_raw(this: Parker) -> *const () { 169 Unparker::into_raw(this.unparker) 170 } 171 172 /// Converts a raw pointer into a `Parker`. 173 /// 174 /// # Safety 175 /// 176 /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. 177 /// 178 /// # Examples 179 /// 180 /// ``` 181 /// use crossbeam_utils::sync::Parker; 182 /// 183 /// let p = Parker::new(); 184 /// let raw = Parker::into_raw(p); 185 /// let p = unsafe { Parker::from_raw(raw) }; 186 /// ``` from_raw(ptr: *const ()) -> Parker187 pub unsafe fn from_raw(ptr: *const ()) -> Parker { 188 Parker { 189 unparker: Unparker::from_raw(ptr), 190 _marker: PhantomData, 191 } 192 } 193 } 194 195 impl fmt::Debug for Parker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result196 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 197 f.pad("Parker { .. }") 198 } 199 } 200 201 /// Unparks a thread parked by the associated [`Parker`]. 202 pub struct Unparker { 203 inner: Arc<Inner>, 204 } 205 206 unsafe impl Send for Unparker {} 207 unsafe impl Sync for Unparker {} 208 209 impl Unparker { 210 /// Atomically makes the token available if it is not already. 211 /// 212 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is 213 /// any. 214 /// 215 /// # Examples 216 /// 217 /// ``` 218 /// use std::thread; 219 /// use std::time::Duration; 220 /// use crossbeam_utils::sync::Parker; 221 /// 222 /// let p = Parker::new(); 223 /// let u = p.unparker().clone(); 224 /// 225 /// thread::spawn(move || { 226 /// thread::sleep(Duration::from_millis(500)); 227 /// u.unpark(); 228 /// }); 229 /// 230 /// // Wakes up when `u.unpark()` provides the token, but may also wake up 231 /// // spuriously before that without consuming the token. 232 /// p.park(); 233 /// ``` 234 /// 235 /// [`park`]: Parker::park 236 /// [`park_timeout`]: Parker::park_timeout unpark(&self)237 pub fn unpark(&self) { 238 self.inner.unpark() 239 } 240 241 /// Converts an `Unparker` into a raw pointer. 242 /// 243 /// # Examples 244 /// 245 /// ``` 246 /// use crossbeam_utils::sync::{Parker, Unparker}; 247 /// 248 /// let p = Parker::new(); 249 /// let u = p.unparker().clone(); 250 /// let raw = Unparker::into_raw(u); 251 /// ``` into_raw(this: Unparker) -> *const ()252 pub fn into_raw(this: Unparker) -> *const () { 253 Arc::into_raw(this.inner) as *const () 254 } 255 256 /// Converts a raw pointer into an `Unparker`. 257 /// 258 /// # Safety 259 /// 260 /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. 261 /// 262 /// # Examples 263 /// 264 /// ``` 265 /// use crossbeam_utils::sync::{Parker, Unparker}; 266 /// 267 /// let p = Parker::new(); 268 /// let u = p.unparker().clone(); 269 /// 270 /// let raw = Unparker::into_raw(u); 271 /// let u = unsafe { Unparker::from_raw(raw) }; 272 /// ``` from_raw(ptr: *const ()) -> Unparker273 pub unsafe fn from_raw(ptr: *const ()) -> Unparker { 274 Unparker { 275 inner: Arc::from_raw(ptr as *const Inner), 276 } 277 } 278 } 279 280 impl fmt::Debug for Unparker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result281 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 282 f.pad("Unparker { .. }") 283 } 284 } 285 286 impl Clone for Unparker { clone(&self) -> Unparker287 fn clone(&self) -> Unparker { 288 Unparker { 289 inner: self.inner.clone(), 290 } 291 } 292 } 293 294 const EMPTY: usize = 0; 295 const PARKED: usize = 1; 296 const NOTIFIED: usize = 2; 297 298 struct Inner { 299 state: AtomicUsize, 300 lock: Mutex<()>, 301 cvar: Condvar, 302 } 303 304 impl Inner { park(&self, timeout: Option<Duration>)305 fn park(&self, timeout: Option<Duration>) { 306 // If we were previously notified then we consume this notification and return quickly. 307 if self 308 .state 309 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 310 .is_ok() 311 { 312 return; 313 } 314 315 // If the timeout is zero, then there is no need to actually block. 316 if let Some(ref dur) = timeout { 317 if *dur == Duration::from_millis(0) { 318 return; 319 } 320 } 321 322 // Otherwise we need to coordinate going to sleep. 323 let mut m = self.lock.lock().unwrap(); 324 325 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { 326 Ok(_) => {} 327 // Consume this notification to avoid spurious wakeups in the next park. 328 Err(NOTIFIED) => { 329 // We must read `state` here, even though we know it will be `NOTIFIED`. This is 330 // because `unpark` may have been called again since we read `NOTIFIED` in the 331 // `compare_exchange` above. We must perform an acquire operation that synchronizes 332 // with that `unpark` to observe any writes it made before the call to `unpark`. To 333 // do that we must read from the write it made to `state`. 334 let old = self.state.swap(EMPTY, SeqCst); 335 assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 336 return; 337 } 338 Err(n) => panic!("inconsistent park_timeout state: {}", n), 339 } 340 341 match timeout { 342 None => { 343 loop { 344 // Block the current thread on the conditional variable. 345 m = self.cvar.wait(m).unwrap(); 346 347 if self 348 .state 349 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 350 .is_ok() 351 { 352 // got a notification 353 return; 354 } 355 356 // spurious wakeup, go back to sleep 357 } 358 } 359 Some(timeout) => { 360 // Wait with a timeout, and if we spuriously wake up or otherwise wake up from a 361 // notification we just want to unconditionally set `state` back to `EMPTY`, either 362 // consuming a notification or un-flagging ourselves as parked. 363 let (_m, _result) = self.cvar.wait_timeout(m, timeout).unwrap(); 364 365 match self.state.swap(EMPTY, SeqCst) { 366 NOTIFIED => {} // got a notification 367 PARKED => {} // no notification 368 n => panic!("inconsistent park_timeout state: {}", n), 369 } 370 } 371 } 372 } 373 unpark(&self)374 pub fn unpark(&self) { 375 // To ensure the unparked thread will observe any writes we made before this call, we must 376 // perform a release operation that `park` can synchronize with. To do that we must write 377 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather 378 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. 379 match self.state.swap(NOTIFIED, SeqCst) { 380 EMPTY => return, // no one was waiting 381 NOTIFIED => return, // already unparked 382 PARKED => {} // gotta go wake someone up 383 _ => panic!("inconsistent state in unpark"), 384 } 385 386 // There is a period between when the parked thread sets `state` to `PARKED` (or last 387 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. 388 // If we were to notify during this period it would be ignored and then when the parked 389 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this 390 // stage so we can acquire `lock` to wait until it is ready to receive the notification. 391 // 392 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes 393 // it doesn't get woken only to have to wait for us to release `lock`. 394 drop(self.lock.lock().unwrap()); 395 self.cvar.notify_one(); 396 } 397 } 398