1 use crate::primitive::sync::atomic::AtomicUsize;
2 use crate::primitive::sync::{Arc, Condvar, Mutex};
3 use core::sync::atomic::Ordering::SeqCst;
4 use std::fmt;
5 use std::marker::PhantomData;
6 use std::time::{Duration, Instant};
7 
8 /// A thread parking primitive.
9 ///
10 /// Conceptually, each `Parker` has an associated token which is initially not present:
11 ///
12 /// * The [`park`] method blocks the current thread unless or until the token is available, at
13 ///   which point it automatically consumes the token.
14 ///
15 /// * The [`park_timeout`] and [`park_deadline`] methods work the same as [`park`], but block for
16 ///   a specified maximum time.
17 ///
18 /// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
19 ///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
20 ///   returning immediately.
21 ///
22 /// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
23 /// [`park`] and [`unpark`].
24 ///
25 /// # Examples
26 ///
27 /// ```
28 /// use std::thread;
29 /// use std::time::Duration;
30 /// use crossbeam_utils::sync::Parker;
31 ///
32 /// let p = Parker::new();
33 /// let u = p.unparker().clone();
34 ///
35 /// // Make the token available.
36 /// u.unpark();
37 /// // Wakes up immediately and consumes the token.
38 /// p.park();
39 ///
40 /// thread::spawn(move || {
41 ///     thread::sleep(Duration::from_millis(500));
42 ///     u.unpark();
43 /// });
44 ///
45 /// // Wakes up when `u.unpark()` provides the token.
46 /// p.park();
47 /// ```
48 ///
49 /// [`park`]: Parker::park
50 /// [`park_timeout`]: Parker::park_timeout
51 /// [`park_deadline`]: Parker::park_deadline
52 /// [`unpark`]: Unparker::unpark
53 pub struct Parker {
54     unparker: Unparker,
55     _marker: PhantomData<*const ()>,
56 }
57 
58 unsafe impl Send for Parker {}
59 
60 impl Default for Parker {
default() -> Self61     fn default() -> Self {
62         Self {
63             unparker: Unparker {
64                 inner: Arc::new(Inner {
65                     state: AtomicUsize::new(EMPTY),
66                     lock: Mutex::new(()),
67                     cvar: Condvar::new(),
68                 }),
69             },
70             _marker: PhantomData,
71         }
72     }
73 }
74 
75 impl Parker {
76     /// Creates a new `Parker`.
77     ///
78     /// # Examples
79     ///
80     /// ```
81     /// use crossbeam_utils::sync::Parker;
82     ///
83     /// let p = Parker::new();
84     /// ```
85     ///
new() -> Parker86     pub fn new() -> Parker {
87         Self::default()
88     }
89 
90     /// Blocks the current thread until the token is made available.
91     ///
92     /// # Examples
93     ///
94     /// ```
95     /// use crossbeam_utils::sync::Parker;
96     ///
97     /// let p = Parker::new();
98     /// let u = p.unparker().clone();
99     ///
100     /// // Make the token available.
101     /// u.unpark();
102     ///
103     /// // Wakes up immediately and consumes the token.
104     /// p.park();
105     /// ```
park(&self)106     pub fn park(&self) {
107         self.unparker.inner.park(None);
108     }
109 
110     /// Blocks the current thread until the token is made available, but only for a limited time.
111     ///
112     /// # Examples
113     ///
114     /// ```
115     /// use std::time::Duration;
116     /// use crossbeam_utils::sync::Parker;
117     ///
118     /// let p = Parker::new();
119     ///
120     /// // Waits for the token to become available, but will not wait longer than 500 ms.
121     /// p.park_timeout(Duration::from_millis(500));
122     /// ```
park_timeout(&self, timeout: Duration)123     pub fn park_timeout(&self, timeout: Duration) {
124         self.park_deadline(Instant::now() + timeout)
125     }
126 
127     /// Blocks the current thread until the token is made available, or until a certain deadline.
128     ///
129     /// # Examples
130     ///
131     /// ```
132     /// use std::time::{Duration, Instant};
133     /// use crossbeam_utils::sync::Parker;
134     ///
135     /// let p = Parker::new();
136     /// let deadline = Instant::now() + Duration::from_millis(500);
137     ///
138     /// // Waits for the token to become available, but will not wait longer than 500 ms.
139     /// p.park_deadline(deadline);
140     /// ```
park_deadline(&self, deadline: Instant)141     pub fn park_deadline(&self, deadline: Instant) {
142         self.unparker.inner.park(Some(deadline))
143     }
144 
145     /// Returns a reference to an associated [`Unparker`].
146     ///
147     /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
148     ///
149     /// # Examples
150     ///
151     /// ```
152     /// use crossbeam_utils::sync::Parker;
153     ///
154     /// let p = Parker::new();
155     /// let u = p.unparker().clone();
156     ///
157     /// // Make the token available.
158     /// u.unpark();
159     /// // Wakes up immediately and consumes the token.
160     /// p.park();
161     /// ```
162     ///
163     /// [`park`]: Parker::park
164     /// [`park_timeout`]: Parker::park_timeout
unparker(&self) -> &Unparker165     pub fn unparker(&self) -> &Unparker {
166         &self.unparker
167     }
168 
169     /// Converts a `Parker` into a raw pointer.
170     ///
171     /// # Examples
172     ///
173     /// ```
174     /// use crossbeam_utils::sync::Parker;
175     ///
176     /// let p = Parker::new();
177     /// let raw = Parker::into_raw(p);
178     /// ```
into_raw(this: Parker) -> *const ()179     pub fn into_raw(this: Parker) -> *const () {
180         Unparker::into_raw(this.unparker)
181     }
182 
183     /// Converts a raw pointer into a `Parker`.
184     ///
185     /// # Safety
186     ///
187     /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
188     ///
189     /// # Examples
190     ///
191     /// ```
192     /// use crossbeam_utils::sync::Parker;
193     ///
194     /// let p = Parker::new();
195     /// let raw = Parker::into_raw(p);
196     /// let p = unsafe { Parker::from_raw(raw) };
197     /// ```
from_raw(ptr: *const ()) -> Parker198     pub unsafe fn from_raw(ptr: *const ()) -> Parker {
199         Parker {
200             unparker: Unparker::from_raw(ptr),
201             _marker: PhantomData,
202         }
203     }
204 }
205 
206 impl fmt::Debug for Parker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result207     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
208         f.pad("Parker { .. }")
209     }
210 }
211 
212 /// Unparks a thread parked by the associated [`Parker`].
213 pub struct Unparker {
214     inner: Arc<Inner>,
215 }
216 
217 unsafe impl Send for Unparker {}
218 unsafe impl Sync for Unparker {}
219 
220 impl Unparker {
221     /// Atomically makes the token available if it is not already.
222     ///
223     /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
224     /// any.
225     ///
226     /// # Examples
227     ///
228     /// ```
229     /// use std::thread;
230     /// use std::time::Duration;
231     /// use crossbeam_utils::sync::Parker;
232     ///
233     /// let p = Parker::new();
234     /// let u = p.unparker().clone();
235     ///
236     /// thread::spawn(move || {
237     ///     thread::sleep(Duration::from_millis(500));
238     ///     u.unpark();
239     /// });
240     ///
241     /// // Wakes up when `u.unpark()` provides the token.
242     /// p.park();
243     /// ```
244     ///
245     /// [`park`]: Parker::park
246     /// [`park_timeout`]: Parker::park_timeout
unpark(&self)247     pub fn unpark(&self) {
248         self.inner.unpark()
249     }
250 
251     /// Converts an `Unparker` into a raw pointer.
252     ///
253     /// # Examples
254     ///
255     /// ```
256     /// use crossbeam_utils::sync::{Parker, Unparker};
257     ///
258     /// let p = Parker::new();
259     /// let u = p.unparker().clone();
260     /// let raw = Unparker::into_raw(u);
261     /// ```
into_raw(this: Unparker) -> *const ()262     pub fn into_raw(this: Unparker) -> *const () {
263         Arc::into_raw(this.inner) as *const ()
264     }
265 
266     /// Converts a raw pointer into an `Unparker`.
267     ///
268     /// # Safety
269     ///
270     /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
271     ///
272     /// # Examples
273     ///
274     /// ```
275     /// use crossbeam_utils::sync::{Parker, Unparker};
276     ///
277     /// let p = Parker::new();
278     /// let u = p.unparker().clone();
279     ///
280     /// let raw = Unparker::into_raw(u);
281     /// let u = unsafe { Unparker::from_raw(raw) };
282     /// ```
from_raw(ptr: *const ()) -> Unparker283     pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
284         Unparker {
285             inner: Arc::from_raw(ptr as *const Inner),
286         }
287     }
288 }
289 
290 impl fmt::Debug for Unparker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result291     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
292         f.pad("Unparker { .. }")
293     }
294 }
295 
296 impl Clone for Unparker {
clone(&self) -> Unparker297     fn clone(&self) -> Unparker {
298         Unparker {
299             inner: self.inner.clone(),
300         }
301     }
302 }
303 
304 const EMPTY: usize = 0;
305 const PARKED: usize = 1;
306 const NOTIFIED: usize = 2;
307 
308 struct Inner {
309     state: AtomicUsize,
310     lock: Mutex<()>,
311     cvar: Condvar,
312 }
313 
314 impl Inner {
park(&self, deadline: Option<Instant>)315     fn park(&self, deadline: Option<Instant>) {
316         // If we were previously notified then we consume this notification and return quickly.
317         if self
318             .state
319             .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
320             .is_ok()
321         {
322             return;
323         }
324 
325         // If the timeout is zero, then there is no need to actually block.
326         if let Some(deadline) = deadline {
327             if deadline <= Instant::now() {
328                 return;
329             }
330         }
331 
332         // Otherwise we need to coordinate going to sleep.
333         let mut m = self.lock.lock().unwrap();
334 
335         match self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
336             Ok(_) => {}
337             // Consume this notification to avoid spurious wakeups in the next park.
338             Err(NOTIFIED) => {
339                 // We must read `state` here, even though we know it will be `NOTIFIED`. This is
340                 // because `unpark` may have been called again since we read `NOTIFIED` in the
341                 // `compare_exchange` above. We must perform an acquire operation that synchronizes
342                 // with that `unpark` to observe any writes it made before the call to `unpark`. To
343                 // do that we must read from the write it made to `state`.
344                 let old = self.state.swap(EMPTY, SeqCst);
345                 assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
346                 return;
347             }
348             Err(n) => panic!("inconsistent park_timeout state: {}", n),
349         }
350 
351         loop {
352             // Block the current thread on the conditional variable.
353             m = match deadline {
354                 None => self.cvar.wait(m).unwrap(),
355                 Some(deadline) => {
356                     let now = Instant::now();
357                     if now < deadline {
358                         // We could check for a timeout here, in the return value of wait_timeout,
359                         // but in the case that a timeout and an unpark arrive simultaneously, we
360                         // prefer to report the former.
361                         self.cvar.wait_timeout(m, deadline - now).unwrap().0
362                     } else {
363                         // We've timed out; swap out the state back to empty on our way out
364                         match self.state.swap(EMPTY, SeqCst) {
365                             NOTIFIED | PARKED => return,
366                             n => panic!("inconsistent park_timeout state: {}", n),
367                         };
368                     }
369                 }
370             };
371 
372             if self
373                 .state
374                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
375                 .is_ok()
376             {
377                 // got a notification
378                 return;
379             }
380 
381             // Spurious wakeup, go back to sleep. Alternatively, if we timed out, it will be caught
382             // in the branch above, when we discover the deadline is in the past
383         }
384     }
385 
unpark(&self)386     pub(crate) fn unpark(&self) {
387         // To ensure the unparked thread will observe any writes we made before this call, we must
388         // perform a release operation that `park` can synchronize with. To do that we must write
389         // `NOTIFIED` even if `state` is already `NOTIFIED`. That is why this must be a swap rather
390         // than a compare-and-swap that returns if it reads `NOTIFIED` on failure.
391         match self.state.swap(NOTIFIED, SeqCst) {
392             EMPTY => return,    // no one was waiting
393             NOTIFIED => return, // already unparked
394             PARKED => {}        // gotta go wake someone up
395             _ => panic!("inconsistent state in unpark"),
396         }
397 
398         // There is a period between when the parked thread sets `state` to `PARKED` (or last
399         // checked `state` in the case of a spurious wakeup) and when it actually waits on `cvar`.
400         // If we were to notify during this period it would be ignored and then when the parked
401         // thread went to sleep it would never wake up. Fortunately, it has `lock` locked at this
402         // stage so we can acquire `lock` to wait until it is ready to receive the notification.
403         //
404         // Releasing `lock` before the call to `notify_one` means that when the parked thread wakes
405         // it doesn't get woken only to have to wait for us to release `lock`.
406         drop(self.lock.lock().unwrap());
407         self.cvar.notify_one();
408     }
409 }
410