1 //! Parks the runtime. 2 //! 3 //! A combination of the various resource driver park handles. 4 5 use crate::loom::sync::atomic::AtomicUsize; 6 use crate::loom::sync::{Arc, Condvar, Mutex}; 7 use crate::loom::thread; 8 use crate::park::{Park, Unpark}; 9 use crate::runtime::time; 10 use crate::util::TryLock; 11 12 use std::sync::atomic::Ordering::SeqCst; 13 use std::time::Duration; 14 15 pub(crate) struct Parker { 16 inner: Arc<Inner>, 17 } 18 19 pub(crate) struct Unparker { 20 inner: Arc<Inner>, 21 } 22 23 struct Inner { 24 /// Avoids entering the park if possible 25 state: AtomicUsize, 26 27 /// Used to coordinate access to the driver / condvar 28 mutex: Mutex<()>, 29 30 /// Condvar to block on if the driver is unavailable. 31 condvar: Condvar, 32 33 /// Resource (I/O, time, ...) driver 34 shared: Arc<Shared>, 35 } 36 37 const EMPTY: usize = 0; 38 const PARKED_CONDVAR: usize = 1; 39 const PARKED_DRIVER: usize = 2; 40 const NOTIFIED: usize = 3; 41 42 /// Shared across multiple Parker handles 43 struct Shared { 44 /// Shared driver. Only one thread at a time can use this 45 driver: TryLock<time::Driver>, 46 47 /// Unpark handle 48 handle: <time::Driver as Park>::Unpark, 49 } 50 51 impl Parker { new(driver: time::Driver) -> Parker52 pub(crate) fn new(driver: time::Driver) -> Parker { 53 let handle = driver.unpark(); 54 55 Parker { 56 inner: Arc::new(Inner { 57 state: AtomicUsize::new(EMPTY), 58 mutex: Mutex::new(()), 59 condvar: Condvar::new(), 60 shared: Arc::new(Shared { 61 driver: TryLock::new(driver), 62 handle, 63 }), 64 }), 65 } 66 } 67 } 68 69 impl Clone for Parker { clone(&self) -> Parker70 fn clone(&self) -> Parker { 71 Parker { 72 inner: Arc::new(Inner { 73 state: AtomicUsize::new(EMPTY), 74 mutex: Mutex::new(()), 75 condvar: Condvar::new(), 76 shared: self.inner.shared.clone(), 77 }), 78 } 79 } 80 } 81 82 impl Park for Parker { 83 type Unpark = Unparker; 84 type Error = (); 85 unpark(&self) -> Unparker86 fn unpark(&self) -> Unparker { 87 Unparker { 88 inner: self.inner.clone(), 89 } 90 } 91 park(&mut self) -> Result<(), Self::Error>92 fn park(&mut self) -> Result<(), Self::Error> { 93 self.inner.park(); 94 Ok(()) 95 } 96 park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>97 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { 98 // Only parking with zero is supported... 99 assert_eq!(duration, Duration::from_millis(0)); 100 101 if let Some(mut driver) = self.inner.shared.driver.try_lock() { 102 driver.park_timeout(duration).map_err(|_| ()) 103 } else { 104 Ok(()) 105 } 106 } 107 shutdown(&mut self)108 fn shutdown(&mut self) { 109 self.inner.shutdown(); 110 } 111 } 112 113 impl Unpark for Unparker { unpark(&self)114 fn unpark(&self) { 115 self.inner.unpark(); 116 } 117 } 118 119 impl Inner { 120 /// Parks the current thread for at most `dur`. park(&self)121 fn park(&self) { 122 for _ in 0..3 { 123 // If we were previously notified then we consume this notification and 124 // return quickly. 125 if self 126 .state 127 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 128 .is_ok() 129 { 130 return; 131 } 132 133 thread::yield_now(); 134 } 135 136 if let Some(mut driver) = self.shared.driver.try_lock() { 137 self.park_driver(&mut driver); 138 } else { 139 self.park_condvar(); 140 } 141 } 142 park_condvar(&self)143 fn park_condvar(&self) { 144 // Otherwise we need to coordinate going to sleep 145 let mut m = self.mutex.lock().unwrap(); 146 147 match self 148 .state 149 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) 150 { 151 Ok(_) => {} 152 Err(NOTIFIED) => { 153 // We must read here, even though we know it will be `NOTIFIED`. 154 // This is because `unpark` may have been called again since we read 155 // `NOTIFIED` in the `compare_exchange` above. We must perform an 156 // acquire operation that synchronizes with that `unpark` to observe 157 // any writes it made before the call to unpark. To do that we must 158 // read from the write it made to `state`. 159 let old = self.state.swap(EMPTY, SeqCst); 160 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 161 162 return; 163 } 164 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 165 } 166 167 loop { 168 m = self.condvar.wait(m).unwrap(); 169 170 if self 171 .state 172 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 173 .is_ok() 174 { 175 // got a notification 176 return; 177 } 178 179 // spurious wakeup, go back to sleep 180 } 181 } 182 park_driver(&self, driver: &mut time::Driver)183 fn park_driver(&self, driver: &mut time::Driver) { 184 match self 185 .state 186 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) 187 { 188 Ok(_) => {} 189 Err(NOTIFIED) => { 190 // We must read here, even though we know it will be `NOTIFIED`. 191 // This is because `unpark` may have been called again since we read 192 // `NOTIFIED` in the `compare_exchange` above. We must perform an 193 // acquire operation that synchronizes with that `unpark` to observe 194 // any writes it made before the call to unpark. To do that we must 195 // read from the write it made to `state`. 196 let old = self.state.swap(EMPTY, SeqCst); 197 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 198 199 return; 200 } 201 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 202 } 203 204 // TODO: don't unwrap 205 driver.park().unwrap(); 206 207 match self.state.swap(EMPTY, SeqCst) { 208 NOTIFIED => {} // got a notification, hurray! 209 PARKED_DRIVER => {} // no notification, alas 210 n => panic!("inconsistent park_timeout state: {}", n), 211 } 212 } 213 unpark(&self)214 fn unpark(&self) { 215 // To ensure the unparked thread will observe any writes we made before 216 // this call, we must perform a release operation that `park` can 217 // synchronize with. To do that we must write `NOTIFIED` even if `state` 218 // is already `NOTIFIED`. That is why this must be a swap rather than a 219 // compare-and-swap that returns if it reads `NOTIFIED` on failure. 220 match self.state.swap(NOTIFIED, SeqCst) { 221 EMPTY => {} // no one was waiting 222 NOTIFIED => {} // already unparked 223 PARKED_CONDVAR => self.unpark_condvar(), 224 PARKED_DRIVER => self.unpark_driver(), 225 actual => panic!("inconsistent state in unpark; actual = {}", actual), 226 } 227 } 228 unpark_condvar(&self)229 fn unpark_condvar(&self) { 230 // There is a period between when the parked thread sets `state` to 231 // `PARKED` (or last checked `state` in the case of a spurious wake 232 // up) and when it actually waits on `cvar`. If we were to notify 233 // during this period it would be ignored and then when the parked 234 // thread went to sleep it would never wake up. Fortunately, it has 235 // `lock` locked at this stage so we can acquire `lock` to wait until 236 // it is ready to receive the notification. 237 // 238 // Releasing `lock` before the call to `notify_one` means that when the 239 // parked thread wakes it doesn't get woken only to have to wait for us 240 // to release `lock`. 241 drop(self.mutex.lock().unwrap()); 242 243 self.condvar.notify_one() 244 } 245 unpark_driver(&self)246 fn unpark_driver(&self) { 247 self.shared.handle.unpark(); 248 } 249 shutdown(&self)250 fn shutdown(&self) { 251 if let Some(mut driver) = self.shared.driver.try_lock() { 252 driver.shutdown(); 253 } 254 255 self.condvar.notify_all(); 256 } 257 } 258