1 use std::fmt;
2 use std::marker::PhantomData;
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering::SeqCst;
5 use std::sync::{Arc, Condvar, Mutex};
6 use std::time::Duration;
7 
/// A thread parking primitive.
///
/// Conceptually, each `Parker` has an associated token which is initially not present:
///
/// * The [`park`] method blocks the current thread unless or until the token is available, at
///   which point it automatically consumes the token. It may also return *spuriously*, without
///   consuming the token.
///
/// * The [`park_timeout`] method works the same as [`park`], but blocks for a specified maximum
///   time.
///
/// * The [`unpark`] method atomically makes the token available if it wasn't already. Because the
///   token is initially absent, [`unpark`] followed by [`park`] will result in the second call
///   returning immediately.
///
/// In other words, each `Parker` acts a bit like a spinlock that can be locked and unlocked using
/// [`park`] and [`unpark`].
///
/// # Examples
///
/// ```
/// use std::thread;
/// use std::time::Duration;
/// use crossbeam_utils::sync::Parker;
///
/// let p = Parker::new();
/// let u = p.unparker().clone();
///
/// // Make the token available.
/// u.unpark();
/// // Wakes up immediately and consumes the token.
/// p.park();
///
/// thread::spawn(move || {
///     thread::sleep(Duration::from_millis(500));
///     u.unpark();
/// });
///
/// // Wakes up when `u.unpark()` provides the token, but may also wake up
/// // spuriously before that without consuming the token.
/// p.park();
/// ```
///
/// [`park`]: Parker::park
/// [`park_timeout`]: Parker::park_timeout
/// [`unpark`]: Unparker::unpark
pub struct Parker {
    // Handle onto the shared parking state; `park` and `park_timeout` operate on
    // `unparker.inner`, and `unparker()` hands out a reference to this field.
    unparker: Unparker,
    // A raw pointer is neither `Send` nor `Sync`, so this marker opts `Parker` out of both auto
    // traits. `Send` is restored by the explicit impl below, which leaves the type `!Sync`.
    _marker: PhantomData<*const ()>,
}

// SAFETY: the only field that is not `Send` is the zero-sized `PhantomData<*const ()>` marker,
// which stores no pointer. The remaining state lives behind `Unparker`, which is declared `Send`.
unsafe impl Send for Parker {}
60 
61 impl Default for Parker {
default() -> Self62     fn default() -> Self {
63         Self {
64             unparker: Unparker {
65                 inner: Arc::new(Inner {
66                     state: AtomicUsize::new(EMPTY),
67                     lock: Mutex::new(()),
68                     cvar: Condvar::new(),
69                 }),
70             },
71             _marker: PhantomData,
72         }
73     }
74 }
75 
76 impl Parker {
77     /// Creates a new `Parker`.
78     ///
79     /// # Examples
80     ///
81     /// ```
82     /// use crossbeam_utils::sync::Parker;
83     ///
84     /// let p = Parker::new();
85     /// ```
86     ///
new() -> Parker87     pub fn new() -> Parker {
88         Self::default()
89     }
90 
91     /// Blocks the current thread until the token is made available.
92     ///
93     /// A call to `park` may wake up spuriously without consuming the token, and callers should be
94     /// prepared for this possibility.
95     ///
96     /// # Examples
97     ///
98     /// ```
99     /// use crossbeam_utils::sync::Parker;
100     ///
101     /// let p = Parker::new();
102     /// let u = p.unparker().clone();
103     ///
104     /// // Make the token available.
105     /// u.unpark();
106     ///
107     /// // Wakes up immediately and consumes the token.
108     /// p.park();
109     /// ```
park(&self)110     pub fn park(&self) {
111         self.unparker.inner.park(None);
112     }
113 
114     /// Blocks the current thread until the token is made available, but only for a limited time.
115     ///
116     /// A call to `park_timeout` may wake up spuriously without consuming the token, and callers
117     /// should be prepared for this possibility.
118     ///
119     /// # Examples
120     ///
121     /// ```
122     /// use std::time::Duration;
123     /// use crossbeam_utils::sync::Parker;
124     ///
125     /// let p = Parker::new();
126     ///
127     /// // Waits for the token to become available, but will not wait longer than 500 ms.
128     /// p.park_timeout(Duration::from_millis(500));
129     /// ```
park_timeout(&self, timeout: Duration)130     pub fn park_timeout(&self, timeout: Duration) {
131         self.unparker.inner.park(Some(timeout));
132     }
133 
134     /// Returns a reference to an associated [`Unparker`].
135     ///
136     /// The returned [`Unparker`] doesn't have to be used by reference - it can also be cloned.
137     ///
138     /// # Examples
139     ///
140     /// ```
141     /// use crossbeam_utils::sync::Parker;
142     ///
143     /// let p = Parker::new();
144     /// let u = p.unparker().clone();
145     ///
146     /// // Make the token available.
147     /// u.unpark();
148     /// // Wakes up immediately and consumes the token.
149     /// p.park();
150     /// ```
151     ///
152     /// [`park`]: Parker::park
153     /// [`park_timeout`]: Parker::park_timeout
unparker(&self) -> &Unparker154     pub fn unparker(&self) -> &Unparker {
155         &self.unparker
156     }
157 
158     /// Converts a `Parker` into a raw pointer.
159     ///
160     /// # Examples
161     ///
162     /// ```
163     /// use crossbeam_utils::sync::Parker;
164     ///
165     /// let p = Parker::new();
166     /// let raw = Parker::into_raw(p);
167     /// ```
into_raw(this: Parker) -> *const ()168     pub fn into_raw(this: Parker) -> *const () {
169         Unparker::into_raw(this.unparker)
170     }
171 
172     /// Converts a raw pointer into a `Parker`.
173     ///
174     /// # Safety
175     ///
176     /// This method is safe to use only with pointers returned by [`Parker::into_raw`].
177     ///
178     /// # Examples
179     ///
180     /// ```
181     /// use crossbeam_utils::sync::Parker;
182     ///
183     /// let p = Parker::new();
184     /// let raw = Parker::into_raw(p);
185     /// let p = unsafe { Parker::from_raw(raw) };
186     /// ```
from_raw(ptr: *const ()) -> Parker187     pub unsafe fn from_raw(ptr: *const ()) -> Parker {
188         Parker {
189             unparker: Unparker::from_raw(ptr),
190             _marker: PhantomData,
191         }
192     }
193 }
194 
195 impl fmt::Debug for Parker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result196     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
197         f.pad("Parker { .. }")
198     }
199 }
200 
/// Unparks a thread parked by the associated [`Parker`].
///
/// An `Unparker` is obtained from [`Parker::unparker`] and can be cloned; all clones share the
/// same underlying state as the `Parker` they originate from.
pub struct Unparker {
    // Reference-counted parking state (atomic state word, mutex, condvar) shared with the
    // `Parker` and with every clone of this handle.
    inner: Arc<Inner>,
}

// NOTE(review): `Inner` holds only an `AtomicUsize`, a `Mutex<()>` and a `Condvar`, so
// `Arc<Inner>` looks `Send + Sync` already and these impls appear redundant but harmless;
// confirm before removing.
unsafe impl Send for Unparker {}
unsafe impl Sync for Unparker {}
208 
209 impl Unparker {
210     /// Atomically makes the token available if it is not already.
211     ///
212     /// This method will wake up the thread blocked on [`park`] or [`park_timeout`], if there is
213     /// any.
214     ///
215     /// # Examples
216     ///
217     /// ```
218     /// use std::thread;
219     /// use std::time::Duration;
220     /// use crossbeam_utils::sync::Parker;
221     ///
222     /// let p = Parker::new();
223     /// let u = p.unparker().clone();
224     ///
225     /// thread::spawn(move || {
226     ///     thread::sleep(Duration::from_millis(500));
227     ///     u.unpark();
228     /// });
229     ///
230     /// // Wakes up when `u.unpark()` provides the token, but may also wake up
231     /// // spuriously before that without consuming the token.
232     /// p.park();
233     /// ```
234     ///
235     /// [`park`]: Parker::park
236     /// [`park_timeout`]: Parker::park_timeout
unpark(&self)237     pub fn unpark(&self) {
238         self.inner.unpark()
239     }
240 
241     /// Converts an `Unparker` into a raw pointer.
242     ///
243     /// # Examples
244     ///
245     /// ```
246     /// use crossbeam_utils::sync::{Parker, Unparker};
247     ///
248     /// let p = Parker::new();
249     /// let u = p.unparker().clone();
250     /// let raw = Unparker::into_raw(u);
251     /// ```
into_raw(this: Unparker) -> *const ()252     pub fn into_raw(this: Unparker) -> *const () {
253         Arc::into_raw(this.inner) as *const ()
254     }
255 
256     /// Converts a raw pointer into an `Unparker`.
257     ///
258     /// # Safety
259     ///
260     /// This method is safe to use only with pointers returned by [`Unparker::into_raw`].
261     ///
262     /// # Examples
263     ///
264     /// ```
265     /// use crossbeam_utils::sync::{Parker, Unparker};
266     ///
267     /// let p = Parker::new();
268     /// let u = p.unparker().clone();
269     ///
270     /// let raw = Unparker::into_raw(u);
271     /// let u = unsafe { Unparker::from_raw(raw) };
272     /// ```
from_raw(ptr: *const ()) -> Unparker273     pub unsafe fn from_raw(ptr: *const ()) -> Unparker {
274         Unparker {
275             inner: Arc::from_raw(ptr as *const Inner),
276         }
277     }
278 }
279 
280 impl fmt::Debug for Unparker {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result281     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
282         f.pad("Unparker { .. }")
283     }
284 }
285 
286 impl Clone for Unparker {
clone(&self) -> Unparker287     fn clone(&self) -> Unparker {
288         Unparker {
289             inner: self.inner.clone(),
290         }
291     }
292 }
293 
/// No token is available and no thread is parked.
const EMPTY: usize = 0;
/// A thread is blocked (or about to block) on the condition variable.
const PARKED: usize = 1;
/// The token is available and has not been consumed yet.
const NOTIFIED: usize = 2;

/// Parking state shared between a `Parker` and its `Unparker`s.
struct Inner {
    /// One of `EMPTY`, `PARKED` or `NOTIFIED`.
    state: AtomicUsize,
    /// Guards the hand-off between "marked as parked" and "actually waiting on `cvar`".
    lock: Mutex<()>,
    /// Signalled by `unpark` when it finds a parked thread.
    cvar: Condvar,
}

impl Inner {
    /// Tries to consume a pending notification, returning `true` on success.
    fn take_token(&self) -> bool {
        self.state
            .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
            .is_ok()
    }

    /// Blocks until notified, or (when `timeout` is `Some`) until the timed wait returns.
    ///
    /// With a timeout, the call returns after a single timed wait on the condition variable,
    /// whether that wait ended by notification, timeout or spurious wakeup.
    fn park(&self, timeout: Option<Duration>) {
        // Fast path: a notification is already pending, so consume it without locking.
        if self.take_token() {
            return;
        }

        // A zero timeout means there is nothing to wait for.
        if timeout == Some(Duration::from_millis(0)) {
            return;
        }

        // Slow path: going to sleep has to be coordinated with `unpark` through `lock`.
        let mut guard = self.lock.lock().unwrap();

        if let Err(actual) = self.state.compare_exchange(EMPTY, PARKED, SeqCst, SeqCst) {
            if actual != NOTIFIED {
                panic!("inconsistent park_timeout state: {}", actual);
            }

            // A notification arrived after the fast path. Consume it now so the next `park`
            // does not wake up for it.
            //
            // The value is known to be `NOTIFIED`, but `state` is still read via `swap`:
            // `unpark` may have run again since the failed `compare_exchange`, and reading
            // from its write is what synchronizes with it and makes the writes it performed
            // before `unpark` visible here.
            let old = self.state.swap(EMPTY, SeqCst);
            assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
            return;
        }

        if let Some(limit) = timeout {
            // Timed wait: whatever the reason for waking up (notification, timeout or spurious
            // wakeup), unconditionally reset `state` to `EMPTY`. That either consumes the
            // notification or clears our own `PARKED` flag.
            let (_guard, _wait_result) = self.cvar.wait_timeout(guard, limit).unwrap();

            match self.state.swap(EMPTY, SeqCst) {
                // NOTIFIED: got a notification; PARKED: woke up without one.
                NOTIFIED | PARKED => {}
                n => panic!("inconsistent park_timeout state: {}", n),
            }
            return;
        }

        // Untimed wait: keep sleeping until a notification can actually be consumed.
        loop {
            guard = self.cvar.wait(guard).unwrap();

            if self.take_token() {
                return;
            }

            // Spurious wakeup; wait again.
        }
    }

    /// Makes the token available and wakes the parked thread, if there is one.
    pub fn unpark(&self) {
        // `NOTIFIED` is written with an unconditional swap, even when `state` already holds
        // `NOTIFIED`. The write is the release operation `park` synchronizes with, so skipping
        // it (as a failing compare-and-swap would) could hide writes made before this call
        // from the unparked thread.
        let previous = self.state.swap(NOTIFIED, SeqCst);

        match previous {
            // EMPTY: nobody was waiting; NOTIFIED: the token was already available.
            EMPTY | NOTIFIED => {}
            PARKED => {
                // Between the parked thread setting `PARKED` (or re-checking `state` after a
                // spurious wakeup) and it actually waiting on `cvar`, a notification would be
                // lost and the thread would sleep forever. The thread holds `lock` throughout
                // that window, so acquiring `lock` here waits until it is ready to be notified.
                //
                // The guard is dropped before `notify_one` so that the woken thread does not
                // immediately block on `lock` again.
                drop(self.lock.lock().unwrap());
                self.cvar.notify_one();
            }
            _ => panic!("inconsistent state in unpark"),
        }
    }
}
398