//! A scheduler is initialized with a fixed number of workers. Each worker is
//! driven by a thread. Each worker has a "core" which contains data such as the
//! run queue and other state. When `block_in_place` is called, the worker's
//! "core" is handed off to a new thread allowing the scheduler to continue to
//! make progress while the originating thread blocks.
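//!
//! A rough sketch of the `block_in_place` hand-off (illustrative only, not an
//! exact trace):
//!
//! ```text
//! worker thread A                      new thread B (spawn_blocking)
//! ---------------                      -----------------------------
//! block_in_place(f)
//!   puts core into `worker.core` --->  run(worker) takes the core and
//!   runs f() (may block)               continues polling tasks
//!   f() returns; the `Reset` guard
//!   tries to take the core back
//! ```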

use crate::coop;
use crate::loom::rand::seed;
use crate::loom::sync::{Arc, Mutex};
use crate::park::{Park, Unpark};
use crate::runtime;
use crate::runtime::park::{Parker, Unparker};
use crate::runtime::thread_pool::{AtomicCell, Idle};
use crate::runtime::{queue, task};
use crate::util::linked_list::LinkedList;
use crate::util::FastRand;

use std::cell::RefCell;
use std::time::Duration;

/// A scheduler worker
pub(super) struct Worker {
    /// Reference to shared state
    shared: Arc<Shared>,

    /// Index holding this worker's remote state
    index: usize,

    /// Used to hand-off a worker's core to another thread.
    core: AtomicCell<Core>,
}

/// Core data
struct Core {
    /// Used to schedule bookkeeping tasks every so often.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
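    ///
    /// For example (illustrative): if task A sends a message on a channel and
    /// wakes receiver task B, B is placed in this slot and runs as soon as A
    /// yields, instead of waiting behind everything already in the run queue.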
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Worker>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shutdown
    is_shutdown: bool,

    /// Tasks owned by the core
    tasks: LinkedList<Task>,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy.
    park: Option<Parker>,

    /// Fast random number generator.
    rand: FastRand,
}

/// State shared across all workers
pub(super) struct Shared {
    /// Per-worker remote state. All other workers have access to this, and it
    /// is how they communicate with each other.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    inject: queue::Inject<Arc<Worker>>,

    /// Coordinates idle workers
    idle: Idle,

    /// Workers that have observed the shutdown signal
    ///
    /// The core is **not** placed back in the worker to keep it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
}

/// Used to communicate with a worker from other threads.
struct Remote {
    /// Steal tasks from this worker.
    steal: queue::Steal<Arc<Worker>>,

    /// Transfers tasks to be released. Any worker pushes tasks, only the owning
    /// worker pops.
    pending_drop: task::TransferStack<Arc<Worker>>,

    /// Unparks the associated worker thread
    unpark: Unparker,
}

/// Thread-local context
struct Context {
    /// Worker
    worker: Arc<Worker>,

    /// Core data
    core: RefCell<Option<Box<Core>>>,
}

/// Starts the workers
pub(crate) struct Launch(Vec<Arc<Worker>>);

/// Running a task may consume the core. If the core is still available when
/// the task completes, it is returned. Otherwise, the worker will need to stop
/// processing.
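/// `Context::run_task` returns this, which lets the scheduler loop write
/// `core = self.run_task(task, core)?` and stop once the core has moved on.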
type RunResult = Result<Box<Core>, ()>;

/// A task handle
type Task = task::Task<Arc<Worker>>;

/// A notified task handle
type Notified = task::Notified<Arc<Worker>>;

// Tracks thread-local state
scoped_thread_local!(static CURRENT: Context);
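
// `CURRENT` is set for the duration of `run` and read back by
// `block_in_place`, `bind`, `release`, and `Shared::schedule` to locate the
// worker driving the current thread.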

pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
    let mut cores = vec![];
    let mut remotes = vec![];

    // Create the local queues
    for _ in 0..size {
        let (steal, run_queue) = queue::local();

        let park = park.clone();
        let unpark = park.unpark();

        cores.push(Box::new(Core {
            tick: 0,
            lifo_slot: None,
            run_queue,
            is_searching: false,
            is_shutdown: false,
            tasks: LinkedList::new(),
            park: Some(park),
            rand: FastRand::new(seed()),
        }));

        remotes.push(Remote {
            steal,
            pending_drop: task::TransferStack::new(),
            unpark,
        });
    }

    let shared = Arc::new(Shared {
        remotes: remotes.into_boxed_slice(),
        inject: queue::Inject::new(),
        idle: Idle::new(size),
        shutdown_workers: Mutex::new(vec![]),
    });

    let mut launch = Launch(vec![]);

    for (index, core) in cores.drain(..).enumerate() {
        launch.0.push(Arc::new(Worker {
            shared: shared.clone(),
            index,
            core: AtomicCell::new(Some(core)),
        }));
    }

    (shared, launch)
}

cfg_blocking! {
    use crate::runtime::enter::EnterContext;

    pub(crate) fn block_in_place<F, R>(f: F) -> R
    where
        F: FnOnce() -> R,
    {
        // Try to steal the worker core back
        struct Reset(coop::Budget);
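
        // When dropped (after the blocking closure completes), the guard
        // attempts to reclaim this worker's core and restores the saved task
        // budget.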

        impl Drop for Reset {
            fn drop(&mut self) {
                CURRENT.with(|maybe_cx| {
                    if let Some(cx) = maybe_cx {
                        let core = cx.worker.core.take();
                        let mut cx_core = cx.core.borrow_mut();
                        assert!(cx_core.is_none());
                        *cx_core = core;

                        // Reset the task budget as we are re-entering the
                        // runtime.
                        coop::set(self.0);
                    }
                });
            }
        }

        let mut had_entered = false;

        CURRENT.with(|maybe_cx| {
            match (crate::runtime::enter::context(), maybe_cx.is_some()) {
                (EnterContext::Entered { .. }, true) => {
                    // We are on a thread pool runtime thread, so we just need to set up blocking.
                    had_entered = true;
                }
                (EnterContext::Entered { allow_blocking }, false) => {
                    // We are on an executor, but _not_ on the thread pool.
                    // That is _only_ okay if we are in a thread pool runtime's block_on method:
                    if allow_blocking {
                        had_entered = true;
                        return;
                    } else {
                        // This probably means we are on the basic_scheduler or in a LocalSet,
                        // where it is _not_ okay to block.
                        panic!("can call blocking only when running on the multi-threaded runtime");
                    }
                }
                (EnterContext::NotEntered, true) => {
                    // This is a nested call to block_in_place (we already exited).
                    // All the necessary setup has already been done.
                    return;
                }
                (EnterContext::NotEntered, false) => {
                    // We are outside of the tokio runtime, so blocking is fine.
                    // We can also skip all of the thread pool blocking setup steps.
                    return;
                }
            }

            let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");

            // Get the worker core. If none is set, then blocking is fine!
            let core = match cx.core.borrow_mut().take() {
                Some(core) => core,
                None => return,
            };

            // The parker should be set here
            assert!(core.park.is_some());

            // In order to block, the core must be sent to another thread for
            // execution.
            //
            // First, move the core back into the worker's shared core slot.
            cx.worker.core.set(core);

            // Next, clone the worker handle and send it to a new thread for
            // processing.
            //
            // Once the blocking task is done executing, we will attempt to
            // steal the core back.
            let worker = cx.worker.clone();
            runtime::spawn_blocking(move || run(worker));
        });

        if had_entered {
            // Unset the current task's budget. Blocking sections are not
            // constrained by task budgets.
            let _reset = Reset(coop::stop());

            crate::runtime::enter::exit(f)
        } else {
            f()
        }
    }
}
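
// Illustrative call from application code via the public wrapper (a minimal
// sketch; assumes `tokio::task::block_in_place` forwards here when the
// multi-threaded runtime is active):
//
//     tokio::task::block_in_place(|| {
//         // Blocking work is fine here; this worker's core has been handed
//         // to another thread, so the scheduler keeps making progress.
//         std::thread::sleep(std::time::Duration::from_millis(50));
//     });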

/// After how many ticks is the global queue polled. This helps to ensure
/// fairness.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
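///
/// Used by `Core::next_task` (the injection queue is polled first on every
/// `GLOBAL_POLL_INTERVAL`-th tick) and by `Context::maintenance`.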
const GLOBAL_POLL_INTERVAL: u8 = 61;

impl Launch {
    pub(crate) fn launch(mut self) {
        for worker in self.0.drain(..) {
            runtime::spawn_blocking(move || run(worker));
        }
    }
}

fn run(worker: Arc<Worker>) {
    // Acquire a core. If this fails, then another thread is running this
    // worker and there is nothing further to do.
    let core = match worker.core.take() {
        Some(core) => core,
        None => return,
    };

    // Set the worker context.
    let cx = Context {
        worker,
        core: RefCell::new(None),
    };

    let _enter = crate::runtime::enter(true);

    CURRENT.set(&cx, || {
        // This should always be an error. It only returns a `Result` to support
        // using `?` to short circuit.
        assert!(cx.run(core).is_err());
    });
}

impl Context {
    fn run(&self, mut core: Box<Core>) -> RunResult {
        while !core.is_shutdown {
            // Increment the tick
            core.tick();

            // Run maintenance, if needed
            core = self.maintenance(core);

            // First, check work available to the current worker.
            if let Some(task) = core.next_task(&self.worker) {
                core = self.run_task(task, core)?;
                continue;
            }

            // There is no more **local** work to process, try to steal work
            // from other workers.
            if let Some(task) = core.steal_work(&self.worker) {
                core = self.run_task(task, core)?;
            } else {
                // Wait for work
                core = self.park(core);
            }
        }

        // Signal shutdown
        self.worker.shared.shutdown(core, self.worker.clone());
        Err(())
    }

    fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
        // Make sure the worker is not in the **searching** state. This enables
        // another idle worker to try to steal work.
        core.transition_from_searching(&self.worker);

        // Make the core available to the runtime context
        *self.core.borrow_mut() = Some(core);

        // Run the task
        coop::budget(|| {
            task.run();

            // As long as there is budget remaining and a task exists in the
            // `lifo_slot`, then keep running.
            loop {
                // Check if we still have the core. If not, the core was stolen
                // by another worker.
                let mut core = match self.core.borrow_mut().take() {
                    Some(core) => core,
                    None => return Err(()),
                };

                // Check for a task in the LIFO slot
                let task = match core.lifo_slot.take() {
                    Some(task) => task,
                    None => return Ok(core),
                };

                if coop::has_budget_remaining() {
                    // Run the LIFO task, then loop
                    *self.core.borrow_mut() = Some(core);
                    task.run();
                } else {
                    // Not enough budget left to run the LIFO task, push it to
                    // the back of the queue and return.
                    core.run_queue.push_back(task, self.worker.inject());
                    return Ok(core);
                }
            }
        })
    }

    fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
        if core.tick % GLOBAL_POLL_INTERVAL == 0 {
            // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
            // to run without actually putting the thread to sleep.
            core = self.park_timeout(core, Some(Duration::from_millis(0)));

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);
        }

        core
    }

    fn park(&self, mut core: Box<Core>) -> Box<Core> {
        core.transition_to_parked(&self.worker);

        while !core.is_shutdown {
            core = self.park_timeout(core, None);

            // Run regularly scheduled maintenance
            core.maintenance(&self.worker);

            if core.transition_from_parked(&self.worker) {
                return core;
            }
        }

        core
    }

    fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
        // Take the parker out of core
        let mut park = core.park.take().expect("park missing");

        // Store `core` in context
        *self.core.borrow_mut() = Some(core);

        // Park thread
        if let Some(timeout) = duration {
            park.park_timeout(timeout).expect("park failed");
        } else {
            park.park().expect("park failed");
        }

        // Remove `core` from context
        core = self.core.borrow_mut().take().expect("core missing");

        // Place `park` back in `core`
        core.park = Some(park);

        // If there are tasks available to steal, notify a worker
        if core.run_queue.is_stealable() {
            self.worker.shared.notify_parked();
        }

        core
    }
}

impl Core {
    /// Increment the tick
    fn tick(&mut self) {
        self.tick = self.tick.wrapping_add(1);
    }

    /// Return the next notified task available to this worker.
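    ///
    /// Once every `GLOBAL_POLL_INTERVAL` ticks, the injection (global) queue
    /// is polled first so injected tasks cannot be starved by local work;
    /// otherwise local tasks (LIFO slot, then run queue) take priority.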
    fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
        if self.tick % GLOBAL_POLL_INTERVAL == 0 {
            worker.inject().pop().or_else(|| self.next_local_task())
        } else {
            self.next_local_task().or_else(|| worker.inject().pop())
        }
    }

    fn next_local_task(&mut self) -> Option<Notified> {
        self.lifo_slot.take().or_else(|| self.run_queue.pop())
    }

    fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
        if !self.transition_to_searching(worker) {
            return None;
        }

        let num = worker.shared.remotes.len();
        // Start from a random worker
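        // (a random start index spreads concurrent searchers across different
        // victim queues instead of having every worker probe index 0 first)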
        let start = self.rand.fastrand_n(num as u32) as usize;

        for i in 0..num {
            let i = (start + i) % num;

            // Don't steal from ourself! We know we don't have work.
            if i == worker.index {
                continue;
            }

            let target = &worker.shared.remotes[i];
            if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
                return Some(task);
            }
        }

        // Fallback on checking the global queue
        worker.shared.inject.pop()
    }

    fn transition_to_searching(&mut self, worker: &Worker) -> bool {
        if !self.is_searching {
            self.is_searching = worker.shared.idle.transition_worker_to_searching();
        }

        self.is_searching
    }

    fn transition_from_searching(&mut self, worker: &Worker) {
        if !self.is_searching {
            return;
        }

        self.is_searching = false;
        worker.shared.transition_worker_from_searching();
    }

    /// Prepare the worker state for parking
    fn transition_to_parked(&mut self, worker: &Worker) {
        // When the final worker transitions **out** of searching to parked, it
        // must check all the queues one last time in case work materialized
        // between the last work scan and transitioning out of searching.
        let is_last_searcher = worker
            .shared
            .idle
            .transition_worker_to_parked(worker.index, self.is_searching);

        // The worker is no longer searching. This only updates the local flag;
        // the shared idle state was updated by the call above.
        self.is_searching = false;

        if is_last_searcher {
            worker.shared.notify_if_work_pending();
        }
    }

    /// Returns `true` if the transition happened.
    fn transition_from_parked(&mut self, worker: &Worker) -> bool {
        // If a task is in the lifo slot, then we must unpark regardless of
        // being notified
        if self.lifo_slot.is_some() {
            worker.shared.idle.unpark_worker_by_id(worker.index);
            self.is_searching = true;
            return true;
        }

        if worker.shared.idle.is_parked(worker.index) {
            return false;
        }

        // When unparked, the worker is in the searching state.
        self.is_searching = true;
        true
    }

    /// Runs maintenance work such as freeing pending tasks and checking the
    /// pool's state.
    fn maintenance(&mut self, worker: &Worker) {
        self.drain_pending_drop(worker);

        if !self.is_shutdown {
            // Check if the scheduler has been shutdown
            self.is_shutdown = worker.inject().is_closed();
        }
    }

    // Shutdown the core
    fn shutdown(&mut self, worker: &Worker) {
        // Take the parker out of the core
        let mut park = self.park.take().expect("park missing");

        // Signal to all tasks to shut down.
        for header in self.tasks.iter() {
            header.shutdown();
        }

        loop {
            self.drain_pending_drop(worker);

            if self.tasks.is_empty() {
                break;
            }

            // Wait until signalled
            park.park().expect("park failed");
        }

        // Drain the queue
        while self.next_local_task().is_some() {}

        park.shutdown();
    }

    fn drain_pending_drop(&mut self, worker: &Worker) {
        use std::mem::ManuallyDrop;

        for task in worker.remote().pending_drop.drain() {
            let task = ManuallyDrop::new(task);

            // safety: tasks are only pushed into the `pending_drop` stacks that
            // are associated with the list they are inserted into. When a task
            // is pushed into `pending_drop`, the ref-inc is skipped, so we must
            // not ref-dec here.
            //
            // See `bind` and `release` implementations.
            unsafe {
                self.tasks.remove(task.header().into());
            }
        }
    }
}

impl Worker {
    /// Returns a reference to the scheduler's injection queue
    fn inject(&self) -> &queue::Inject<Arc<Worker>> {
        &self.shared.inject
    }

    /// Return a reference to this worker's remote data
    fn remote(&self) -> &Remote {
        &self.shared.remotes[self.index]
    }

    fn eq(&self, other: &Worker) -> bool {
        self.shared.ptr_eq(&other.shared) && self.index == other.index
    }
}

impl task::Schedule for Arc<Worker> {
    fn bind(task: Task) -> Arc<Worker> {
        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            // Track the task
            cx.core
                .borrow_mut()
                .as_mut()
                .expect("scheduler core missing")
                .tasks
                .push_front(task);

            // Return a clone of the worker
            cx.worker.clone()
        })
    }

    fn release(&self, task: &Task) -> Option<Task> {
        use std::ptr::NonNull;

        CURRENT.with(|maybe_cx| {
            let cx = maybe_cx.expect("scheduler context missing");

            if self.eq(&cx.worker) {
                let mut maybe_core = cx.core.borrow_mut();

                if let Some(core) = &mut *maybe_core {
                    // Directly remove the task
                    //
                    // safety: the task is inserted in the list in `bind`.
                    unsafe {
                        let ptr = NonNull::from(task.header());
                        return core.tasks.remove(ptr);
                    }
                }
            }

            // Track the task to be released by the worker that owns it
            //
            // Safety: We get a new handle without incrementing the ref-count.
            // A ref-count is held by the "owned" linked list and it is only
            // ever removed from that list as part of the release process: this
            // method or popping the task from `pending_drop`. Thus, we can rely
            // on the ref-count held by the linked-list to keep the memory
            // alive.
            //
            // When the task is removed from the stack, it is forgotten instead
            // of dropped.
            let task = unsafe { Task::from_raw(task.header().into()) };

            self.remote().pending_drop.push(task);

            if cx.core.borrow().is_some() {
                return None;
            }

            // The worker core has been handed off to another thread. In the
            // event that the scheduler is currently shutting down, the thread
            // that owns the task may be waiting on the release to complete
            // shutdown.
            if self.inject().is_closed() {
                self.remote().unpark.unpark();
            }

            None
        })
    }

    fn schedule(&self, task: Notified) {
        self.shared.schedule(task, false);
    }

    fn yield_now(&self, task: Notified) {
        self.shared.schedule(task, true);
    }
}

impl Shared {
    pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
        CURRENT.with(|maybe_cx| {
            if let Some(cx) = maybe_cx {
                // Make sure the task is part of the **current** scheduler.
                if self.ptr_eq(&cx.worker.shared) {
                    // And the current thread still holds a core
                    if let Some(core) = cx.core.borrow_mut().as_mut() {
                        self.schedule_local(core, task, is_yield);
                        return;
                    }
                }
            }

            // Otherwise, use the inject queue
            self.inject.push(task);
            self.notify_parked();
        });
    }

    fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
        // Spawning from the worker thread. If scheduling a "yield" then the
        // task must always be pushed to the back of the queue, enabling other
        // tasks to be executed. If **not** a yield, then there is more
        // flexibility and the task may go to the front of the queue.
        let should_notify = if is_yield {
            core.run_queue.push_back(task, &self.inject);
            true
        } else {
            // Push to the LIFO slot
            let prev = core.lifo_slot.take();
            let ret = prev.is_some();

            if let Some(prev) = prev {
                core.run_queue.push_back(prev, &self.inject);
            }

            core.lifo_slot = Some(task);

            ret
        };

        // Only notify if not currently parked. If `park` is `None`, then the
        // scheduling is from a resource driver. As notifications often come in
        // batches, the notification is delayed until the park is complete.
        if should_notify && core.park.is_some() {
            self.notify_parked();
        }
    }

    pub(super) fn close(&self) {
        if self.inject.close() {
            self.notify_all();
        }
    }

    fn notify_parked(&self) {
        if let Some(index) = self.idle.worker_to_notify() {
            self.remotes[index].unpark.unpark();
        }
    }

    fn notify_all(&self) {
        for remote in &self.remotes[..] {
            remote.unpark.unpark();
        }
    }

    fn notify_if_work_pending(&self) {
        for remote in &self.remotes[..] {
            if !remote.steal.is_empty() {
                self.notify_parked();
                return;
            }
        }

        if !self.inject.is_empty() {
            self.notify_parked();
        }
    }

    fn transition_worker_from_searching(&self) {
        if self.idle.transition_worker_from_searching() {
            // We are the final searching worker. Because work was found, we
            // need to notify another worker.
            self.notify_parked();
        }
    }

    /// Signals that a worker has observed the shutdown signal and has replaced
    /// its core back into its handle.
    ///
    /// If all workers have reached this point, the final cleanup is performed.
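    ///
    /// The last worker to check in shuts down every collected core and then
    /// drains the injection queue, so cleanup runs exactly once.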
    fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
        let mut workers = self.shutdown_workers.lock().unwrap();
        workers.push((core, worker));

        if workers.len() != self.remotes.len() {
            return;
        }

        for (mut core, worker) in workers.drain(..) {
            core.shutdown(&worker);
        }

        // Drain the injection queue
        while self.inject.pop().is_some() {}
    }

    fn ptr_eq(&self, other: &Shared) -> bool {
        self as *const _ == other as *const _
    }
}