1 use crate::primitive::sync::atomic::AtomicUsize; 2 use crate::primitive::sync::{Arc, Condvar, Mutex}; 3 use core::sync::atomic::Ordering::SeqCst; 4 use std::fmt; 5 use std::marker::PhantomData; 6 use std::time::{Duration, Instant}; 7 8 /// A thread parking primitive. 9 /// 10 /// Conceptually, each `Parker` has an associated token which is initially not present: 11 /// 12 /// * The [`park`] method blocks the current thread unless or until the token is available, at 13 /// which point it automatically consumes the token. 14 /// 15 /// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for 16 /// a specified maximum time. 17 /// 18 /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the 19 /// token is initially absent, [`unpark`] followed by [`park`] will result in the second call 20 /// returning immediately. 21 /// 22 /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using 23 /// [`park`] and [`unpark`]. 24 /// 25 /// # Examples 26 /// 27 /// ``` 28 /// use std::thread; 29 /// use std::time::Duration; 30 /// use crossbeam_utils::sync::Parker; 31 /// 32 /// let p = Parker::new(); 33 /// let u = p.unparker().clone(); 34 /// 35 /// // Make the token available. 36 /// u.unpark(); 37 /// // Wakes up immediately and consumes the token. 38 /// p.park(); 39 /// 40 /// thread::spawn(move || { 41 /// thread::sleep(Duration::from_millis(500)); 42 /// u.unpark(); 43 /// }); 44 /// 45 /// // Wakes up when `u.unpark()` provides the token. 46 /// p.park(); 47 /// ``` 48 /// 49 /// [`park`]: Parker::park 50 /// [`park_timeout`]: Parker::park_timeout 51 /// [`park_deadline`]: Parker::park_deadline 52 /// [`unpark`]: Unparker::unpark 53 pub struct Parker { 54 unparker: Unparker, 55 _marker: PhantomData<*const ()>, 56 } 57 58 unsafe impl Send for Parker {} 59 60 impl Default for Parker { default() -> Self61 fn default() -> Self { 62 Self { 63 unparker: Unparker { 64 inner: Arc::new(Inner { 65 state: AtomicUsize::new(EMPTY), 66 lock: Mutex::new(()), 67 cvar: Condvar::new(), 68 }), 69 }, 70 _marker: PhantomData, 71 } 72 } 73 } 74 75 impl Parker { 76 /// Creates a new `Parker`. 77 /// 78 /// # Examples 79 /// 80 /// ``` 81 /// use crossbeam_utils::sync::Parker; 82 /// 83 /// let p = Parker::new(); 84 /// ``` 85 /// new() -> Parker86 pub fn new() -> Parker { 87 Self::default() 88 } 89 90 /// Blocks the current thread until the token is made available. 91 /// 92 /// # Examples 93 /// 94 /// ``` 95 /// use crossbeam_utils::sync::Parker; 96 /// 97 /// let p = Parker::new(); 98 /// let u = p.unparker().clone(); 99 /// 100 /// // Make the token available. 101 /// u.unpark(); 102 /// 103 /// // Wakes up immediately and consumes the token. 104 /// p.park(); 105 /// ``` park(&self)106 pub fn park(&self) { 107 self.unparker.inner.park(None); 108 } 109 110 /// Blocks the current thread until the token is made available, but only for a limited time. 111 /// 112 /// # Examples 113 /// 114 /// ``` 115 /// use std::time::Duration; 116 /// use crossbeam_utils::sync::Parker; 117 /// 118 /// let p = Parker::new(); 119 /// 120 /// // Waits for the token to become available, but will not wait longer than 500 ms. 121 /// p.park_timeout(Duration::from_millis(500)); 122 /// ``` park_timeout(&self, timeout: Duration)123 pub fn park_timeout(&self, timeout: Duration) { 124 self.park_deadline(Instant::now() + timeout) 125 } 126 127 /// Blocks the current thread until the token is made available, or until a certain deadline. 128 /// 129 /// # Examples 130 /// 131 /// ``` 132 /// use std::time::{Duration, Instant}; 133 /// use crossbeam_utils::sync::Parker; 134 /// 135 /// let p = Parker::new(); 136 /// let deadline = Instant::now() + Duration::from_millis(500); 137 /// 138 /// // Waits for the token to become available, but will not wait longer than 500 ms. 139 /// p.park_deadline(deadline); 140 /// ``` park_deadline(&self, deadline: Instant)141 pub fn park_deadline(&self, deadline: Instant) { 142 self.unparker.inner.park(Some(deadline)) 143 } 144 145 /// Returns a reference to an associated [`Unparker`]. 146 /// 147 /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned. 148 /// 149 /// # Examples 150 /// 151 /// ``` 152 /// use crossbeam_utils::sync::Parker; 153 /// 154 /// let p = Parker::new(); 155 /// let u = p.unparker().clone(); 156 /// 157 /// // Make the token available. 158 /// u.unpark(); 159 /// // Wakes up immediately and consumes the token. 160 /// p.park(); 161 /// ``` 162 /// 163 /// [`park`]: Parker::park 164 /// [`park_timeout`]: Parker::park_timeout unparker(&self) -> &Unparker165 pub fn unparker(&self) -> &Unparker { 166 &self.unparker 167 } 168 169 /// Converts a `Parker` into a raw pointer. 170 /// 171 /// # Examples 172 /// 173 /// ``` 174 /// use crossbeam_utils::sync::Parker; 175 /// 176 /// let p = Parker::new(); 177 /// let raw = Parker::into_raw(p); 178 /// ``` into_raw(this: Parker) -> *const ()179 pub fn into_raw(this: Parker) -> *const () { 180 Unparker::into_raw(this.unparker) 181 } 182 183 /// Converts a raw pointer into a `Parker`. 184 /// 185 /// # Safety 186 /// 187 /// This method is safe to use only with pointers returned by [`Parker::into_raw`]. 188 /// 189 /// # Examples 190 /// 191 /// ``` 192 /// use crossbeam_utils::sync::Parker; 193 /// 194 /// let p = Parker::new(); 195 /// let raw = Parker::into_raw(p); 196 /// let p = unsafe { Parker::from_raw(raw) }; 197 /// ``` from_raw(ptr: *const ()) -> Parker198 pub unsafe fn from_raw(ptr: *const ()) -> Parker { 199 Parker { 200 unparker: Unparker::from_raw(ptr), 201 _marker: PhantomData, 202 } 203 } 204 } 205 206 impl fmt::Debug for Parker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result207 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 208 f.pad("Parker { .. }") 209 } 210 } 211 212 /// Unparks a thread parked by the associated [`Parker`]. 213 pub struct Unparker { 214 inner: Arc<Inner>, 215 } 216 217 unsafe impl Send for Unparker {} 218 unsafe impl Sync for Unparker {} 219 220 impl Unparker { 221 /// Atomically makes the token available if it is not already. 222 /// 223 /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is 224 /// any. 225 /// 226 /// # Examples 227 /// 228 /// ``` 229 /// use std::thread; 230 /// use std::time::Duration; 231 /// use crossbeam_utils::sync::Parker; 232 /// 233 /// let p = Parker::new(); 234 /// let u = p.unparker().clone(); 235 /// 236 /// thread::spawn(move || { 237 /// thread::sleep(Duration::from_millis(500)); 238 /// u.unpark(); 239 /// }); 240 /// 241 /// // Wakes up when `u.unpark()` provides the token. 242 /// p.park(); 243 /// ``` 244 /// 245 /// [`park`]: Parker::park 246 /// [`park_timeout`]: Parker::park_timeout unpark(&self)247 pub fn unpark(&self) { 248 self.inner.unpark() 249 } 250 251 /// Converts an `Unparker` into a raw pointer. 252 /// 253 /// # Examples 254 /// 255 /// ``` 256 /// use crossbeam_utils::sync::{Parker, Unparker}; 257 /// 258 /// let p = Parker::new(); 259 /// let u = p.unparker().clone(); 260 /// let raw = Unparker::into_raw(u); 261 /// ``` into_raw(this: Unparker) -> *const ()262 pub fn into_raw(this: Unparker) -> *const () { 263 Arc::into_raw(this.inner) as *const () 264 } 265 266 /// Converts a raw pointer into an `Unparker`. 267 /// 268 /// # Safety 269 /// 270 /// This method is safe to use only with pointers returned by [`Unparker::into_raw`]. 271 /// 272 /// # Examples 273 /// 274 /// ``` 275 /// use crossbeam_utils::sync::{Parker, Unparker}; 276 /// 277 /// let p = Parker::new(); 278 /// let u = p.unparker().clone(); 279 /// 280 /// let raw = Unparker::into_raw(u); 281 /// let u = unsafe { Unparker::from_raw(raw) }; 282 /// ``` from_raw(ptr: *const ()) -> Unparker283 pub unsafe fn from_raw(ptr: *const ()) -> Unparker { 284 Unparker { 285 inner: Arc::from_raw(ptr as *const Inner), 286 } 287 } 288 } 289 290 impl fmt::Debug for Unparker { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result291 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 292 f.pad("Unparker { .. }") 293 } 294 } 295 296 impl Clone for Unparker { clone(&self) -> Unparker297 fn clone(&self) -> Unparker { 298 Unparker { 299 inner: self.inner.clone(), 300 } 301 } 302 } 303 304 const EMPTY: usize = 0; 305 const PARKED: usize = 1; 306 const NOTIFIED: usize = 2; 307 308 struct Inner { 309 state: AtomicUsize, 310 lock: Mutex<()>, 311 cvar: Condvar, 312 } 313 314 impl Inner { park(&self, deadline: Option<Instant>)315 fn park(&self, deadline: Option<Instant>) { 316 // If we were previously notified then we consume this notification and return quickly. 317 if self 318 .state 319 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 320 .is_ok() 321 { 322 return; 323 } 324 325 // If the timeout is zero, then there is no need to actually block. 326 if let Some(deadline) = deadline { 327 if deadline <= Instant::now() { 328 return; 329 } 330 } 331 332 // Otherwise we need to coordinate going to sleep. 333 let mut m = self.lock.lock().unwrap(); 334 335 match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) { 336 Ok(_) => {} 337 // Consume this notification to avoid spurious wakeups in the next park. 338 Err(NOTIFIED) => { 339 // We must read `state` here, even though we know it will be `NOTIFIED`. This is 340 // because `unpark` may have been called again since we read `NOTIFIED` in the 341 // `compare_exchange` above. We must perform an acquire operation that synchronizes 342 // with that `unpark` to observe any writes it made before the call to `unpark`. To 343 // do that we must read from the write it made to `state`. 344 let old = self.state.swap(EMPTY, SeqCst); 345 assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 346 return; 347 } 348 Err(n) => panic!("inconsistent park_timeout state: {}", n), 349 } 350 351 loop { 352 // Block the current thread on the conditional variable. 353 m = match deadline { 354 None => self.cvar.wait(m).unwrap(), 355 Some(deadline) => { 356 let now = Instant::now(); 357 if now < deadline { 358 // We could check for a timeout here, in the return value of wait_timeout, 359 // but in the case that a timeout and an unpark arrive simultaneously, we 360 // prefer to report the former. 361 self.cvar.wait_timeout(m, deadline - now).unwrap().0 362 } else { 363 // We've timed out; swap out the state back to empty on our way out 364 match self.state.swap(EMPTY, SeqCst) { 365 NOTIFIED | PARKED => return, 366 n => panic!("inconsistent park_timeout state: {}", n), 367 }; 368 } 369 } 370 }; 371 372 if self 373 .state 374 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 375 .is_ok() 376 { 377 // got a notification 378 return; 379 } 380 381 // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught 382 // in the branch above, when we discover the deadline is in the past 383 } 384 } 385 unpark(&self)386 pub(crate) fn unpark(&self) { 387 // To ensure the unparked thread will observe any writes we made before this call, we must 388 // perform a release operation that `park` can synchronize with. To do that we must write 389 // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather 390 // than a compare-and-swap that returns if it reads `NOTIFIED` on failure. 391 match self.state.swap(NOTIFIED, SeqCst) { 392 EMPTY => return, // no one was waiting 393 NOTIFIED => return, // already unparked 394 PARKED => {} // gotta go wake someone up 395 _ => panic!("inconsistent state in unpark"), 396 } 397 398 // There is a period between when the parked thread sets `state` to `PARKED` (or last 399 // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`. 400 // If we were to notify during this period it would be ignored and then when the parked 401 // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this 402 // stage so we can acquire `lock` to wait until it is ready to receive the notification. 403 // 404 // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes 405 // it doesn't get woken only to have to wait for us to release `lock`. 406 drop(self.lock.lock().unwrap()); 407 self.cvar.notify_one(); 408 } 409 } 410