use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::{Arc, Condvar, Mutex};

use crate::registry::{Registry, WorkerThread};

/// We define various kinds of latches, which are all a primitive signaling
/// mechanism. A latch starts as false. Eventually someone calls `set()` and
/// it becomes true. You can test if it has been set by calling `probe()`.
///
/// Some kinds of latches, but not all, support a `wait()` operation
/// that will wait until the latch is set, blocking efficiently. That
/// is not part of the trait since it is not possible to do with all
/// latches.
///
/// The intention is that `set()` is called once, but `probe()` may be
/// called any number of times. Once `probe()` returns true, the memory
/// effects that occurred before `set()` become visible.
///
/// It'd probably be better to refactor the API into two paired types,
/// but that's a bit of work, and this is not a public API.
///
/// ## Memory ordering
///
/// Latches need to guarantee two things:
///
/// - Once `probe()` returns true, all memory effects from the `set()`
///   are visible (in other words, the set should synchronize-with
///   the probe).
/// - Once `set()` occurs, the next `probe()` *will* observe it.  This
///   typically requires a seq-cst ordering. See [the "tickle-then-get-sleepy" scenario in the sleep
///   README](/src/sleep/README.md#tickle-then-get-sleepy) for details.
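///
/// A minimal sketch of the intended protocol, assuming some latch type
/// with both `set()` and `probe()` (illustrative only; `latch` and the
/// atomic `data` cell are hypothetical):
///
/// ```ignore
/// // Producer: publish data, then signal.
/// data.store(42, Ordering::Relaxed);
/// latch.set();
///
/// // Consumer: once `probe()` returns true, the store to `data` that
/// // happened before `set()` is guaranteed to be visible.
/// while !latch.probe() {
///     std::hint::spin_loop();
/// }
/// assert_eq!(data.load(Ordering::Relaxed), 42);
/// ```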
pub(super) trait Latch {
    /// Set the latch, signalling others.
    ///
    /// # WARNING
    ///
    /// Setting a latch triggers other threads to wake up and (in some
    /// cases) complete. This may, in turn, cause memory to be
    /// allocated and so forth.  One must be very careful about this,
    /// and it's typically better to read all the fields you will need
    /// to access *before* a latch is set!
    fn set(&self);
}

pub(super) trait AsCoreLatch {
    fn as_core_latch(&self) -> &CoreLatch;
}

/// Latch is not set, owning thread is awake
const UNSET: usize = 0;

/// Latch is not set, owning thread is going to sleep on this latch
/// (but has not yet fallen asleep).
const SLEEPY: usize = 1;

/// Latch is not set, owning thread is asleep on this latch and
/// must be awoken.
const SLEEPING: usize = 2;

/// Latch is set.
const SET: usize = 3;

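// A sketch of how the owning thread drives these states (illustrative
// only; the actual blocking and waking live in the sleep module):
//
//     UNSET --get_sleepy()--> SLEEPY --fall_asleep()--> SLEEPING
//       ^                                                 |
//       +--------------------wake_up()-------------------+
//
// `set()` moves any state to SET, which is terminal. If the old state
// was SLEEPING, the caller of `set()` must also wake the owning thread.
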
/// The "core" of a latch: an atomic state machine that tracks whether
/// the latch has been set and whether its owning thread is awake,
/// about to sleep, or asleep. Latch types that interoperate with the
/// sleep module embed a `CoreLatch` and expose it via `AsCoreLatch`.
#[derive(Debug)]
pub(super) struct CoreLatch {
    state: AtomicUsize,
}

impl CoreLatch {
    #[inline]
    fn new() -> Self {
        Self {
            state: AtomicUsize::new(UNSET),
        }
    }

    /// Returns the address of this core latch as an integer. Used
    /// for logging.
    #[inline]
    pub(super) fn addr(&self) -> usize {
        self as *const CoreLatch as usize
    }

    /// Invoked by owning thread as it prepares to sleep. Returns true
    /// if the owning thread may proceed to fall asleep, false if the
    /// latch was set in the meantime.
    #[inline]
    pub(super) fn get_sleepy(&self) -> bool {
        self.state
            .compare_exchange(UNSET, SLEEPY, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it falls asleep. Returns
    /// true if the owning thread should block, or false if the latch
    /// was set in the meantime.
    #[inline]
    pub(super) fn fall_asleep(&self) -> bool {
        self.state
            .compare_exchange(SLEEPY, SLEEPING, Ordering::SeqCst, Ordering::Relaxed)
            .is_ok()
    }

    /// Invoked by owning thread as it wakes up. If the latch was not
    /// set in the meantime, this returns the state to `UNSET` so that
    /// the owning thread can become sleepy again later.
    #[inline]
    pub(super) fn wake_up(&self) {
        if !self.probe() {
            let _ =
                self.state
                    .compare_exchange(SLEEPING, UNSET, Ordering::SeqCst, Ordering::Relaxed);
        }
    }

    /// Set the latch. If this returns true, the owning thread was sleeping
    /// and must be awoken.
    ///
    /// This is private because, typically, setting a latch involves
    /// doing some wakeups; those are encapsulated in the surrounding
    /// latch code.
    #[inline]
    fn set(&self) -> bool {
        let old_state = self.state.swap(SET, Ordering::AcqRel);
        old_state == SLEEPING
    }

    /// Test if this latch has been set.
    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.state.load(Ordering::Acquire) == SET
    }
}

/// Spin latches are the simplest, most efficient kind, but they do
/// not support a `wait()` operation. They just have a boolean flag
/// that becomes true when `set()` is called.
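///
/// A usage sketch (illustrative only; `worker` is a hypothetical
/// `&WorkerThread`, and the latch would normally be set by another job):
///
/// ```ignore
/// let latch = SpinLatch::new(worker);
/// // ... hand a reference to the latch to another thread, which will
/// // eventually call `latch.set()` ...
/// while !latch.probe() {
///     // keep busy, e.g. by stealing and executing other work
/// }
/// ```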
pub(super) struct SpinLatch<'r> {
    core_latch: CoreLatch,
    registry: &'r Arc<Registry>,
    target_worker_index: usize,
    cross: bool,
}

impl<'r> SpinLatch<'r> {
    /// Creates a new spin latch that is owned by `thread`. This means
    /// that `thread` is the only thread that should be blocking on
    /// this latch -- it also means that when the latch is set, we
    /// will wake `thread` if it is sleeping.
    #[inline]
    pub(super) fn new(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            core_latch: CoreLatch::new(),
            registry: thread.registry(),
            target_worker_index: thread.index(),
            cross: false,
        }
    }

    /// Creates a new spin latch for cross-threadpool blocking.  Notably, we
    /// need to make sure the registry is kept alive after setting, so we can
    /// safely call the notification.
    #[inline]
    pub(super) fn cross(thread: &'r WorkerThread) -> SpinLatch<'r> {
        SpinLatch {
            cross: true,
            ..SpinLatch::new(thread)
        }
    }

    #[inline]
    pub(super) fn probe(&self) -> bool {
        self.core_latch.probe()
    }
}

impl<'r> AsCoreLatch for SpinLatch<'r> {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

impl<'r> Latch for SpinLatch<'r> {
    #[inline]
    fn set(&self) {
        let cross_registry;

        let registry = if self.cross {
            // Ensure the registry stays alive while we notify it.
            // Otherwise, it would be possible that we set the spin
            // latch and the other thread sees it and exits, causing
            // the registry to be deallocated, all before we get a
            // chance to invoke `registry.notify_worker_latch_is_set`.
            cross_registry = Arc::clone(self.registry);
            &cross_registry
        } else {
            // If this is not a "cross-registry" spin-latch, then the
            // thread which is performing `set` is itself ensuring
            // that the registry stays alive.
            self.registry
        };
        let target_worker_index = self.target_worker_index;

        // NOTE: Once we `set`, the target may proceed and invalidate `&self`!
        if self.core_latch.set() {
            // Subtle: at this point, we can no longer read from
            // `self`, because the thread owning this spin latch may
            // have awoken and deallocated the latch. Therefore, we
            // only use fields whose values we already read.
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

/// A Latch starts as false and eventually becomes true. You can block
/// until it becomes true.
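///
/// A usage sketch (illustrative only; in practice the two calls happen
/// on different threads):
///
/// ```ignore
/// let latch = LockLatch::new();
/// latch.set();   // thread A: set the latch, waking any waiters
/// latch.wait();  // thread B: returns once the latch is set
/// ```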
#[derive(Debug)]
pub(super) struct LockLatch {
    m: Mutex<bool>,
    v: Condvar,
}

impl LockLatch {
    #[inline]
    pub(super) fn new() -> LockLatch {
        LockLatch {
            m: Mutex::new(false),
            v: Condvar::new(),
        }
    }

    /// Block until the latch is set, then reset it so it can be reused.
    pub(super) fn wait_and_reset(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
        *guard = false;
    }

    /// Block until the latch is set.
    pub(super) fn wait(&self) {
        let mut guard = self.m.lock().unwrap();
        while !*guard {
            guard = self.v.wait(guard).unwrap();
        }
    }
}

impl Latch for LockLatch {
    #[inline]
    fn set(&self) {
        let mut guard = self.m.lock().unwrap();
        *guard = true;
        self.v.notify_all();
    }
}

/// Counting latches are used to implement scopes. They track a
/// counter. Unlike other latches, calling `set()` does not
/// necessarily cause the latch to be considered set; instead, it just
/// decrements the counter. The latch is only "set" (in the sense that
/// `probe()` returns true) once the counter reaches zero.
///
/// Note: like a `SpinLatch`, count latches are always associated with
/// some registry that is probing them, which must be tickled when
/// they are set. *Unlike* a `SpinLatch`, they don't themselves hold a
/// reference to that registry. This is because in some cases the
/// registry owns the count-latch, and that would create a cycle. So a
/// `CountLatch` must be given a reference to its owning registry when
/// it is set. For this reason, it does not implement the `Latch`
/// trait (but it doesn't have to, as it is not used in those generic
/// contexts).
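///
/// A sketch of the counting behavior (illustrative only; note that the
/// counter starts at one, so a lone `set()` completes the latch):
///
/// ```ignore
/// let latch = CountLatch::new();          // counter == 1
/// latch.increment();                      // counter == 2
/// assert!(!latch.set());                  // counter == 1, not yet set
/// assert!(latch.set());                   // counter == 0, latch is set
/// assert!(latch.as_core_latch().probe());
/// ```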
#[derive(Debug)]
pub(super) struct CountLatch {
    core_latch: CoreLatch,
    counter: AtomicUsize,
}

impl CountLatch {
    #[inline]
    pub(super) fn new() -> CountLatch {
        CountLatch {
            core_latch: CoreLatch::new(),
            counter: AtomicUsize::new(1),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        debug_assert!(!self.core_latch.probe());
        self.counter.fetch_add(1, Ordering::Relaxed);
    }

    /// Decrements the latch counter by one. If this is the final
    /// count, then the latch is **set**, and calls to `probe()` will
    /// return true. Returns whether the latch was set.
    #[inline]
    pub(super) fn set(&self) -> bool {
        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.core_latch.set();
            true
        } else {
            false
        }
    }

    /// Decrements the latch counter by one and possibly sets it.  If
    /// the latch is set, then the specific worker thread is tickled,
    /// which should be the one that owns this latch.
    #[inline]
    pub(super) fn set_and_tickle_one(&self, registry: &Registry, target_worker_index: usize) {
        if self.set() {
            registry.notify_worker_latch_is_set(target_worker_index);
        }
    }
}

impl AsCoreLatch for CountLatch {
    #[inline]
    fn as_core_latch(&self) -> &CoreLatch {
        &self.core_latch
    }
}

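/// A counting analogue of `LockLatch`: `set()` decrements a counter,
/// and the underlying `LockLatch` only becomes set (waking `wait()`ers)
/// once the counter reaches zero. Like `CountLatch`, the counter starts
/// at one.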
#[derive(Debug)]
pub(super) struct CountLockLatch {
    lock_latch: LockLatch,
    counter: AtomicUsize,
}

impl CountLockLatch {
    #[inline]
    pub(super) fn new() -> CountLockLatch {
        CountLockLatch {
            lock_latch: LockLatch::new(),
            counter: AtomicUsize::new(1),
        }
    }

    #[inline]
    pub(super) fn increment(&self) {
        let old_counter = self.counter.fetch_add(1, Ordering::Relaxed);
        debug_assert!(old_counter != 0);
    }

    pub(super) fn wait(&self) {
        self.lock_latch.wait();
    }
}

impl Latch for CountLockLatch {
    #[inline]
    fn set(&self) {
        if self.counter.fetch_sub(1, Ordering::SeqCst) == 1 {
            self.lock_latch.set();
        }
    }
}

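// Blanket impl: setting a `&L` forwards to `L`, so a reference to a
// latch can be passed to code that is generic over `Latch`.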
impl<'a, L> Latch for &'a L
where
    L: Latch,
{
    #[inline]
    fn set(&self) {
        L::set(self);
    }
}