1 //! Code that decides when workers should go to sleep. See README.md
2 //! for an overview.
3 
4 use crate::latch::CoreLatch;
5 use crate::log::Event::*;
6 use crate::log::Logger;
7 use crossbeam_utils::CachePadded;
8 use std::sync::atomic::Ordering;
9 use std::sync::{Condvar, Mutex};
10 use std::thread;
11 use std::usize;
12 
13 mod counters;
14 use self::counters::{AtomicCounters, JobsEventCounter};
15 
/// The `Sleep` struct is embedded into each registry. It governs the waking and sleeping
/// of workers. It has callbacks that are invoked periodically at significant events,
/// such as when workers are looping and looking for work, when latches are set, or when
/// jobs are published, and it either blocks threads or wakes them in response to these
/// events. See the [`README.md`] in this module for more details.
///
/// [`README.md`]: README.md
pub(super) struct Sleep {
    /// Sink for the scheduling events (`ThreadIdle`, `ThreadSleepy`,
    /// `ThreadSleeping`, `ThreadNotify`, ...) reported by the methods of this type.
    logger: Logger,

    /// One "sleep state" per worker, indexed by worker index. Used to track if a
    /// worker is sleeping and to have them block.
    worker_sleep_states: Vec<CachePadded<WorkerSleepState>>,

    /// Shared atomic counters: the jobs event counter together with the
    /// inactive-thread and sleeping-thread counts that the methods below
    /// read and update.
    counters: AtomicCounters,
}
32 
/// An instance of this struct is created when a thread becomes idle.
/// It is consumed when the thread finds work, and passed by `&mut`
/// reference for operations that preserve the idle state. (In other
/// words, producing one of these structs is evidence the thread is
/// idle.) It tracks state such as how long the thread has been idle.
pub(super) struct IdleState {
    /// What is the worker index of the idle thread?
    worker_index: usize,

    /// How many rounds have we been circling without sleeping?
    /// Reset to 0 by `wake_fully` and to `ROUNDS_UNTIL_SLEEPY` by `wake_partly`.
    rounds: u32,

    /// Once we become sleepy, what was the jobs event counter value
    /// (as returned by `Sleep::announce_sleepy`)?
    /// Set to `JobsEventCounter::DUMMY` otherwise.
    jobs_counter: JobsEventCounter,
}
49 
/// The "sleep state" for an individual worker.
#[derive(Default)]
struct WorkerSleepState {
    /// Set to true when the worker goes to sleep; set to false when
    /// the worker is notified or when it wakes.
    is_blocked: Mutex<bool>,

    /// Condition variable paired with `is_blocked`: the worker waits on it
    /// in `Sleep::sleep` while `is_blocked` is true, and
    /// `Sleep::wake_specific_thread` notifies it after clearing the flag.
    condvar: Condvar,
}
59 
/// Number of fruitless rounds (each one yielding the thread) an idle worker
/// goes through in `Sleep::no_work_found` before it announces itself as sleepy.
const ROUNDS_UNTIL_SLEEPY: u32 = 32;
/// Round count at which `Sleep::no_work_found` actually tries to sleep:
/// exactly one round after the worker announced itself as sleepy.
const ROUNDS_UNTIL_SLEEPING: u32 = ROUNDS_UNTIL_SLEEPY + 1;
62 
63 impl Sleep {
new(logger: Logger, n_threads: usize) -> Sleep64     pub(super) fn new(logger: Logger, n_threads: usize) -> Sleep {
65         Sleep {
66             logger,
67             worker_sleep_states: (0..n_threads).map(|_| Default::default()).collect(),
68             counters: AtomicCounters::new(),
69         }
70     }
71 
72     #[inline]
start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState73     pub(super) fn start_looking(&self, worker_index: usize, latch: &CoreLatch) -> IdleState {
74         self.logger.log(|| ThreadIdle {
75             worker: worker_index,
76             latch_addr: latch.addr(),
77         });
78 
79         self.counters.add_inactive_thread();
80 
81         IdleState {
82             worker_index,
83             rounds: 0,
84             jobs_counter: JobsEventCounter::DUMMY,
85         }
86     }
87 
88     #[inline]
work_found(&self, idle_state: IdleState)89     pub(super) fn work_found(&self, idle_state: IdleState) {
90         self.logger.log(|| ThreadFoundWork {
91             worker: idle_state.worker_index,
92             yields: idle_state.rounds,
93         });
94 
95         // If we were the last idle thread and other threads are still sleeping,
96         // then we should wake up another thread.
97         let threads_to_wake = self.counters.sub_inactive_thread();
98         self.wake_any_threads(threads_to_wake as u32);
99     }
100 
101     #[inline]
no_work_found( &self, idle_state: &mut IdleState, latch: &CoreLatch, has_injected_jobs: impl FnOnce() -> bool, )102     pub(super) fn no_work_found(
103         &self,
104         idle_state: &mut IdleState,
105         latch: &CoreLatch,
106         has_injected_jobs: impl FnOnce() -> bool,
107     ) {
108         if idle_state.rounds < ROUNDS_UNTIL_SLEEPY {
109             thread::yield_now();
110             idle_state.rounds += 1;
111         } else if idle_state.rounds == ROUNDS_UNTIL_SLEEPY {
112             idle_state.jobs_counter = self.announce_sleepy(idle_state.worker_index);
113             idle_state.rounds += 1;
114             thread::yield_now();
115         } else if idle_state.rounds < ROUNDS_UNTIL_SLEEPING {
116             idle_state.rounds += 1;
117             thread::yield_now();
118         } else {
119             debug_assert_eq!(idle_state.rounds, ROUNDS_UNTIL_SLEEPING);
120             self.sleep(idle_state, latch, has_injected_jobs);
121         }
122     }
123 
124     #[cold]
announce_sleepy(&self, worker_index: usize) -> JobsEventCounter125     fn announce_sleepy(&self, worker_index: usize) -> JobsEventCounter {
126         let counters = self
127             .counters
128             .increment_jobs_event_counter_if(JobsEventCounter::is_active);
129         let jobs_counter = counters.jobs_counter();
130         self.logger.log(|| ThreadSleepy {
131             worker: worker_index,
132             jobs_counter: jobs_counter.as_usize(),
133         });
134         jobs_counter
135     }
136 
    /// Tries to put the calling worker to sleep, blocking on its condvar
    /// until some other thread clears `is_blocked` through
    /// `wake_specific_thread`.
    ///
    /// The attempt is abandoned without blocking when any of these happens:
    ///
    /// - `latch.get_sleepy()` or `latch.fall_asleep()` returns `false`
    ///   (logged as `ThreadSleepInterruptedByLatch`);
    /// - the jobs event counter no longer equals the value stored in
    ///   `idle_state.jobs_counter` (logged as `ThreadSleepInterruptedByJob`);
    ///   the worker is then only woken "partly" so it searches once more
    ///   before announcing sleepiness again;
    /// - `has_injected_jobs()` returns `true` after we registered as a
    ///   sleeping thread; we then undo that registration ourselves.
    ///
    /// Except for the early `get_sleepy` exit, which leaves `idle_state`
    /// untouched, every path resets `idle_state` via `wake_fully` or
    /// `wake_partly` before returning.
    #[cold]
    fn sleep(
        &self,
        idle_state: &mut IdleState,
        latch: &CoreLatch,
        has_injected_jobs: impl FnOnce() -> bool,
    ) {
        let worker_index = idle_state.worker_index;

        // First step on the latch. If it is refused we return right away
        // without modifying `idle_state`.
        // NOTE(review): presumably refusal means the latch was already set;
        // confirm against `CoreLatch::get_sleepy`.
        if !latch.get_sleepy() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            return;
        }

        // Take our own `is_blocked` lock now, i.e. *before* we register as a
        // sleeping thread further down. The guard is held until
        // `condvar.wait` releases it or this function returns.
        let sleep_state = &self.worker_sleep_states[worker_index];
        let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
        debug_assert!(!*is_blocked);

        // If `fall_asleep` fails, our latch was signalled. We should wake
        // back up fully as we will have some stuff to do.
        if !latch.fall_asleep() {
            self.logger.log(|| ThreadSleepInterruptedByLatch {
                worker: worker_index,
                latch_addr: latch.addr(),
            });

            idle_state.wake_fully();
            return;
        }

        // Retry until we either observe a changed JEC (and give up) or
        // manage to register ourselves as a sleeping thread against the
        // exact counter snapshot we inspected.
        loop {
            let counters = self.counters.load(Ordering::SeqCst);

            // Check if the JEC has changed since we got sleepy.
            debug_assert!(idle_state.jobs_counter.is_sleepy());
            if counters.jobs_counter() != idle_state.jobs_counter {
                // JEC has changed, so a new job was posted, but for some reason
                // we didn't see it. We should return to just before the SLEEPY
                // state so we can do another search and (if we fail to find
                // work) go back to sleep.
                self.logger.log(|| ThreadSleepInterruptedByJob {
                    worker: worker_index,
                });

                idle_state.wake_partly();
                latch.wake_up();
                return;
            }

            // Otherwise, let's move from IDLE to SLEEPING.
            if self.counters.try_add_sleeping_thread(counters) {
                break;
            }
        }

        // Successfully registered as asleep.

        self.logger.log(|| ThreadSleeping {
            worker: worker_index,
            latch_addr: latch.addr(),
        });

        // We have one last check for injected jobs to do. This protects against
        // deadlock in the very unlikely event that
        //
        // - an external job is being injected while we are sleepy
        // - that job triggers the rollover over the JEC such that we don't see it
        // - we are the last active worker thread
        //
        // (This fence pairs with the `SeqCst` fence in `new_injected_jobs`.)
        std::sync::atomic::fence(Ordering::SeqCst);
        if has_injected_jobs() {
            // If we see an externally injected job, then we have to 'wake
            // ourselves up'. (Ordinarily, `sub_sleeping_thread` is invoked by
            // the one that wakes us.)
            self.counters.sub_sleeping_thread();
        } else {
            // If we don't see an injected job (the normal case), then flag
            // ourselves as asleep and wait till we are notified.
            //
            // (Note that `is_blocked` is held under a mutex and the mutex was
            // acquired *before* we registered as a sleeping thread with
            // `try_add_sleeping_thread`. This means that whoever is coming to
            // wake us will have to wait until we release the mutex in the
            // call to `wait`, so they will see this boolean as true.)
            *is_blocked = true;
            // Loop so that a spurious condvar wakeup does not end the sleep:
            // only a waker clearing `is_blocked` does.
            while *is_blocked {
                is_blocked = sleep_state.condvar.wait(is_blocked).unwrap();
            }
        }

        // Update other state: restart the idle rounds from scratch and take
        // the latch back out of its asleep state.
        idle_state.wake_fully();
        latch.wake_up();

        self.logger.log(|| ThreadAwoken {
            worker: worker_index,
            latch_addr: latch.addr(),
        });
    }
239 
240     /// Notify the given thread that it should wake up (if it is
241     /// sleeping).  When this method is invoked, we typically know the
242     /// thread is asleep, though in rare cases it could have been
243     /// awoken by (e.g.) new work having been posted.
notify_worker_latch_is_set(&self, target_worker_index: usize)244     pub(super) fn notify_worker_latch_is_set(&self, target_worker_index: usize) {
245         self.wake_specific_thread(target_worker_index);
246     }
247 
248     /// Signals that `num_jobs` new jobs were injected into the thread
249     /// pool from outside. This function will ensure that there are
250     /// threads available to process them, waking threads from sleep
251     /// if necessary.
252     ///
253     /// # Parameters
254     ///
255     /// - `source_worker_index` -- index of the thread that did the
256     ///   push, or `usize::MAX` if this came from outside the thread
257     ///   pool -- it is used only for logging.
258     /// - `num_jobs` -- lower bound on number of jobs available for stealing.
259     ///   We'll try to get at least one thread per job.
260     #[inline]
new_injected_jobs( &self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool, )261     pub(super) fn new_injected_jobs(
262         &self,
263         source_worker_index: usize,
264         num_jobs: u32,
265         queue_was_empty: bool,
266     ) {
267         // This fence is needed to guarantee that threads
268         // as they are about to fall asleep, observe any
269         // new jobs that may have been injected.
270         std::sync::atomic::fence(Ordering::SeqCst);
271 
272         self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
273     }
274 
275     /// Signals that `num_jobs` new jobs were pushed onto a thread's
276     /// local deque. This function will try to ensure that there are
277     /// threads available to process them, waking threads from sleep
278     /// if necessary. However, this is not guaranteed: under certain
279     /// race conditions, the function may fail to wake any new
280     /// threads; in that case the existing thread should eventually
281     /// pop the job.
282     ///
283     /// # Parameters
284     ///
285     /// - `source_worker_index` -- index of the thread that did the
286     ///   push, or `usize::MAX` if this came from outside the thread
287     ///   pool -- it is used only for logging.
288     /// - `num_jobs` -- lower bound on number of jobs available for stealing.
289     ///   We'll try to get at least one thread per job.
290     #[inline]
new_internal_jobs( &self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool, )291     pub(super) fn new_internal_jobs(
292         &self,
293         source_worker_index: usize,
294         num_jobs: u32,
295         queue_was_empty: bool,
296     ) {
297         self.new_jobs(source_worker_index, num_jobs, queue_was_empty)
298     }
299 
300     /// Common helper for `new_injected_jobs` and `new_internal_jobs`.
301     #[inline]
new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool)302     fn new_jobs(&self, source_worker_index: usize, num_jobs: u32, queue_was_empty: bool) {
303         // Read the counters and -- if sleepy workers have announced themselves
304         // -- announce that there is now work available. The final value of `counters`
305         // with which we exit the loop thus corresponds to a state when
306         let counters = self
307             .counters
308             .increment_jobs_event_counter_if(JobsEventCounter::is_sleepy);
309         let num_awake_but_idle = counters.awake_but_idle_threads();
310         let num_sleepers = counters.sleeping_threads();
311 
312         self.logger.log(|| JobThreadCounts {
313             worker: source_worker_index,
314             num_idle: num_awake_but_idle as u16,
315             num_sleepers: num_sleepers as u16,
316         });
317 
318         if num_sleepers == 0 {
319             // nobody to wake
320             return;
321         }
322 
323         // Promote from u16 to u32 so we can interoperate with
324         // num_jobs more easily.
325         let num_awake_but_idle = num_awake_but_idle as u32;
326         let num_sleepers = num_sleepers as u32;
327 
328         // If the queue is non-empty, then we always wake up a worker
329         // -- clearly the existing idle jobs aren't enough. Otherwise,
330         // check to see if we have enough idle workers.
331         if !queue_was_empty {
332             let num_to_wake = std::cmp::min(num_jobs, num_sleepers);
333             self.wake_any_threads(num_to_wake);
334         } else if num_awake_but_idle < num_jobs {
335             let num_to_wake = std::cmp::min(num_jobs - num_awake_but_idle, num_sleepers);
336             self.wake_any_threads(num_to_wake);
337         }
338     }
339 
340     #[cold]
wake_any_threads(&self, mut num_to_wake: u32)341     fn wake_any_threads(&self, mut num_to_wake: u32) {
342         if num_to_wake > 0 {
343             for i in 0..self.worker_sleep_states.len() {
344                 if self.wake_specific_thread(i) {
345                     num_to_wake -= 1;
346                     if num_to_wake == 0 {
347                         return;
348                     }
349                 }
350             }
351         }
352     }
353 
wake_specific_thread(&self, index: usize) -> bool354     fn wake_specific_thread(&self, index: usize) -> bool {
355         let sleep_state = &self.worker_sleep_states[index];
356 
357         let mut is_blocked = sleep_state.is_blocked.lock().unwrap();
358         if *is_blocked {
359             *is_blocked = false;
360             sleep_state.condvar.notify_one();
361 
362             // When the thread went to sleep, it will have incremented
363             // this value. When we wake it, its our job to decrement
364             // it. We could have the thread do it, but that would
365             // introduce a delay between when the thread was
366             // *notified* and when this counter was decremented. That
367             // might mislead people with new work into thinking that
368             // there are sleeping threads that they should try to
369             // wake, when in fact there is nothing left for them to
370             // do.
371             self.counters.sub_sleeping_thread();
372 
373             self.logger.log(|| ThreadNotify { worker: index });
374 
375             true
376         } else {
377             false
378         }
379     }
380 }
381 
382 impl IdleState {
wake_fully(&mut self)383     fn wake_fully(&mut self) {
384         self.rounds = 0;
385         self.jobs_counter = JobsEventCounter::DUMMY;
386     }
387 
wake_partly(&mut self)388     fn wake_partly(&mut self) {
389         self.rounds = ROUNDS_UNTIL_SLEEPY;
390         self.jobs_counter = JobsEventCounter::DUMMY;
391     }
392 }
393