1 //! Abstraction over blocking and unblocking the current thread.
2 //!
3 //! Provides an abstraction over blocking the current thread. This is similar to
4 //! the park / unpark constructs provided by [`std`] but made generic. This
5 //! allows embedding custom functionality to perform when the thread is blocked.
6 //!
7 //! A blocked [`Park`][p] instance is unblocked by calling [`unpark`] on its
8 //! [`Unpark`][up] handle.
9 //!
10 //! The [`ParkThread`] struct implements [`Park`][p] using
11 //! [`thread::park`][`std`] to put the thread to sleep. The Tokio reactor also
12 //! implements park, but uses [`mio::Poll`][mio] to block the thread instead.
13 //!
14 //! The [`Park`][p] trait is composable. A timer implementation might decorate a
15 //! [`Park`][p] implementation by checking if any timeouts have elapsed after
16 //! the inner [`Park`][p] implementation unblocks.
17 //!
18 //! # Model
19 //!
20 //! Conceptually, each [`Park`][p] instance has an associated token, which is
21 //! initially not present:
22 //!
23 //! * The [`park`] method blocks the current thread unless or until the token
24 //!   is available, at which point it atomically consumes the token.
25 //! * The [`unpark`] method atomically makes the token available if it wasn't
26 //!   already.
27 //!
28 //! Some things to note:
29 //!
30 //! * If [`unpark`] is called before [`park`], the next call to [`park`] will
31 //! **not** block the thread.
32 //! * **Spurious** wakeups are permitted, i.e., the [`park`] method may unblock
33 //!   even if [`unpark`] was not called.
34 //! * [`park_timeout`] does the same as [`park`] but allows specifying a maximum
35 //!   time to block the thread for.
36 //!
37 //! [`std`]: https://doc.rust-lang.org/std/thread/fn.park.html
38 //! [`thread::park`]: https://doc.rust-lang.org/std/thread/fn.park.html
39 //! [`ParkThread`]: struct.ParkThread.html
40 //! [p]: trait.Park.html
41 //! [`park`]: trait.Park.html#tymethod.park
42 //! [`park_timeout`]: trait.Park.html#tymethod.park_timeout
43 //! [`unpark`]: trait.Unpark.html#tymethod.unpark
44 //! [up]: trait.Unpark.html
45 //! [mio]: https://docs.rs/mio/0.6/mio/struct.Poll.html
46 
47 use std::marker::PhantomData;
48 use std::rc::Rc;
49 use std::sync::Arc;
50 use std::time::Duration;
51 
52 use crossbeam_utils::sync::{Parker, Unparker};
53 
54 /// Block the current thread.
55 ///
56 /// See [module documentation][mod] for more details.
57 ///
58 /// [mod]: ../index.html
59 pub trait Park {
60     /// Unpark handle type for the `Park` implementation.
61     type Unpark: Unpark;
62 
63     /// Error returned by `park`
64     type Error;
65 
66     /// Get a new `Unpark` handle associated with this `Park` instance.
unpark(&self) -> Self::Unpark67     fn unpark(&self) -> Self::Unpark;
68 
69     /// Block the current thread unless or until the token is available.
70     ///
71     /// A call to `park` does not guarantee that the thread will remain blocked
72     /// forever, and callers should be prepared for this possibility. This
73     /// function may wakeup spuriously for any reason.
74     ///
75     /// See [module documentation][mod] for more details.
76     ///
77     /// # Panics
78     ///
79     /// This function **should** not panic, but ultimately, panics are left as
80     /// an implementation detail. Refer to the documentation for the specific
81     /// `Park` implementation
82     ///
83     /// [mod]: ../index.html
park(&mut self) -> Result<(), Self::Error>84     fn park(&mut self) -> Result<(), Self::Error>;
85 
86     /// Park the current thread for at most `duration`.
87     ///
88     /// This function is the same as `park` but allows specifying a maximum time
89     /// to block the thread for.
90     ///
91     /// Same as `park`, there is no guarantee that the thread will remain
92     /// blocked for any amount of time. Spurious wakeups are permitted for any
93     /// reason.
94     ///
95     /// See [module documentation][mod] for more details.
96     ///
97     /// # Panics
98     ///
99     /// This function **should** not panic, but ultimately, panics are left as
100     /// an implementation detail. Refer to the documentation for the specific
101     /// `Park` implementation
102     ///
103     /// [mod]: ../index.html
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>104     fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>;
105 }
106 
107 /// Unblock a thread blocked by the associated [`Park`] instance.
108 ///
109 /// See [module documentation][mod] for more details.
110 ///
111 /// [mod]: ../index.html
112 /// [`Park`]: trait.Park.html
113 pub trait Unpark: Sync + Send + 'static {
114     /// Unblock a thread that is blocked by the associated `Park` handle.
115     ///
116     /// Calling `unpark` atomically makes available the unpark token, if it is
117     /// not already available.
118     ///
119     /// See [module documentation][mod] for more details.
120     ///
121     /// # Panics
122     ///
123     /// This function **should** not panic, but ultimately, panics are left as
124     /// an implementation detail. Refer to the documentation for the specific
125     /// `Unpark` implementation
126     ///
127     /// [mod]: ../index.html
unpark(&self)128     fn unpark(&self);
129 }
130 
131 impl Unpark for Box<dyn Unpark> {
unpark(&self)132     fn unpark(&self) {
133         (**self).unpark()
134     }
135 }
136 
137 impl Unpark for Arc<dyn Unpark> {
unpark(&self)138     fn unpark(&self) {
139         (**self).unpark()
140     }
141 }
142 
143 /// Blocks the current thread using a condition variable.
144 ///
145 /// Implements the [`Park`] functionality by using a condition variable. An
146 /// atomic variable is also used to avoid using the condition variable if
147 /// possible.
148 ///
149 /// The condition variable is cached in a thread-local variable and is shared
150 /// across all `ParkThread` instances created on the same thread. This also
151 /// means that an instance of `ParkThread` might be unblocked by a handle
152 /// associated with a different `ParkThread` instance.
153 #[derive(Debug)]
154 pub struct ParkThread {
155     _anchor: PhantomData<Rc<()>>,
156 }
157 
158 /// Error returned by [`ParkThread`]
159 ///
160 /// This currently is never returned, but might at some point in the future.
161 ///
162 /// [`ParkThread`]: struct.ParkThread.html
163 #[derive(Debug)]
164 pub struct ParkError {
165     _p: (),
166 }
167 
168 /// Unblocks a thread that was blocked by `ParkThread`.
169 #[derive(Clone, Debug)]
170 pub struct UnparkThread {
171     inner: Unparker,
172 }
173 
174 thread_local! {
175     static CURRENT_PARKER: Parker = Parker::new();
176 }
177 
178 // ===== impl ParkThread =====
179 
180 impl ParkThread {
181     /// Create a new `ParkThread` handle for the current thread.
182     ///
183     /// This type cannot be moved to other threads, so it should be created on
184     /// the thread that the caller intends to park.
new() -> ParkThread185     pub fn new() -> ParkThread {
186         ParkThread {
187             _anchor: PhantomData,
188         }
189     }
190 
191     /// Get a reference to the `ParkThread` handle for this thread.
with_current<F, R>(&self, f: F) -> R where F: FnOnce(&Parker) -> R,192     fn with_current<F, R>(&self, f: F) -> R
193     where
194         F: FnOnce(&Parker) -> R,
195     {
196         CURRENT_PARKER.with(|inner| f(inner))
197     }
198 }
199 
200 impl Park for ParkThread {
201     type Unpark = UnparkThread;
202     type Error = ParkError;
203 
unpark(&self) -> Self::Unpark204     fn unpark(&self) -> Self::Unpark {
205         let inner = self.with_current(|inner| inner.unparker().clone());
206         UnparkThread { inner }
207     }
208 
park(&mut self) -> Result<(), Self::Error>209     fn park(&mut self) -> Result<(), Self::Error> {
210         self.with_current(|inner| inner.park());
211         Ok(())
212     }
213 
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>214     fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
215         self.with_current(|inner| inner.park_timeout(duration));
216         Ok(())
217     }
218 }
219 
220 // ===== impl UnparkThread =====
221 
222 impl Unpark for UnparkThread {
unpark(&self)223     fn unpark(&self) {
224         self.inner.unpark();
225     }
226 }
227