1 use std::sync::atomic::{AtomicUsize, Ordering}; 2 use std::sync::{Arc, Condvar, Mutex}; 3 use std::usize; 4 5 use crate::registry::{Registry, WorkerThread}; 6 7 /// We define various kinds of latches, which are all a primitive signaling 8 /// mechanism. A latch starts as false. Eventually someone calls `set()` and 9 /// it becomes true. You can test if it has been set by calling `probe()`. 10 /// 11 /// Some kinds of latches, but not all, support a `wait()` operation 12 /// that will wait until the latch is set, blocking efficiently. That 13 /// is not part of the trait since it is not possibly to do with all 14 /// latches. 15 /// 16 /// The intention is that `set()` is called once, but `probe()` may be 17 /// called any number of times. Once `probe()` returns true, the memory 18 /// effects that occurred before `set()` become visible. 19 /// 20 /// It'd probably be better to refactor the API into two paired types, 21 /// but that's a bit of work, and this is not a public API. 22 /// 23 /// ## Memory ordering 24 /// 25 /// Latches need to guarantee two things: 26 /// 27 /// - Once `probe()` returns true, all memory effects from the `set()` 28 /// are visible (in other words, the set should synchronize-with 29 /// the probe). 30 /// - Once `set()` occurs, the next `probe()` *will* observe it. This 31 /// typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep 32 /// README](/src/sleep/README.md#tickle-then-get-sleepy) for details. 33 pub(super) trait Latch { 34 /// Set the latch, signalling others. 35 /// 36 /// # WARNING 37 /// 38 /// Setting a latch triggers other threads to wake up and (in some 39 /// cases) complete. This may, in turn, cause memory to be 40 /// allocated and so forth. One must be very careful about this, 41 /// and it's typically better to read all the fields you will need 42 /// to access *before* a latch is set! set(&self)43 fn set(&self); 44 } 45 46 pub(super) trait AsCoreLatch { as_core_latch(&self) -> &CoreLatch47 fn as_core_latch(&self) -> &CoreLatch; 48 } 49 50 /// Latch is not set, owning thread is awake 51 const UNSET: usize = 0; 52 53 /// Latch is not set, owning thread is going to sleep on this latch 54 /// (but has not yet fallen asleep). 55 const SLEEPY: usize = 1; 56 57 /// Latch is not set, owning thread is asleep on this latch and 58 /// must be awoken. 59 const SLEEPING: usize = 2; 60 61 /// Latch is set. 62 const SET: usize = 3; 63 64 /// Spin latches are the simplest, most efficient kind, but they do 65 /// not support a `wait()` operation. They just have a boolean flag 66 /// that becomes true when `set()` is called. 67 #[derive(Debug)] 68 pub(super) struct CoreLatch { 69 state: AtomicUsize, 70 } 71 72 impl CoreLatch { 73 #[inline] new() -> Self74 fn new() -> Self { 75 Self { 76 state: AtomicUsize::new(0), 77 } 78 } 79 80 /// Returns the address of this core latch as an integer. Used 81 /// for logging. 82 #[inline] addr(&self) -> usize83 pub(super) fn addr(&self) -> usize { 84 self as *const CoreLatch as usize 85 } 86 87 /// Invoked by owning thread as it prepares to sleep. Returns true 88 /// if the owning thread may proceed to fall asleep, false if the 89 /// latch was set in the meantime. 90 #[inline] get_sleepy(&self) -> bool91 pub(super) fn get_sleepy(&self) -> bool { 92 self.state 93 .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed) 94 .is_ok() 95 } 96 97 /// Invoked by owning thread as it falls asleep sleep. Returns 98 /// true if the owning thread should block, or false if the latch 99 /// was set in the meantime. 100 #[inline] fall_asleep(&self) -> bool101 pub(super) fn fall_asleep(&self) -> bool { 102 self.state 103 .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed) 104 .is_ok() 105 } 106 107 /// Invoked by owning thread as it falls asleep sleep. Returns 108 /// true if the owning thread should block, or false if the latch 109 /// was set in the meantime. 110 #[inline] wake_up(&self)111 pub(super) fn wake_up(&self) { 112 if !self.probe() { 113 let _ = 114 self.state 115 .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed); 116 } 117 } 118 119 /// Set the latch. If this returns true, the owning thread was sleeping 120 /// and must be awoken. 121 /// 122 /// This is private because, typically, setting a latch involves 123 /// doing some wakeups; those are encapsulated in the surrounding 124 /// latch code. 125 #[inline] set(&self) -> bool126 fn set(&self) -> bool { 127 let old_state = self.state.swap(SET, Ordering::AcqRel); 128 old_state == SLEEPING 129 } 130 131 /// Test if this latch has been set. 132 #[inline] probe(&self) -> bool133 pub(super) fn probe(&self) -> bool { 134 self.state.load(Ordering::Acquire) == SET 135 } 136 } 137 138 /// Spin latches are the simplest, most efficient kind, but they do 139 /// not support a `wait()` operation. They just have a boolean flag 140 /// that becomes true when `set()` is called. 141 pub(super) struct SpinLatch<'r> { 142 core_latch: CoreLatch, 143 registry: &'r Arc<Registry>, 144 target_worker_index: usize, 145 cross: bool, 146 } 147 148 impl<'r> SpinLatch<'r> { 149 /// Creates a new spin latch that is owned by `thread`. This means 150 /// that `thread` is the only thread that should be blocking on 151 /// this latch -- it also means that when the latch is set, we 152 /// will wake `thread` if it is sleeping. 153 #[inline] new(thread: &'r WorkerThread) -> SpinLatch<'r>154 pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> { 155 SpinLatch { 156 core_latch: CoreLatch::new(), 157 registry: thread.registry(), 158 target_worker_index: thread.index(), 159 cross: false, 160 } 161 } 162 163 /// Creates a new spin latch for cross-threadpool blocking. Notably, we 164 /// need to make sure the registry is kept alive after setting, so we can 165 /// safely call the notification. 166 #[inline] cross(thread: &'r WorkerThread) -> SpinLatch<'r>167 pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> { 168 SpinLatch { 169 cross: true, 170 ..SpinLatch::new(thread) 171 } 172 } 173 174 #[inline] probe(&self) -> bool175 pub(super) fn probe(&self) -> bool { 176 self.core_latch.probe() 177 } 178 } 179 180 impl<'r> AsCoreLatch for SpinLatch<'r> { 181 #[inline] as_core_latch(&self) -> &CoreLatch182 fn as_core_latch(&self) -> &CoreLatch { 183 &self.core_latch 184 } 185 } 186 187 impl<'r> Latch for SpinLatch<'r> { 188 #[inline] set(&self)189 fn set(&self) { 190 let cross_registry; 191 192 let registry = if self.cross { 193 // Ensure the registry stays alive while we notify it. 194 // Otherwise, it would be possible that we set the spin 195 // latch and the other thread sees it and exits, causing 196 // the registry to be deallocated, all before we get a 197 // chance to invoke `registry.notify_worker_latch_is_set`. 198 cross_registry = Arc::clone(self.registry); 199 &cross_registry 200 } else { 201 // If this is not a "cross-registry" spin-latch, then the 202 // thread which is performing `set` is itself ensuring 203 // that the registry stays alive. 204 self.registry 205 }; 206 let target_worker_index = self.target_worker_index; 207 208 // NOTE: Once we `set`, the target may proceed and invalidate `&self`! 209 if self.core_latch.set() { 210 // Subtle: at this point, we can no longer read from 211 // `self`, because the thread owning this spin latch may 212 // have awoken and deallocated the latch. Therefore, we 213 // only use fields whose values we already read. 214 registry.notify_worker_latch_is_set(target_worker_index); 215 } 216 } 217 } 218 219 /// A Latch starts as false and eventually becomes true. You can block 220 /// until it becomes true. 221 #[derive(Debug)] 222 pub(super) struct LockLatch { 223 m: Mutex<bool>, 224 v: Condvar, 225 } 226 227 impl LockLatch { 228 #[inline] new() -> LockLatch229 pub(super) fn new() -> LockLatch { 230 LockLatch { 231 m: Mutex::new(false), 232 v: Condvar::new(), 233 } 234 } 235 236 /// Block until latch is set, then resets this lock latch so it can be reused again. wait_and_reset(&self)237 pub(super) fn wait_and_reset(&self) { 238 let mut guard = self.m.lock().unwrap(); 239 while !*guard { 240 guard = self.v.wait(guard).unwrap(); 241 } 242 *guard = false; 243 } 244 245 /// Block until latch is set. wait(&self)246 pub(super) fn wait(&self) { 247 let mut guard = self.m.lock().unwrap(); 248 while !*guard { 249 guard = self.v.wait(guard).unwrap(); 250 } 251 } 252 } 253 254 impl Latch for LockLatch { 255 #[inline] set(&self)256 fn set(&self) { 257 let mut guard = self.m.lock().unwrap(); 258 *guard = true; 259 self.v.notify_all(); 260 } 261 } 262 263 /// Counting latches are used to implement scopes. They track a 264 /// counter. Unlike other latches, calling `set()` does not 265 /// necessarily make the latch be considered `set()`; instead, it just 266 /// decrements the counter. The latch is only "set" (in the sense that 267 /// `probe()` returns true) once the counter reaches zero. 268 /// 269 /// Note: like a `SpinLatch`, count laches are always associated with 270 /// some registry that is probing them, which must be tickled when 271 /// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a 272 /// reference to that registry. This is because in some cases the 273 /// registry owns the count-latch, and that would create a cycle. So a 274 /// `CountLatch` must be given a reference to its owning registry when 275 /// it is set. For this reason, it does not implement the `Latch` 276 /// trait (but it doesn't have to, as it is not used in those generic 277 /// contexts). 278 #[derive(Debug)] 279 pub(super) struct CountLatch { 280 core_latch: CoreLatch, 281 counter: AtomicUsize, 282 } 283 284 impl CountLatch { 285 #[inline] new() -> CountLatch286 pub(super) fn new() -> CountLatch { 287 CountLatch { 288 core_latch: CoreLatch::new(), 289 counter: AtomicUsize::new(1), 290 } 291 } 292 293 #[inline] increment(&self)294 pub(super) fn increment(&self) { 295 debug_assert!(!self.core_latch.probe()); 296 self.counter.fetch_add(1, Ordering::Relaxed); 297 } 298 299 /// Decrements the latch counter by one. If this is the final 300 /// count, then the latch is **set**, and calls to `probe()` will 301 /// return true. Returns whether the latch was set. 302 #[inline] set(&self) -> bool303 pub(super) fn set(&self) -> bool { 304 if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { 305 self.core_latch.set(); 306 true 307 } else { 308 false 309 } 310 } 311 312 /// Decrements the latch counter by one and possibly set it. If 313 /// the latch is set, then the specific worker thread is tickled, 314 /// which should be the one that owns this latch. 315 #[inline] set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize)316 pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) { 317 if self.set() { 318 registry.notify_worker_latch_is_set(target_worker_index); 319 } 320 } 321 } 322 323 impl AsCoreLatch for CountLatch { 324 #[inline] as_core_latch(&self) -> &CoreLatch325 fn as_core_latch(&self) -> &CoreLatch { 326 &self.core_latch 327 } 328 } 329 330 #[derive(Debug)] 331 pub(super) struct CountLockLatch { 332 lock_latch: LockLatch, 333 counter: AtomicUsize, 334 } 335 336 impl CountLockLatch { 337 #[inline] new() -> CountLockLatch338 pub(super) fn new() -> CountLockLatch { 339 CountLockLatch { 340 lock_latch: LockLatch::new(), 341 counter: AtomicUsize::new(1), 342 } 343 } 344 345 #[inline] increment(&self)346 pub(super) fn increment(&self) { 347 let old_counter = self.counter.fetch_add(1, Ordering::Relaxed); 348 debug_assert!(old_counter != 0); 349 } 350 wait(&self)351 pub(super) fn wait(&self) { 352 self.lock_latch.wait(); 353 } 354 } 355 356 impl Latch for CountLockLatch { 357 #[inline] set(&self)358 fn set(&self) { 359 if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 { 360 self.lock_latch.set(); 361 } 362 } 363 } 364 365 impl<'a, L> Latch for &'a L 366 where 367 L: Latch, 368 { 369 #[inline] set(&self)370 fn set(&self) { 371 L::set(self); 372 } 373 } 374