//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
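//!
//! For example, a task on the multi-threaded runtime can run blocking code
//! without stalling its worker (a minimal sketch; the runtime setup shown is
//! illustrative):
//!
//! ```ignore
//! #[tokio::main(flavor = "multi_thread")]
//! async fn main() {
//!     tokio::task::block_in_place(|| {
//!         // While this closure blocks, the worker's core has been handed
//!         // off to another thread, so other tasks keep running.
//!         std::thread::sleep(std::time::Duration::from_millis(10));
//!     });
//! }
//! ```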
//!
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The `Shared::close` method is called. This closes the inject queue and
//!     the `OwnedTasks` instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal the next time it runs
//!     `Core::maintenance` by checking whether the inject queue is closed.
//!     The `Core::is_shutdown` flag is set to true.
//!
//!  3. The worker threads call `pre_shutdown` in parallel. Here, each worker
//!     will keep removing tasks from `OwnedTasks` until it is empty. No new
//!     tasks can be pushed to the `OwnedTasks` during or after this step as it
//!     was closed in step 1.
//!
//!  4. The workers call `Shared::shutdown` to enter the single-threaded phase
//!     of shutdown. These calls push their cores to `Shared::shutdown_cores`,
//!     and the last thread to push its core finishes the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the `OwnedTasks` being open and the inject queue
//!    closed.
//!  * The spawner observes the `OwnedTasks` being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the `OwnedTasks::bind` call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the latter case, the task was not spawned and is immediately cancelled
//! by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the
//! `OwnedTasks` collection to have a closed bit. With a closed bit on only the
//! inject queue, spawning could run into a situation where a task is
//! successfully bound long after the runtime has shut down. With a closed bit
//! on only the `OwnedTasks`, the first spawning situation could result in the
//! notification being pushed to the inject queue after step 5 of shutdown,
//! which would leave a task in the inject queue indefinitely. This would be a
//! ref-count cycle and a memory leak.
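//!
//! A simplified sketch of how the spawn path interacts with the two closed
//! bits (illustrative pseudocode; the real logic lives in `OwnedTasks::bind`
//! and `Shared::schedule` below):
//!
//! ```ignore
//! let (handle, notified) = owned.bind(future, scheduler);
//! // `notified` is `None` when OwnedTasks is already closed: the task was
//! // never spawned and has been cancelled (the second case above).
//! if let Some(notified) = notified {
//!     // First case above: the task is bound; scheduling may find the
//!     // inject queue closed, and the task is then cleaned up in step 3.
//!     scheduler.schedule(notified);
//! }
//! ```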

use crate::coop;
use crate::future::Future;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::stats::{RuntimeStats, WorkerStatsBatcher};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task, Callback};
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
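    ///
    /// For example (an illustrative scenario): when a task sends a message
    /// that wakes a second task, the woken task lands in this slot and is
    /// polled as soon as the sender yields, while the message is still hot
    /// in cache.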
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Shared>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down.
    is_shutdown: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Batches stats so they can be submitted to `RuntimeStats`.
    stats: WorkerStatsBatcher,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submits work to the scheduler while **not** currently on a worker thread.
    inject: Inject<Arc<Shared>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Shared>>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,

    /// Callback for a worker parking itself
    before_park: Option<Callback>,
    /// Callback for a worker unparking itself
    after_unpark: Option<Callback>,

    /// Collects stats from the runtime.
    stats: RuntimeStats,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steals tasks from this worker.
    steal: queue::Steal<Arc<Shared>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned. Otherwise, the worker will need
/// to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Shared>>;

/// A notified task handle
type Notified = task::Notified<Arc<Shared>>;

// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
pub(super) fn create(
    size: usize,
    park: Parker,
    before_park: Option<Callback>,
    after_unpark: Option<Callback>,
) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for i in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            park: Some(park),
            stats: WorkerStatsBatcher::new(i),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote { steal, unpark });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: Inject::new(),
        idle: Idle::new(size),
        owned: OwnedTasks::new(),
        shutdown_cores: Mutex::new(vec![]),
        before_park,
        after_unpark,
        stats: RuntimeStats::new(size),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}
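
// A sketch of how this factory is expected to be used by the thread pool
// constructor (illustrative; the actual call sites live elsewhere in the
// runtime):
//
//     let (shared, launch) = create(4, parker, None, None);
//     // `shared` handles task submission; `launch.launch()` spawns one
//     // blocking thread per worker, each of which calls `run(worker)`.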

pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT.with(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.worker.core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    CURRENT.with(|maybe_cx| {
        match (crate::runtime::enter::context(), maybe_cx.is_some()) {
            (EnterContext::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (EnterContext::Entered { allow_blocking }, false) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_blocking {
                    had_entered = true;
                    return;
                } else {
                    // This probably means we are on the basic_scheduler or in a
                    // LocalSet, where it is _not_ okay to block.
                    panic!("can call blocking only when running on the multi-threaded runtime");
                }
            }
            (EnterContext::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return;
            }
            (EnterContext::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return;
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return,
        };

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
    });

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::enter::exit(f)
    } else {
        f()
    }
}

/// After how many ticks the global queue is polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary; it was likely copied from Go's scheduler.
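///
/// For example, with the default of 61, `Core::next_task` prefers the global
/// queue roughly every 61st tick, bounding how long a task can wait in the
/// inject queue while local work remains available.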
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    let _enter = crate::runtime::enter(true);

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.shared.shutdown(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        let task = self.worker.shared.owned.assert_owner(task);

        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        core.stats.incr_poll_count();
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    core.stats.incr_poll_count();
                    *self.core.borrow_mut() = Some(core);
                    let task = self.worker.shared.owned.assert_owner(task);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        if let Some(f) = &self.worker.shared.before_park {
            f();
        }

        if core.transition_to_parked(&self.worker) {
            while !core.is_shutdown {
                core = self.park_timeout(core, None);

                // Run regularly scheduled maintenance
                core.maintenance(&self.worker);

                if core.transition_from_parked(&self.worker) {
                    break;
                }
            }
        }

        if let Some(f) = &self.worker.shared.after_unpark {
            f();
        }
        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        core.stats.about_to_park();

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core.stats.returned_from_park();

        core
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target
                .steal
                .steal_into(&mut self.run_queue, &mut self.stats)
            {
                return Some(task);
            }
        }

        // Fall back to checking the global queue
        worker.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepares the worker state for parking.
    ///
    /// Returns true if the transition happened, false if there is work to do first.
    fn transition_to_parked(&mut self, worker: &Worker) -> bool {
        // Workers should not park if they have work to do
        if self.lifo_slot.is_some() || self.run_queue.has_tasks() {
            return false;
        }

        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This only updates the local
        // cache.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }

        true
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.stats.submit(&worker.shared.stats);

        if !self.is_shutdown {
            // Check if the scheduler has been shut down
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // Signal to all tasks to shut down.
        worker.shared.owned.close_and_shutdown_all();

        self.stats.submit(&worker.shared.stats);
    }

    /// Shuts down the core.
    fn shutdown(&mut self) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Drain the local queue
        while self.next_local_task().is_some() {}

        park.shutdown();
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue.
    fn inject(&self) -> &Inject<Arc<Shared>> {
        &self.shared.inject
    }
}

impl task::Schedule for Arc<Shared> {
    fn release(&self, task: &Task) -> Option<Task> {
        self.owned.remove(task)
    }

    fn schedule(&self, task: Notified) {
        (**self).schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        (**self).schedule(task, true);
    }
}

impl Shared {
    pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (handle, notified) = me.owned.bind(future, me.clone());

        if let Some(notified) = notified {
            me.schedule(notified, false);
        }

        handle
    }

    pub(crate) fn stats(&self) -> &RuntimeStats {
        &self.stats
    }

    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.inject.push(task);
            self.notify_parked();
        })
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.remotes.len() {
            return;
        }

        debug_assert!(self.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown();
        }

        // Drain the injection queue
        //
        // We already shut down every task, so we can simply drop the tasks.
        while let Some(task) = self.inject.pop() {
            drop(task);
        }
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}