1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
6
7 use crate::coop;
8 use crate::loom::rand::seed;
9 use crate::loom::sync::{Arc, Mutex};
10 use crate::park::{Park, Unpark};
11 use crate::runtime;
12 use crate::runtime::park::{Parker, Unparker};
13 use crate::runtime::thread_pool::{AtomicCell, Idle};
14 use crate::runtime::{queue, task};
15 use crate::util::linked_list::LinkedList;
16 use crate::util::FastRand;
17
18 use std::cell::RefCell;
19 use std::time::Duration;
20
21 /// A scheduler worker
22 pub(super) struct Worker {
23 /// Reference to shared state
24 shared: Arc<Shared>,
25
26 /// Index holding this worker's remote state
27 index: usize,
28
29 /// Used to hand-off a worker's core to another thread.
30 core: AtomicCell<Core>,
31 }
32
33 /// Core data
34 struct Core {
35 /// Used to schedule bookkeeping tasks every so often.
36 tick: u8,
37
38 /// When a task is scheduled from a worker, it is stored in this slot. The
39 /// worker will check this slot for a task **before** checking the run
40 /// queue. This effectively results in the **last** scheduled task to be run
41 /// next (LIFO). This is an optimization for message passing patterns and
42 /// helps to reduce latency.
43 lifo_slot: Option<Notified>,
44
45 /// The worker-local run queue.
46 run_queue: queue::Local<Arc<Worker>>,
47
48 /// True if the worker is currently searching for more work. Searching
49 /// involves attempting to steal from other workers.
50 is_searching: bool,
51
52 /// True if the scheduler is being shutdown
53 is_shutdown: bool,
54
55 /// Tasks owned by the core
56 tasks: LinkedList<Task>,
57
58 /// Parker
59 ///
60 /// Stored in an `Option` as the parker is added / removed to make the
61 /// borrow checker happy.
62 park: Option<Parker>,
63
64 /// Fast random number generator.
65 rand: FastRand,
66 }
67
68 /// State shared across all workers
69 pub(super) struct Shared {
70 /// Per-worker remote state. All other workers have access to this and is
71 /// how they communicate between each other.
72 remotes: Box<[Remote]>,
73
74 /// Submit work to the scheduler while **not** currently on a worker thread.
75 inject: queue::Inject<Arc<Worker>>,
76
77 /// Coordinates idle workers
78 idle: Idle,
79
80 /// Workers have have observed the shutdown signal
81 ///
82 /// The core is **not** placed back in the worker to avoid it from being
83 /// stolen by a thread that was spawned as part of `block_in_place`.
84 shutdown_workers: Mutex<Vec<(Box<Core>, Arc<Worker>)>>,
85 }
86
87 /// Used to communicate with a worker from other threads.
88 struct Remote {
89 /// Steal tasks from this worker.
90 steal: queue::Steal<Arc<Worker>>,
91
92 /// Transfers tasks to be released. Any worker pushes tasks, only the owning
93 /// worker pops.
94 pending_drop: task::TransferStack<Arc<Worker>>,
95
96 /// Unparks the associated worker thread
97 unpark: Unparker,
98 }
99
100 /// Thread-local context
101 struct Context {
102 /// Worker
103 worker: Arc<Worker>,
104
105 /// Core data
106 core: RefCell<Option<Box<Core>>>,
107 }
108
109 /// Starts the workers
110 pub(crate) struct Launch(Vec<Arc<Worker>>);
111
112 /// Running a task may consume the core. If the core is still available when
113 /// running the task completes, it is returned. Otherwise, the worker will need
114 /// to stop processing.
115 type RunResult = Result<Box<Core>, ()>;
116
117 /// A task handle
118 type Task = task::Task<Arc<Worker>>;
119
120 /// A notified task handle
121 type Notified = task::Notified<Arc<Worker>>;
122
123 // Tracks thread-local state
124 scoped_thread_local!(static CURRENT: Context);
125
create(size: usize, park: Parker) -> (Arc<Shared>, Launch)126 pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
127 let mut cores = vec![];
128 let mut remotes = vec![];
129
130 // Create the local queues
131 for _ in 0..size {
132 let (steal, run_queue) = queue::local();
133
134 let park = park.clone();
135 let unpark = park.unpark();
136
137 cores.push(Box::new(Core {
138 tick: 0,
139 lifo_slot: None,
140 run_queue,
141 is_searching: false,
142 is_shutdown: false,
143 tasks: LinkedList::new(),
144 park: Some(park),
145 rand: FastRand::new(seed()),
146 }));
147
148 remotes.push(Remote {
149 steal,
150 pending_drop: task::TransferStack::new(),
151 unpark,
152 });
153 }
154
155 let shared = Arc::new(Shared {
156 remotes: remotes.into_boxed_slice(),
157 inject: queue::Inject::new(),
158 idle: Idle::new(size),
159 shutdown_workers: Mutex::new(vec![]),
160 });
161
162 let mut launch = Launch(vec![]);
163
164 for (index, core) in cores.drain(..).enumerate() {
165 launch.0.push(Arc::new(Worker {
166 shared: shared.clone(),
167 index,
168 core: AtomicCell::new(Some(core)),
169 }));
170 }
171
172 (shared, launch)
173 }
174
175 cfg_blocking! {
176 use crate::runtime::enter::EnterContext;
177
178 pub(crate) fn block_in_place<F, R>(f: F) -> R
179 where
180 F: FnOnce() -> R,
181 {
182 // Try to steal the worker core back
183 struct Reset(coop::Budget);
184
185 impl Drop for Reset {
186 fn drop(&mut self) {
187 CURRENT.with(|maybe_cx| {
188 if let Some(cx) = maybe_cx {
189 let core = cx.worker.core.take();
190 let mut cx_core = cx.core.borrow_mut();
191 assert!(cx_core.is_none());
192 *cx_core = core;
193
194 // Reset the task budget as we are re-entering the
195 // runtime.
196 coop::set(self.0);
197 }
198 });
199 }
200 }
201
202 let mut had_entered = false;
203
204 CURRENT.with(|maybe_cx| {
205 match (crate::runtime::enter::context(), maybe_cx.is_some()) {
206 (EnterContext::Entered { .. }, true) => {
207 // We are on a thread pool runtime thread, so we just need to set up blocking.
208 had_entered = true;
209 }
210 (EnterContext::Entered { allow_blocking }, false) => {
211 // We are on an executor, but _not_ on the thread pool.
212 // That is _only_ okay if we are in a thread pool runtime's block_on method:
213 if allow_blocking {
214 had_entered = true;
215 return;
216 } else {
217 // This probably means we are on the basic_scheduler or in a LocalSet,
218 // where it is _not_ okay to block.
219 panic!("can call blocking only when running on the multi-threaded runtime");
220 }
221 }
222 (EnterContext::NotEntered, true) => {
223 // This is a nested call to block_in_place (we already exited).
224 // All the necessary setup has already been done.
225 return;
226 }
227 (EnterContext::NotEntered, false) => {
228 // We are outside of the tokio runtime, so blocking is fine.
229 // We can also skip all of the thread pool blocking setup steps.
230 return;
231 }
232 }
233
234 let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
235
236 // Get the worker core. If none is set, then blocking is fine!
237 let core = match cx.core.borrow_mut().take() {
238 Some(core) => core,
239 None => return,
240 };
241
242 // The parker should be set here
243 assert!(core.park.is_some());
244
245 // In order to block, the core must be sent to another thread for
246 // execution.
247 //
248 // First, move the core back into the worker's shared core slot.
249 cx.worker.core.set(core);
250
251 // Next, clone the worker handle and send it to a new thread for
252 // processing.
253 //
254 // Once the blocking task is done executing, we will attempt to
255 // steal the core back.
256 let worker = cx.worker.clone();
257 runtime::spawn_blocking(move || run(worker));
258 });
259
260 if had_entered {
261 // Unset the current task's budget. Blocking sections are not
262 // constrained by task budgets.
263 let _reset = Reset(coop::stop());
264
265 crate::runtime::enter::exit(f)
266 } else {
267 f()
268 }
269 }
270 }
271
/// Number of ticks between two polls of the global queue; polling it
/// periodically helps to ensure fairness.
///
/// The value is fairly arbitrary and is believed to have been copied from
/// golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;
277
278 impl Launch {
launch(mut self)279 pub(crate) fn launch(mut self) {
280 for worker in self.0.drain(..) {
281 runtime::spawn_blocking(move || run(worker));
282 }
283 }
284 }
285
run(worker: Arc<Worker>)286 fn run(worker: Arc<Worker>) {
287 // Acquire a core. If this fails, then another thread is running this
288 // worker and there is nothing further to do.
289 let core = match worker.core.take() {
290 Some(core) => core,
291 None => return,
292 };
293
294 // Set the worker context.
295 let cx = Context {
296 worker,
297 core: RefCell::new(None),
298 };
299
300 let _enter = crate::runtime::enter(true);
301
302 CURRENT.set(&cx, || {
303 // This should always be an error. It only returns a `Result` to support
304 // using `?` to short circuit.
305 assert!(cx.run(core).is_err());
306 });
307 }
308
309 impl Context {
run(&self, mut core: Box<Core>) -> RunResult310 fn run(&self, mut core: Box<Core>) -> RunResult {
311 while !core.is_shutdown {
312 // Increment the tick
313 core.tick();
314
315 // Run maintenance, if needed
316 core = self.maintenance(core);
317
318 // First, check work available to the current worker.
319 if let Some(task) = core.next_task(&self.worker) {
320 core = self.run_task(task, core)?;
321 continue;
322 }
323
324 // There is no more **local** work to process, try to steal work
325 // from other workers.
326 if let Some(task) = core.steal_work(&self.worker) {
327 core = self.run_task(task, core)?;
328 } else {
329 // Wait for work
330 core = self.park(core);
331 }
332 }
333
334 // Signal shutdown
335 self.worker.shared.shutdown(core, self.worker.clone());
336 Err(())
337 }
338
run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult339 fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
340 // Make sure thew orker is not in the **searching** state. This enables
341 // another idle worker to try to steal work.
342 core.transition_from_searching(&self.worker);
343
344 // Make the core available to the runtime context
345 *self.core.borrow_mut() = Some(core);
346
347 // Run the task
348 coop::budget(|| {
349 task.run();
350
351 // As long as there is budget remaining and a task exists in the
352 // `lifo_slot`, then keep running.
353 loop {
354 // Check if we still have the core. If not, the core was stolen
355 // by another worker.
356 let mut core = match self.core.borrow_mut().take() {
357 Some(core) => core,
358 None => return Err(()),
359 };
360
361 // Check for a task in the LIFO slot
362 let task = match core.lifo_slot.take() {
363 Some(task) => task,
364 None => return Ok(core),
365 };
366
367 if coop::has_budget_remaining() {
368 // Run the LIFO task, then loop
369 *self.core.borrow_mut() = Some(core);
370 task.run();
371 } else {
372 // Not enough budget left to run the LIFO task, push it to
373 // the back of the queue and return.
374 core.run_queue.push_back(task, self.worker.inject());
375 return Ok(core);
376 }
377 }
378 })
379 }
380
maintenance(&self, mut core: Box<Core>) -> Box<Core>381 fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
382 if core.tick % GLOBAL_POLL_INTERVAL == 0 {
383 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
384 // to run without actually putting the thread to sleep.
385 core = self.park_timeout(core, Some(Duration::from_millis(0)));
386
387 // Run regularly scheduled maintenance
388 core.maintenance(&self.worker);
389 }
390
391 core
392 }
393
park(&self, mut core: Box<Core>) -> Box<Core>394 fn park(&self, mut core: Box<Core>) -> Box<Core> {
395 core.transition_to_parked(&self.worker);
396
397 while !core.is_shutdown {
398 core = self.park_timeout(core, None);
399
400 // Run regularly scheduled maintenance
401 core.maintenance(&self.worker);
402
403 if core.transition_from_parked(&self.worker) {
404 return core;
405 }
406 }
407
408 core
409 }
410
park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core>411 fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
412 // Take the parker out of core
413 let mut park = core.park.take().expect("park missing");
414
415 // Store `core` in context
416 *self.core.borrow_mut() = Some(core);
417
418 // Park thread
419 if let Some(timeout) = duration {
420 park.park_timeout(timeout).expect("park failed");
421 } else {
422 park.park().expect("park failed");
423 }
424
425 // Remove `core` from context
426 core = self.core.borrow_mut().take().expect("core missing");
427
428 // Place `park` back in `core`
429 core.park = Some(park);
430
431 // If there are tasks available to steal, notify a worker
432 if core.run_queue.is_stealable() {
433 self.worker.shared.notify_parked();
434 }
435
436 core
437 }
438 }
439
440 impl Core {
441 /// Increment the tick
tick(&mut self)442 fn tick(&mut self) {
443 self.tick = self.tick.wrapping_add(1);
444 }
445
446 /// Return the next notified task available to this worker.
next_task(&mut self, worker: &Worker) -> Option<Notified>447 fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
448 if self.tick % GLOBAL_POLL_INTERVAL == 0 {
449 worker.inject().pop().or_else(|| self.next_local_task())
450 } else {
451 self.next_local_task().or_else(|| worker.inject().pop())
452 }
453 }
454
next_local_task(&mut self) -> Option<Notified>455 fn next_local_task(&mut self) -> Option<Notified> {
456 self.lifo_slot.take().or_else(|| self.run_queue.pop())
457 }
458
steal_work(&mut self, worker: &Worker) -> Option<Notified>459 fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
460 if !self.transition_to_searching(worker) {
461 return None;
462 }
463
464 let num = worker.shared.remotes.len();
465 // Start from a random worker
466 let start = self.rand.fastrand_n(num as u32) as usize;
467
468 for i in 0..num {
469 let i = (start + i) % num;
470
471 // Don't steal from ourself! We know we don't have work.
472 if i == worker.index {
473 continue;
474 }
475
476 let target = &worker.shared.remotes[i];
477 if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
478 return Some(task);
479 }
480 }
481
482 // Fallback on checking the global queue
483 worker.shared.inject.pop()
484 }
485
transition_to_searching(&mut self, worker: &Worker) -> bool486 fn transition_to_searching(&mut self, worker: &Worker) -> bool {
487 if !self.is_searching {
488 self.is_searching = worker.shared.idle.transition_worker_to_searching();
489 }
490
491 self.is_searching
492 }
493
transition_from_searching(&mut self, worker: &Worker)494 fn transition_from_searching(&mut self, worker: &Worker) {
495 if !self.is_searching {
496 return;
497 }
498
499 self.is_searching = false;
500 worker.shared.transition_worker_from_searching();
501 }
502
503 /// Prepare the worker state for parking
transition_to_parked(&mut self, worker: &Worker)504 fn transition_to_parked(&mut self, worker: &Worker) {
505 // When the final worker transitions **out** of searching to parked, it
506 // must check all the queues one last time in case work materialized
507 // between the last work scan and transitioning out of searching.
508 let is_last_searcher = worker
509 .shared
510 .idle
511 .transition_worker_to_parked(worker.index, self.is_searching);
512
513 // The worker is no longer searching. Setting this is the local cache
514 // only.
515 self.is_searching = false;
516
517 if is_last_searcher {
518 worker.shared.notify_if_work_pending();
519 }
520 }
521
522 /// Returns `true` if the transition happened.
transition_from_parked(&mut self, worker: &Worker) -> bool523 fn transition_from_parked(&mut self, worker: &Worker) -> bool {
524 // If a task is in the lifo slot, then we must unpark regardless of
525 // being notified
526 if self.lifo_slot.is_some() {
527 worker.shared.idle.unpark_worker_by_id(worker.index);
528 self.is_searching = true;
529 return true;
530 }
531
532 if worker.shared.idle.is_parked(worker.index) {
533 return false;
534 }
535
536 // When unparked, the worker is in the searching state.
537 self.is_searching = true;
538 true
539 }
540
541 /// Runs maintenance work such as free pending tasks and check the pool's
542 /// state.
maintenance(&mut self, worker: &Worker)543 fn maintenance(&mut self, worker: &Worker) {
544 self.drain_pending_drop(worker);
545
546 if !self.is_shutdown {
547 // Check if the scheduler has been shutdown
548 self.is_shutdown = worker.inject().is_closed();
549 }
550 }
551
552 // Shutdown the core
shutdown(&mut self, worker: &Worker)553 fn shutdown(&mut self, worker: &Worker) {
554 // Take the core
555 let mut park = self.park.take().expect("park missing");
556
557 // Signal to all tasks to shut down.
558 for header in self.tasks.iter() {
559 header.shutdown();
560 }
561
562 loop {
563 self.drain_pending_drop(worker);
564
565 if self.tasks.is_empty() {
566 break;
567 }
568
569 // Wait until signalled
570 park.park().expect("park failed");
571 }
572
573 // Drain the queue
574 while self.next_local_task().is_some() {}
575
576 park.shutdown();
577 }
578
drain_pending_drop(&mut self, worker: &Worker)579 fn drain_pending_drop(&mut self, worker: &Worker) {
580 use std::mem::ManuallyDrop;
581
582 for task in worker.remote().pending_drop.drain() {
583 let task = ManuallyDrop::new(task);
584
585 // safety: tasks are only pushed into the `pending_drop` stacks that
586 // are associated with the list they are inserted into. When a task
587 // is pushed into `pending_drop`, the ref-inc is skipped, so we must
588 // not ref-dec here.
589 //
590 // See `bind` and `release` implementations.
591 unsafe {
592 self.tasks.remove(task.header().into());
593 }
594 }
595 }
596 }
597
598 impl Worker {
599 /// Returns a reference to the scheduler's injection queue
inject(&self) -> &queue::Inject<Arc<Worker>>600 fn inject(&self) -> &queue::Inject<Arc<Worker>> {
601 &self.shared.inject
602 }
603
604 /// Return a reference to this worker's remote data
remote(&self) -> &Remote605 fn remote(&self) -> &Remote {
606 &self.shared.remotes[self.index]
607 }
608
eq(&self, other: &Worker) -> bool609 fn eq(&self, other: &Worker) -> bool {
610 self.shared.ptr_eq(&other.shared) && self.index == other.index
611 }
612 }
613
614 impl task::Schedule for Arc<Worker> {
bind(task: Task) -> Arc<Worker>615 fn bind(task: Task) -> Arc<Worker> {
616 CURRENT.with(|maybe_cx| {
617 let cx = maybe_cx.expect("scheduler context missing");
618
619 // Track the task
620 cx.core
621 .borrow_mut()
622 .as_mut()
623 .expect("scheduler core missing")
624 .tasks
625 .push_front(task);
626
627 // Return a clone of the worker
628 cx.worker.clone()
629 })
630 }
631
release(&self, task: &Task) -> Option<Task>632 fn release(&self, task: &Task) -> Option<Task> {
633 use std::ptr::NonNull;
634
635 CURRENT.with(|maybe_cx| {
636 let cx = maybe_cx.expect("scheduler context missing");
637
638 if self.eq(&cx.worker) {
639 let mut maybe_core = cx.core.borrow_mut();
640
641 if let Some(core) = &mut *maybe_core {
642 // Directly remove the task
643 //
644 // safety: the task is inserted in the list in `bind`.
645 unsafe {
646 let ptr = NonNull::from(task.header());
647 return core.tasks.remove(ptr);
648 }
649 }
650 }
651
652 // Track the task to be released by the worker that owns it
653 //
654 // Safety: We get a new handle without incrementing the ref-count.
655 // A ref-count is held by the "owned" linked list and it is only
656 // ever removed from that list as part of the release process: this
657 // method or popping the task from `pending_drop`. Thus, we can rely
658 // on the ref-count held by the linked-list to keep the memory
659 // alive.
660 //
661 // When the task is removed from the stack, it is forgotten instead
662 // of dropped.
663 let task = unsafe { Task::from_raw(task.header().into()) };
664
665 self.remote().pending_drop.push(task);
666
667 if cx.core.borrow().is_some() {
668 return None;
669 }
670
671 // The worker core has been handed off to another thread. In the
672 // event that the scheduler is currently shutting down, the thread
673 // that owns the task may be waiting on the release to complete
674 // shutdown.
675 if self.inject().is_closed() {
676 self.remote().unpark.unpark();
677 }
678
679 None
680 })
681 }
682
schedule(&self, task: Notified)683 fn schedule(&self, task: Notified) {
684 self.shared.schedule(task, false);
685 }
686
yield_now(&self, task: Notified)687 fn yield_now(&self, task: Notified) {
688 self.shared.schedule(task, true);
689 }
690 }
691
692 impl Shared {
schedule(&self, task: Notified, is_yield: bool)693 pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
694 CURRENT.with(|maybe_cx| {
695 if let Some(cx) = maybe_cx {
696 // Make sure the task is part of the **current** scheduler.
697 if self.ptr_eq(&cx.worker.shared) {
698 // And the current thread still holds a core
699 if let Some(core) = cx.core.borrow_mut().as_mut() {
700 self.schedule_local(core, task, is_yield);
701 return;
702 }
703 }
704 }
705
706 // Otherwise, use the inject queue
707 self.inject.push(task);
708 self.notify_parked();
709 });
710 }
711
schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool)712 fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
713 // Spawning from the worker thread. If scheduling a "yield" then the
714 // task must always be pushed to the back of the queue, enabling other
715 // tasks to be executed. If **not** a yield, then there is more
716 // flexibility and the task may go to the front of the queue.
717 let should_notify = if is_yield {
718 core.run_queue.push_back(task, &self.inject);
719 true
720 } else {
721 // Push to the LIFO slot
722 let prev = core.lifo_slot.take();
723 let ret = prev.is_some();
724
725 if let Some(prev) = prev {
726 core.run_queue.push_back(prev, &self.inject);
727 }
728
729 core.lifo_slot = Some(task);
730
731 ret
732 };
733
734 // Only notify if not currently parked. If `park` is `None`, then the
735 // scheduling is from a resource driver. As notifications often come in
736 // batches, the notification is delayed until the park is complete.
737 if should_notify && core.park.is_some() {
738 self.notify_parked();
739 }
740 }
741
close(&self)742 pub(super) fn close(&self) {
743 if self.inject.close() {
744 self.notify_all();
745 }
746 }
747
notify_parked(&self)748 fn notify_parked(&self) {
749 if let Some(index) = self.idle.worker_to_notify() {
750 self.remotes[index].unpark.unpark();
751 }
752 }
753
notify_all(&self)754 fn notify_all(&self) {
755 for remote in &self.remotes[..] {
756 remote.unpark.unpark();
757 }
758 }
759
notify_if_work_pending(&self)760 fn notify_if_work_pending(&self) {
761 for remote in &self.remotes[..] {
762 if !remote.steal.is_empty() {
763 self.notify_parked();
764 return;
765 }
766 }
767
768 if !self.inject.is_empty() {
769 self.notify_parked();
770 }
771 }
772
transition_worker_from_searching(&self)773 fn transition_worker_from_searching(&self) {
774 if self.idle.transition_worker_from_searching() {
775 // We are the final searching worker. Because work was found, we
776 // need to notify another worker.
777 self.notify_parked();
778 }
779 }
780
781 /// Signals that a worker has observed the shutdown signal and has replaced
782 /// its core back into its handle.
783 ///
784 /// If all workers have reached this point, the final cleanup is performed.
shutdown(&self, core: Box<Core>, worker: Arc<Worker>)785 fn shutdown(&self, core: Box<Core>, worker: Arc<Worker>) {
786 let mut workers = self.shutdown_workers.lock().unwrap();
787 workers.push((core, worker));
788
789 if workers.len() != self.remotes.len() {
790 return;
791 }
792
793 for (mut core, worker) in workers.drain(..) {
794 core.shutdown(&worker);
795 }
796
797 // Drain the injection queue
798 while self.inject.pop().is_some() {}
799 }
800
ptr_eq(&self, other: &Shared) -> bool801 fn ptr_eq(&self, other: &Shared) -> bool {
802 self as *const _ == other as *const _
803 }
804 }
805