//! Parks the runtime.
//!
//! A combination of the various resource driver park handles.
4 
5 use crate::loom::sync::atomic::AtomicUsize;
6 use crate::loom::sync::{Arc, Condvar, Mutex};
7 use crate::loom::thread;
8 use crate::park::{Park, Unpark};
9 use crate::runtime::time;
10 use crate::util::TryLock;
11 
12 use std::sync::atomic::Ordering::SeqCst;
13 use std::time::Duration;
14 
15 pub(crate) struct Parker {
16     inner: Arc<Inner>,
17 }
18 
19 pub(crate) struct Unparker {
20     inner: Arc<Inner>,
21 }
22 
23 struct Inner {
24     /// Avoids entering the park if possible
25     state: AtomicUsize,
26 
27     /// Used to coordinate access to the driver / condvar
28     mutex: Mutex<()>,
29 
30     /// Condvar to block on if the driver is unavailable.
31     condvar: Condvar,
32 
33     /// Resource (I/O, time, ...) driver
34     shared: Arc<Shared>,
35 }
36 
/// No thread is parked and no notification is pending.
const EMPTY: usize = 0;
/// The owning thread is blocked (or about to block) on the condvar.
const PARKED_CONDVAR: usize = 1;
/// The owning thread is blocked (or about to block) in the resource driver.
const PARKED_DRIVER: usize = 2;
/// `unpark` has been called; the next `park` consumes this and returns.
const NOTIFIED: usize = 3;
41 
42 /// Shared across multiple Parker handles
43 struct Shared {
44     /// Shared driver. Only one thread at a time can use this
45     driver: TryLock<time::Driver>,
46 
47     /// Unpark handle
48     handle: <time::Driver as Park>::Unpark,
49 }
50 
51 impl Parker {
new(driver: time::Driver) -> Parker52     pub(crate) fn new(driver: time::Driver) -> Parker {
53         let handle = driver.unpark();
54 
55         Parker {
56             inner: Arc::new(Inner {
57                 state: AtomicUsize::new(EMPTY),
58                 mutex: Mutex::new(()),
59                 condvar: Condvar::new(),
60                 shared: Arc::new(Shared {
61                     driver: TryLock::new(driver),
62                     handle,
63                 }),
64             }),
65         }
66     }
67 }
68 
69 impl Clone for Parker {
clone(&self) -> Parker70     fn clone(&self) -> Parker {
71         Parker {
72             inner: Arc::new(Inner {
73                 state: AtomicUsize::new(EMPTY),
74                 mutex: Mutex::new(()),
75                 condvar: Condvar::new(),
76                 shared: self.inner.shared.clone(),
77             }),
78         }
79     }
80 }
81 
82 impl Park for Parker {
83     type Unpark = Unparker;
84     type Error = ();
85 
unpark(&self) -> Unparker86     fn unpark(&self) -> Unparker {
87         Unparker {
88             inner: self.inner.clone(),
89         }
90     }
91 
park(&mut self) -> Result<(), Self::Error>92     fn park(&mut self) -> Result<(), Self::Error> {
93         self.inner.park();
94         Ok(())
95     }
96 
park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>97     fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
98         // Only parking with zero is supported...
99         assert_eq!(duration, Duration::from_millis(0));
100 
101         if let Some(mut driver) = self.inner.shared.driver.try_lock() {
102             driver.park_timeout(duration).map_err(|_| ())
103         } else {
104             Ok(())
105         }
106     }
107 }
108 
109 impl Unpark for Unparker {
unpark(&self)110     fn unpark(&self) {
111         self.inner.unpark();
112     }
113 }
114 
115 impl Inner {
116     /// Parks the current thread for at most `dur`.
park(&self)117     fn park(&self) {
118         for _ in 0..3 {
119             // If we were previously notified then we consume this notification and
120             // return quickly.
121             if self
122                 .state
123                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
124                 .is_ok()
125             {
126                 return;
127             }
128 
129             thread::yield_now();
130         }
131 
132         if let Some(mut driver) = self.shared.driver.try_lock() {
133             self.park_driver(&mut driver);
134         } else {
135             self.park_condvar();
136         }
137     }
138 
park_condvar(&self)139     fn park_condvar(&self) {
140         // Otherwise we need to coordinate going to sleep
141         let mut m = self.mutex.lock().unwrap();
142 
143         match self
144             .state
145             .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst)
146         {
147             Ok(_) => {}
148             Err(NOTIFIED) => {
149                 // We must read here, even though we know it will be `NOTIFIED`.
150                 // This is because `unpark` may have been called again since we read
151                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
152                 // acquire operation that synchronizes with that `unpark` to observe
153                 // any writes it made before the call to unpark. To do that we must
154                 // read from the write it made to `state`.
155                 let old = self.state.swap(EMPTY, SeqCst);
156                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
157 
158                 return;
159             }
160             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
161         }
162 
163         loop {
164             m = self.condvar.wait(m).unwrap();
165 
166             if self
167                 .state
168                 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst)
169                 .is_ok()
170             {
171                 // got a notification
172                 return;
173             }
174 
175             // spurious wakeup, go back to sleep
176         }
177     }
178 
park_driver(&self, driver: &mut time::Driver)179     fn park_driver(&self, driver: &mut time::Driver) {
180         match self
181             .state
182             .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst)
183         {
184             Ok(_) => {}
185             Err(NOTIFIED) => {
186                 // We must read here, even though we know it will be `NOTIFIED`.
187                 // This is because `unpark` may have been called again since we read
188                 // `NOTIFIED` in the `compare_exchange` above. We must perform an
189                 // acquire operation that synchronizes with that `unpark` to observe
190                 // any writes it made before the call to unpark. To do that we must
191                 // read from the write it made to `state`.
192                 let old = self.state.swap(EMPTY, SeqCst);
193                 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly");
194 
195                 return;
196             }
197             Err(actual) => panic!("inconsistent park state; actual = {}", actual),
198         }
199 
200         // TODO: don't unwrap
201         driver.park().unwrap();
202 
203         match self.state.swap(EMPTY, SeqCst) {
204             NOTIFIED => {}      // got a notification, hurray!
205             PARKED_DRIVER => {} // no notification, alas
206             n => panic!("inconsistent park_timeout state: {}", n),
207         }
208     }
209 
unpark(&self)210     fn unpark(&self) {
211         // To ensure the unparked thread will observe any writes we made before
212         // this call, we must perform a release operation that `park` can
213         // synchronize with. To do that we must write `NOTIFIED` even if `state`
214         // is already `NOTIFIED`. That is why this must be a swap rather than a
215         // compare-and-swap that returns if it reads `NOTIFIED` on failure.
216         match self.state.swap(NOTIFIED, SeqCst) {
217             EMPTY => {}    // no one was waiting
218             NOTIFIED => {} // already unparked
219             PARKED_CONDVAR => self.unpark_condvar(),
220             PARKED_DRIVER => self.unpark_driver(),
221             actual => panic!("inconsistent state in unpark; actual = {}", actual),
222         }
223     }
224 
unpark_condvar(&self)225     fn unpark_condvar(&self) {
226         // There is a period between when the parked thread sets `state` to
227         // `PARKED` (or last checked `state` in the case of a spurious wake
228         // up) and when it actually waits on `cvar`. If we were to notify
229         // during this period it would be ignored and then when the parked
230         // thread went to sleep it would never wake up. Fortunately, it has
231         // `lock` locked at this stage so we can acquire `lock` to wait until
232         // it is ready to receive the notification.
233         //
234         // Releasing `lock` before the call to `notify_one` means that when the
235         // parked thread wakes it doesn't get woken only to have to wait for us
236         // to release `lock`.
237         drop(self.mutex.lock().unwrap());
238 
239         self.condvar.notify_one()
240     }
241 
unpark_driver(&self)242     fn unpark_driver(&self) {
243         self.shared.handle.unpark();
244     }
245 }
246