1 //! Parks the runtime.
2 //!
3 //! A combination of the various resource driver park handles.
4 
5 use crate::loom::sync::atomic::AtomicUsize;
6 use crate::loom::sync::{Arc, Condvar, Mutex};
7 use crate::loom::thread;
8 use crate::park::{Park, Unpark};
9 use crate::runtime::time;
10 use crate::util::TryLock;
11 
12 use std::sync::atomic::Ordering::SeqCst;
13 use std::time::Duration;
14 
15 pub(crate) struct Parker {
16     inner: Arc<Inner>,
17 }
18 
19 pub(crate) struct Unparker {
20     inner: Arc<Inner>,
21 }
22 
23 struct Inner {
24     /// Avoids entering the park if possible
25     state: AtomicUsize,
26 
27     /// Used to coordinate access to the driver / condvar
28     mutex: Mutex<()>,
29 
30     /// Condvar to block on if the driver is unavailable.
31     condvar: Condvar,
32 
33     /// Resource (I/O, time, ...) driver
34     shared: Arc<Shared>,
35 }
36 
37 const EMPTY: usize = 0;
38 const PARKED_CONDVAR: usize = 1;
39 const PARKED_DRIVER: usize = 2;
40 const NOTIFIED: usize = 3;
41 
42 /// Shared across multiple Parker handles
43 struct Shared {
44     /// Shared driver. Only one thread at a time can use this
45     driver: TryLock<time::Driver>,
46 
47     /// Unpark handle
48     handle: <time::Driver as Park>::Unpark,
49 }
50 
51 impl Parker {
new(driver: time::Driver) -> Parker52     pub(crate) fn new(driver: time::Driver) -> Parker {
53         let handle = driver.unpark();
54 
55         Parker {
56             inner: Arc::new(Inner {
57                 state: AtomicUsize::new(EMPTY),
58                 mutex: Mutex::new(()),
59                 condvar: Condvar::new(),
60                 shared: Arc::new(Shared {
61                     driver: TryLock::new(driver),
62                     handle,
63                 }),
64             }),
65         }
66     }
67 }
68 
69 impl Clone for Parker {
clone(&self) -> Parker70     fn clone(&self) -> Parker {
71         Parker {
72             inner: Arc::new(Inner {
73                 state: AtomicUsize::new(EMPTY),
74                 mutex: Mutex::new(()),
75                 condvar: Condvar::new(),
76                 shared: self.inner.shared.clone(),
77             }),
78         }
79     }
80 }
81 
82 impl Park for Parker {
83     type Unpark = Unparker;
84     type Error = ();
85 
unpark(&self) -> Unparker86     fn unpark(&self) -> Unparker {
87         Unparker {
88             inner: self.inner.clone(),
89         }
90     }
91 
park(&mut self) -> Result<(), Self::Error>92     fn park(&mut self) -> Result<(), Self::Error> {
93         self.inner.park();
94         Ok(())
95     }
96 
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>97     fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
98         // Only parking with zero is supported...
99         assert_eq!(duration, Duration::from_millis(0));
100 
101         if let Some(mut driver) = self.inner.shared.driver.try_lock() {
102             driver.park_timeout(duration).map_err(|_| ())
103         } else {
104             Ok(())
105         }
106     }
107 
shutdown(&mut self)108     fn shutdown(&mut self) {
109         self.inner.shutdown();
110     }
111 }
112 
113 impl Unpark for Unparker {
unpark(&self)114     fn unpark(&self) {
115         self.inner.unpark();
116     }
117 }
118 
119 impl Inner {
120     /// Parks the current thread for at most `dur`.
park(&self)121     fn park(&self) {
122         for _ in 0..3 {
123             // If we were previously notified then we consume this notification and
124             // return quickly.
125             if self
126                 .state
127                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
128                 .is_ok()
129             {
130                 return;
131             }
132 
133             thread::yield_now();
134         }
135 
136         if let Some(mut driver) = self.shared.driver.try_lock() {
137             self.park_driver(&mut driver);
138         } else {
139             self.park_condvar();
140         }
141     }
142 
park_condvar(&self)143     fn park_condvar(&self) {
144         // Otherwise we need to coordinate going to sleep
145         let mut m = self.mutex.lock().unwrap();
146 
147         match self
148             .state
149             .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
150         {
151             Ok(_) => {}
152             Err(NOTIFIED) => {
153                 // We must read here, even though we know it will be `NOTIFIED`.
154                 // This is because `unpark` may have been called again since we read
155                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
156                 // acquire operation that synchronizes with that `unpark` to observe
157                 // any writes it made before the call to unpark. To do that we must
158                 // read from the write it made to `state`.
159                 let old = self.state.swap(EMPTY, SeqCst);
160                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
161 
162                 return;
163             }
164             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
165         }
166 
167         loop {
168             m = self.condvar.wait(m).unwrap();
169 
170             if self
171                 .state
172                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
173                 .is_ok()
174             {
175                 // got a notification
176                 return;
177             }
178 
179             // spurious wakeup, go back to sleep
180         }
181     }
182 
park_driver(&self, driver: &mut time::Driver)183     fn park_driver(&self, driver: &mut time::Driver) {
184         match self
185             .state
186             .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
187         {
188             Ok(_) => {}
189             Err(NOTIFIED) => {
190                 // We must read here, even though we know it will be `NOTIFIED`.
191                 // This is because `unpark` may have been called again since we read
192                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
193                 // acquire operation that synchronizes with that `unpark` to observe
194                 // any writes it made before the call to unpark. To do that we must
195                 // read from the write it made to `state`.
196                 let old = self.state.swap(EMPTY, SeqCst);
197                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
198 
199                 return;
200             }
201             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
202         }
203 
204         // TODO: don't unwrap
205         driver.park().unwrap();
206 
207         match self.state.swap(EMPTY, SeqCst) {
208             NOTIFIED => {}      // got a notification, hurray!
209             PARKED_DRIVER => {} // no notification, alas
210             n => panic!("inconsistent park_timeout state: {}", n),
211         }
212     }
213 
unpark(&self)214     fn unpark(&self) {
215         // To ensure the unparked thread will observe any writes we made before
216         // this call, we must perform a release operation that `park` can
217         // synchronize with. To do that we must write `NOTIFIED` even if `state`
218         // is already `NOTIFIED`. That is why this must be a swap rather than a
219         // compare-and-swap that returns if it reads `NOTIFIED` on failure.
220         match self.state.swap(NOTIFIED, SeqCst) {
221             EMPTY => {}    // no one was waiting
222             NOTIFIED => {} // already unparked
223             PARKED_CONDVAR => self.unpark_condvar(),
224             PARKED_DRIVER => self.unpark_driver(),
225             actual => panic!("inconsistent state in unpark; actual = {}", actual),
226         }
227     }
228 
unpark_condvar(&self)229     fn unpark_condvar(&self) {
230         // There is a period between when the parked thread sets `state` to
231         // `PARKED` (or last checked `state` in the case of a spurious wake
232         // up) and when it actually waits on `cvar`. If we were to notify
233         // during this period it would be ignored and then when the parked
234         // thread went to sleep it would never wake up. Fortunately, it has
235         // `lock` locked at this stage so we can acquire `lock` to wait until
236         // it is ready to receive the notification.
237         //
238         // Releasing `lock` before the call to `notify_one` means that when the
239         // parked thread wakes it doesn't get woken only to have to wait for us
240         // to release `lock`.
241         drop(self.mutex.lock().unwrap());
242 
243         self.condvar.notify_one()
244     }
245 
unpark_driver(&self)246     fn unpark_driver(&self) {
247         self.shared.handle.unpark();
248     }
249 
shutdown(&self)250     fn shutdown(&self) {
251         if let Some(mut driver) = self.shared.driver.try_lock() {
252             driver.shutdown();
253         }
254 
255         self.condvar.notify_all();
256     }
257 }
258