1 //! Parks the runtime. 2 //! 3 //! A combination of the various resource driver park handles. 4 5 use crate::loom::sync::atomic::AtomicUsize; 6 use crate::loom::sync::{Arc, Condvar, Mutex}; 7 use crate::loom::thread; 8 use crate::park::{Park, Unpark}; 9 use crate::runtime::time; 10 use crate::util::TryLock; 11 12 use std::sync::atomic::Ordering::SeqCst; 13 use std::time::Duration; 14 15 pub(crate) struct Parker { 16 inner: Arc<Inner>, 17 } 18 19 pub(crate) struct Unparker { 20 inner: Arc<Inner>, 21 } 22 23 struct Inner { 24 /// Avoids entering the park if possible 25 state: AtomicUsize, 26 27 /// Used to coordinate access to the driver / condvar 28 mutex: Mutex<()>, 29 30 /// Condvar to block on if the driver is unavailable. 31 condvar: Condvar, 32 33 /// Resource (I/O, time, ...) driver 34 shared: Arc<Shared>, 35 } 36 37 const EMPTY: usize = 0; 38 const PARKED_CONDVAR: usize = 1; 39 const PARKED_DRIVER: usize = 2; 40 const NOTIFIED: usize = 3; 41 42 /// Shared across multiple Parker handles 43 struct Shared { 44 /// Shared driver. Only one thread at a time can use this 45 driver: TryLock<time::Driver>, 46 47 /// Unpark handle 48 handle: <time::Driver as Park>::Unpark, 49 } 50 51 impl Parker { new(driver: time::Driver) -> Parker52 pub(crate) fn new(driver: time::Driver) -> Parker { 53 let handle = driver.unpark(); 54 55 Parker { 56 inner: Arc::new(Inner { 57 state: AtomicUsize::new(EMPTY), 58 mutex: Mutex::new(()), 59 condvar: Condvar::new(), 60 shared: Arc::new(Shared { 61 driver: TryLock::new(driver), 62 handle, 63 }), 64 }), 65 } 66 } 67 } 68 69 impl Clone for Parker { clone(&self) -> Parker70 fn clone(&self) -> Parker { 71 Parker { 72 inner: Arc::new(Inner { 73 state: AtomicUsize::new(EMPTY), 74 mutex: Mutex::new(()), 75 condvar: Condvar::new(), 76 shared: self.inner.shared.clone(), 77 }), 78 } 79 } 80 } 81 82 impl Park for Parker { 83 type Unpark = Unparker; 84 type Error = (); 85 unpark(&self) -> Unparker86 fn unpark(&self) -> Unparker { 87 Unparker { 88 inner: self.inner.clone(), 89 } 90 } 91 park(&mut self) -> Result<(), Self::Error>92 fn park(&mut self) -> Result<(), Self::Error> { 93 self.inner.park(); 94 Ok(()) 95 } 96 park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error>97 fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { 98 // Only parking with zero is supported... 99 assert_eq!(duration, Duration::from_millis(0)); 100 101 if let Some(mut driver) = self.inner.shared.driver.try_lock() { 102 driver.park_timeout(duration).map_err(|_| ()) 103 } else { 104 Ok(()) 105 } 106 } 107 } 108 109 impl Unpark for Unparker { unpark(&self)110 fn unpark(&self) { 111 self.inner.unpark(); 112 } 113 } 114 115 impl Inner { 116 /// Parks the current thread for at most `dur`. park(&self)117 fn park(&self) { 118 for _ in 0..3 { 119 // If we were previously notified then we consume this notification and 120 // return quickly. 121 if self 122 .state 123 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 124 .is_ok() 125 { 126 return; 127 } 128 129 thread::yield_now(); 130 } 131 132 if let Some(mut driver) = self.shared.driver.try_lock() { 133 self.park_driver(&mut driver); 134 } else { 135 self.park_condvar(); 136 } 137 } 138 park_condvar(&self)139 fn park_condvar(&self) { 140 // Otherwise we need to coordinate going to sleep 141 let mut m = self.mutex.lock().unwrap(); 142 143 match self 144 .state 145 .compare_exchange(EMPTY, PARKED_CONDVAR, SeqCst, SeqCst) 146 { 147 Ok(_) => {} 148 Err(NOTIFIED) => { 149 // We must read here, even though we know it will be `NOTIFIED`. 150 // This is because `unpark` may have been called again since we read 151 // `NOTIFIED` in the `compare_exchange` above. We must perform an 152 // acquire operation that synchronizes with that `unpark` to observe 153 // any writes it made before the call to unpark. To do that we must 154 // read from the write it made to `state`. 155 let old = self.state.swap(EMPTY, SeqCst); 156 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 157 158 return; 159 } 160 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 161 } 162 163 loop { 164 m = self.condvar.wait(m).unwrap(); 165 166 if self 167 .state 168 .compare_exchange(NOTIFIED, EMPTY, SeqCst, SeqCst) 169 .is_ok() 170 { 171 // got a notification 172 return; 173 } 174 175 // spurious wakeup, go back to sleep 176 } 177 } 178 park_driver(&self, driver: &mut time::Driver)179 fn park_driver(&self, driver: &mut time::Driver) { 180 match self 181 .state 182 .compare_exchange(EMPTY, PARKED_DRIVER, SeqCst, SeqCst) 183 { 184 Ok(_) => {} 185 Err(NOTIFIED) => { 186 // We must read here, even though we know it will be `NOTIFIED`. 187 // This is because `unpark` may have been called again since we read 188 // `NOTIFIED` in the `compare_exchange` above. We must perform an 189 // acquire operation that synchronizes with that `unpark` to observe 190 // any writes it made before the call to unpark. To do that we must 191 // read from the write it made to `state`. 192 let old = self.state.swap(EMPTY, SeqCst); 193 debug_assert_eq!(old, NOTIFIED, "park state changed unexpectedly"); 194 195 return; 196 } 197 Err(actual) => panic!("inconsistent park state; actual = {}", actual), 198 } 199 200 // TODO: don't unwrap 201 driver.park().unwrap(); 202 203 match self.state.swap(EMPTY, SeqCst) { 204 NOTIFIED => {} // got a notification, hurray! 205 PARKED_DRIVER => {} // no notification, alas 206 n => panic!("inconsistent park_timeout state: {}", n), 207 } 208 } 209 unpark(&self)210 fn unpark(&self) { 211 // To ensure the unparked thread will observe any writes we made before 212 // this call, we must perform a release operation that `park` can 213 // synchronize with. To do that we must write `NOTIFIED` even if `state` 214 // is already `NOTIFIED`. That is why this must be a swap rather than a 215 // compare-and-swap that returns if it reads `NOTIFIED` on failure. 216 match self.state.swap(NOTIFIED, SeqCst) { 217 EMPTY => {} // no one was waiting 218 NOTIFIED => {} // already unparked 219 PARKED_CONDVAR => self.unpark_condvar(), 220 PARKED_DRIVER => self.unpark_driver(), 221 actual => panic!("inconsistent state in unpark; actual = {}", actual), 222 } 223 } 224 unpark_condvar(&self)225 fn unpark_condvar(&self) { 226 // There is a period between when the parked thread sets `state` to 227 // `PARKED` (or last checked `state` in the case of a spurious wake 228 // up) and when it actually waits on `cvar`. If we were to notify 229 // during this period it would be ignored and then when the parked 230 // thread went to sleep it would never wake up. Fortunately, it has 231 // `lock` locked at this stage so we can acquire `lock` to wait until 232 // it is ready to receive the notification. 233 // 234 // Releasing `lock` before the call to `notify_one` means that when the 235 // parked thread wakes it doesn't get woken only to have to wait for us 236 // to release `lock`. 237 drop(self.mutex.lock().unwrap()); 238 239 self.condvar.notify_one() 240 } 241 unpark_driver(&self)242 fn unpark_driver(&self) { 243 self.shared.handle.unpark(); 244 } 245 } 246