//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
//!
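//! A minimal usage sketch (hypothetical user code, not part of this module;
//! `block_in_place` requires the multi-threaded runtime):
//!
//! ```ignore
//! let rt = tokio::runtime::Builder::new_multi_thread()
//!     .worker_threads(2)
//!     .enable_all()
//!     .build()
//!     .unwrap();
//!
//! rt.block_on(async {
//!     tokio::task::block_in_place(|| {
//!         // Blocking here is fine: this worker's core is handed off to a
//!         // fresh thread, so the remaining workers keep making progress.
//!         std::thread::sleep(std::time::Duration::from_millis(10));
//!     });
//! });
//! ```
//!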
//! # Shutdown
//!
//! Shutting down the runtime involves the following steps:
//!
//!  1. The Shared::close method is called. This closes the inject queue and
//!     OwnedTasks instance and wakes up all worker threads.
//!
//!  2. Each worker thread observes the close signal next time it runs
//!     Core::maintenance by checking whether the inject queue is closed.
//!     The Core::is_shutdown flag is set to true.
//!
//!  3. Each worker thread calls `pre_shutdown` in parallel. Here, the worker
//!     keeps removing tasks from OwnedTasks until it is empty. No new tasks
//!     can be pushed to the OwnedTasks during or after this step, as it was
//!     closed in step 1.
//!
//!  4. The workers call Shared::shutdown to enter the single-threaded phase of
//!     shutdown. These calls push their cores to Shared::shutdown_cores, and
//!     the last thread to push its core finishes the shutdown procedure.
//!
//!  5. The local run queue of each core is emptied, then the inject queue is
//!     emptied.
//!
//! At this point, shutdown has completed. It is not possible for any of the
//! collections to contain any tasks at this point, as each collection was
//! closed first, then emptied afterwards.
//!
//! ## Spawns during shutdown
//!
//! When spawning tasks during shutdown, there are two cases:
//!
//!  * The spawner observes the OwnedTasks being open, and the inject queue is
//!    closed.
//!  * The spawner observes the OwnedTasks being closed and doesn't check the
//!    inject queue.
//!
//! The first case can only happen if the OwnedTasks::bind call happens before
//! or during step 1 of shutdown. In this case, the runtime will clean up the
//! task in step 3 of shutdown.
//!
//! In the second case, the task was not spawned and is immediately cancelled
//! by the spawner.
//!
//! The correctness of shutdown requires both the inject queue and the
//! OwnedTasks collection to have a closed bit. With a close bit on only the
//! inject queue, spawning could run into a situation where a task is
//! successfully bound long after the runtime has shut down. With a close bit
//! on only the OwnedTasks, the first spawning situation could result in the
//! notification being pushed to the inject queue after step 5 of shutdown,
//! which would leave a task in the inject queue indefinitely. This would be a
//! ref-count cycle and a memory leak.
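//!
//! As a simplified sketch of the spawn path these two close bits protect
//! (mirroring Shared::bind_new_task below):
//!
//! ```ignore
//! let (handle, notified) = shared.owned.bind(future, shared.clone());
//!
//! match notified {
//!     // OwnedTasks was open: the task is bound. If shutdown has started,
//!     // the runtime cleans the task up in step 3 (`pre_shutdown`).
//!     Some(notified) => shared.schedule(notified, false),
//!     // OwnedTasks was closed: the task was never spawned and was
//!     // immediately cancelled by the spawner.
//!     None => {}
//! }
//! ```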

use crate::coop;
use crate::future::Future;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::enter::EnterContext;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;
/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Shared>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down.
    is_shutdown: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All workers have access to this, and it is
    /// how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    inject: Inject<Arc<Shared>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Shared>>,

    /// Cores that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to prevent it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steal tasks from this worker.
    steal: queue::Steal<Arc<Shared>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker must stop
/// processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Shared>>;

/// A notified task handle
type Notified = task::Notified<Arc<Shared>>;

// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            park: Some(park),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote { steal, unpark });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: Inject::new(),
        idle: Idle::new(size),
        owned: OwnedTasks::new(),
        shutdown_cores: Mutex::new(vec![]),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}

pub(crate) fn block_in_place<F, R>(f: F) -> R
where
    F: FnOnce() -> R,
{
    // Try to steal the worker core back
    struct Reset(coop::Budget);

    impl Drop for Reset {
        fn drop(&mut self) {
            CURRENT.with(|maybe_cx| {
                if let Some(cx) = maybe_cx {
                    let core = cx.worker.core.take();
                    let mut cx_core = cx.core.borrow_mut();
                    assert!(cx_core.is_none());
                    *cx_core = core;

                    // Reset the task budget as we are re-entering the
                    // runtime.
                    coop::set(self.0);
                }
            });
        }
    }

    let mut had_entered = false;

    CURRENT.with(|maybe_cx| {
        match (crate::runtime::enter::context(), maybe_cx.is_some()) {
            (EnterContext::Entered { .. }, true) => {
                // We are on a thread pool runtime thread, so we just need to
                // set up blocking.
                had_entered = true;
            }
            (EnterContext::Entered { allow_blocking }, false) => {
                // We are on an executor, but _not_ on the thread pool. That is
                // _only_ okay if we are in a thread pool runtime's block_on
                // method:
                if allow_blocking {
                    had_entered = true;
                    return;
                } else {
                    // This probably means we are on the basic_scheduler or in a
                    // LocalSet, where it is _not_ okay to block.
                    panic!("can call blocking only when running on the multi-threaded runtime");
                }
            }
            (EnterContext::NotEntered, true) => {
                // This is a nested call to block_in_place (we already exited).
                // All the necessary setup has already been done.
                return;
            }
            (EnterContext::NotEntered, false) => {
                // We are outside of the tokio runtime, so blocking is fine.
                // We can also skip all of the thread pool blocking setup steps.
                return;
            }
        }

        let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

        // Get the worker core. If none is set, then blocking is fine!
        let core = match cx.core.borrow_mut().take() {
            Some(core) => core,
            None => return,
        };

        // The parker should be set here
        assert!(core.park.is_some());

        // In order to block, the core must be sent to another thread for
        // execution.
        //
        // First, move the core back into the worker's shared core slot.
        cx.worker.core.set(core);

        // Next, clone the worker handle and send it to a new thread for
        // processing.
        //
        // Once the blocking task is done executing, we will attempt to
        // steal the core back.
        let worker = cx.worker.clone();
        runtime::spawn_blocking(move || run(worker));
    });

    if had_entered {
        // Unset the current task's budget. Blocking sections are not
        // constrained by task budgets.
        let _reset = Reset(coop::stop());

        crate::runtime::enter::exit(f)
    } else {
        f()
    }
}

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
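///
/// As a concrete illustration: with the value 61, `Core::next_task` polls the
/// global (inject) queue first on ticks 61, 122, 183, ..., and otherwise
/// prefers the worker-local queue.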
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

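    // Enter the runtime context. The `true` argument marks this thread as
    // allowed to block in place (see `block_in_place` above).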
    let _enter = crate::runtime::enter(true);

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check for work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, so try to steal
            // work from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        core.pre_shutdown(&self.worker);

        // Signal shutdown
        self.worker.shared.shutdown(core);
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    *self.core.borrow_mut() = Some(core);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, so push it
                    // to the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        core.transition_to_parked(&self.worker);

        while !core.is_shutdown {
            core = self.park_timeout(core, None);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                return core;
            }
        }

        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepare the worker state for parking
    fn transition_to_parked(&mut self, worker: &Worker) {
        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This only updates the local
        // cache; the shared state was updated by the transition above.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as checking the pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        if !self.is_shutdown {
            // Check if the scheduler has been shut down.
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    /// Signals all tasks to shut down, and waits for them to complete. Must run
    /// before we enter the single-threaded phase of shutdown processing.
    fn pre_shutdown(&mut self, worker: &Worker) {
        // The OwnedTasks was closed in Shared::close.
        debug_assert!(worker.shared.owned.is_closed());

        // Signal all tasks to shut down.
        while let Some(header) = worker.shared.owned.pop_back() {
            header.shutdown();
        }
    }

    /// Shut down the core.
    fn shutdown(&mut self) {
        // Take the parker out of the core.
        let mut park = self.park.take().expect("park missing");

        // Drain the local queue and the LIFO slot.
        while self.next_local_task().is_some() {}

        park.shutdown();
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue
    fn inject(&self) -> &Inject<Arc<Shared>> {
        &self.shared.inject
    }
}

impl task::Schedule for Arc<Shared> {
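    /// Releases a completed task, removing it from the `OwnedTasks`
    /// collection it was inserted into by `bind`.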
    fn release(&self, task: &Task) -> Option<Task> {
        // SAFETY: Inserted into owned in bind.
        unsafe { self.owned.remove(task) }
    }

    fn schedule(&self, task: Notified) {
        (**self).schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        (**self).schedule(task, true);
    }
}

impl Shared {
    pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output>
    where
        T: Future + Send + 'static,
        T::Output: Send + 'static,
    {
        let (handle, notified) = me.owned.bind(future, me.clone());

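        // `bind` returns `None` when the OwnedTasks collection is already
        // closed. In that case the task was never spawned and has already
        // been cancelled (see the module docs on spawns during shutdown).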
        if let Some(notified) = notified {
            me.schedule(notified, false);
        }

        handle
    }

    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue.
            self.inject.push(task);
            self.notify_parked();
        })
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

    pub(super) fn close(&self) {
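        // `Inject::close` returns `true` only for the call that actually
        // closes the queue, so the steps below run at most once.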
        if self.inject.close() {
            self.owned.close();
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has pushed
    /// its core to Shared::shutdown_cores.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
    fn shutdown(&self, core: Box<Core>) {
        let mut cores = self.shutdown_cores.lock();
        cores.push(core);

        if cores.len() != self.remotes.len() {
            return;
        }

        debug_assert!(self.owned.is_empty());

        for mut core in cores.drain(..) {
            core.shutdown();
        }

        // Drain the injection queue
        while let Some(task) = self.inject.pop() {
            task.shutdown();
        }
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        std::ptr::eq(self, other)
    }
}
