1 //! A scheduler is initialized with a fixed number of workers. Each worker is
2 //! driven by a thread. Each worker has a "core" which contains data such as the
3 //! run queue and other state. When `block_in_place` is called, the worker's
4 //! "core" is handed off to a new thread allowing the scheduler to continue to
5 //! make progress while the originating thread blocks.
6 //!
7 //! # Shutdown
8 //!
9 //! Shutting down the runtime involves the following steps:
10 //!
11 //! 1. The Shared::close method is called. This closes the inject queue and
12 //! OwnedTasks instance and wakes up all worker threads.
13 //!
14 //! 2. Each worker thread observes the close signal next time it runs
15 //! Core::maintenance by checking whether the inject queue is closed.
16 //! The Core::is_shutdown flag is set to true.
17 //!
18 //! 3. The worker thread calls `pre_shutdown` in parallel. Here, the worker
19 //! will keep removing tasks from OwnedTasks until it is empty. No new
20 //! tasks can be pushed to the OwnedTasks during or after this step as it
21 //! was closed in step 1.
22 //!
23 //! 5. The workers call Shared::shutdown to enter the single-threaded phase of
24 //! shutdown. These calls will push their core to Shared::shutdown_cores,
25 //! and the last thread to push its core will finish the shutdown procedure.
26 //!
27 //! 6. The local run queue of each core is emptied, then the inject queue is
28 //! emptied.
29 //!
30 //! At this point, shutdown has completed. It is not possible for any of the
31 //! collections to contain any tasks at this point, as each collection was
32 //! closed first, then emptied afterwards.
33 //!
34 //! ## Spawns during shutdown
35 //!
36 //! When spawning tasks during shutdown, there are two cases:
37 //!
38 //! * The spawner observes the OwnedTasks being open, and the inject queue is
39 //! closed.
40 //! * The spawner observes the OwnedTasks being closed and doesn't check the
41 //! inject queue.
42 //!
43 //! The first case can only happen if the OwnedTasks::bind call happens before
44 //! or during step 1 of shutdown. In this case, the runtime will clean up the
45 //! task in step 3 of shutdown.
46 //!
47 //! In the latter case, the task was not spawned and the task is immediately
48 //! cancelled by the spawner.
49 //!
//! The correctness of shutdown requires both the inject queue and OwnedTasks
//! collection to have a close bit. With a close bit on only the inject queue,
//! spawning could run into a situation where a task is successfully bound long
//! after the runtime has shut down. With a close bit on only the OwnedTasks,
//! the first spawning situation could result in the notification being pushed
//! to the inject queue after step 6 of shutdown, which would leave a task in
//! the inject queue indefinitely. This would be a ref-count cycle and a memory
//! leak.
58
59 use crate::coop;
60 use crate::future::Future;
61 use crate::loom::rand::seed;
62 use crate::loom::sync::{Arc, Mutex};
63 use crate::park::{Park, Unpark};
64 use crate::runtime;
65 use crate::runtime::enter::EnterContext;
66 use crate::runtime::park::{Parker, Unparker};
67 use crate::runtime::task::{Inject, JoinHandle, OwnedTasks};
68 use crate::runtime::thread_pool::{AtomicCell, Idle};
69 use crate::runtime::{queue, task};
70 use crate::util::FastRand;
71
72 use std::cell::RefCell;
73 use std::time::Duration;
74
/// A scheduler worker.
///
/// Workers are handed around behind an `Arc`; the thread that manages to take
/// the core out of `core` is the one that drives the worker.
pub(super) struct Worker {
    /// Reference to the state shared by all workers.
    shared: Arc<Shared>,

    /// Index holding this worker's remote state (its position in
    /// `Shared::remotes`).
    index: usize,

    /// Used to hand off a worker's core to another thread. `create` fills the
    /// slot, `run` takes the core out of it, and `block_in_place` puts the
    /// core back before blocking.
    core: AtomicCell<Core>,
}
86
/// Core data.
///
/// The state a thread needs in order to drive a worker. It is boxed and moved
/// between the `Worker::core` slot, the thread-local `Context`, and, during
/// shutdown, `Shared::shutdown_cores`.
struct Core {
    /// Used to schedule bookkeeping tasks every so often. Incremented with
    /// wrapping arithmetic once per iteration of the run loop.
    tick: u8,

    /// When a task is scheduled from a worker, it is stored in this slot. The
    /// worker will check this slot for a task **before** checking the run
    /// queue. This effectively results in the **last** scheduled task being
    /// run next (LIFO). This is an optimization for message passing patterns
    /// and helps to reduce latency.
    lifo_slot: Option<Notified>,

    /// The worker-local run queue.
    run_queue: queue::Local<Arc<Shared>>,

    /// True if the worker is currently searching for more work. Searching
    /// involves attempting to steal from other workers.
    is_searching: bool,

    /// True if the scheduler is being shut down. Set by `Core::maintenance`
    /// once the inject queue is observed to be closed.
    is_shutdown: bool,

    /// Parker
    ///
    /// Stored in an `Option` as the parker is added / removed to make the
    /// borrow checker happy. It is `None` while the thread is inside a park
    /// call and after `Core::shutdown` has run.
    park: Option<Parker>,

    /// Fast random number generator. Used to pick the worker at which a steal
    /// attempt starts.
    rand: FastRand,
}
118
/// State shared across all workers.
pub(super) struct Shared {
    /// Per-worker remote state. Every worker has access to all entries; this
    /// is how workers communicate with each other. Indexed by `Worker::index`.
    remotes: Box<[Remote]>,

    /// Submit work to the scheduler while **not** currently on a worker thread.
    /// Closing this queue is the signal workers check to detect shutdown.
    inject: Inject<Arc<Shared>>,

    /// Coordinates idle workers.
    idle: Idle,

    /// Collection of all active tasks spawned onto this executor.
    owned: OwnedTasks<Arc<Shared>>,

    /// Cores that have observed the shutdown signal.
    ///
    /// The core is **not** placed back in the worker to avoid it from being
    /// stolen by a thread that was spawned as part of `block_in_place`.
    #[allow(clippy::vec_box)] // we're moving an already-boxed value
    shutdown_cores: Mutex<Vec<Box<Core>>>,
}
141
/// Used to communicate with a worker from other threads.
///
/// One `Remote` is created per worker in `create` and stored in
/// `Shared::remotes`.
struct Remote {
    /// Steal tasks from this worker. This is the stealing half of the pair
    /// returned by `queue::local()`; the other half is the worker's run queue.
    steal: queue::Steal<Arc<Shared>>,

    /// Unparks the associated worker thread.
    unpark: Unparker,
}
150
/// Thread-local context.
///
/// Installed in `CURRENT` by `run` for as long as a thread drives a worker.
struct Context {
    /// Worker driven by the current thread.
    worker: Arc<Worker>,

    /// Core data. Holds `Some` only while the core is parked in the context,
    /// i.e. while a task is being run or the thread is inside a park call.
    /// `block_in_place` takes the core out of here to hand it to another
    /// thread.
    core: RefCell<Option<Box<Core>>>,
}
159
/// Starts the workers.
///
/// Holds the workers built by `create` until `Launch::launch` hands each of
/// them to its own thread.
pub(crate) struct Launch(Vec<Arc<Worker>>);
162
/// Running a task may consume the core. If the core is still available when
/// running the task completes, it is returned as `Ok`. Otherwise `Err(())` is
/// returned and the worker will need to stop processing.
type RunResult = Result<Box<Core>, ()>;

/// A task handle bound to this scheduler.
type Task = task::Task<Arc<Shared>>;

/// A notified task handle bound to this scheduler.
type Notified = task::Notified<Arc<Shared>>;

// Tracks thread-local state: the `Context` of the worker driven by the
// current thread, if any.
scoped_thread_local!(static CURRENT: Context);
176
create(size: usize, park: Parker) -> (Arc<Shared>, Launch)177 pub(super) fn create(size: usize, park: Parker) -> (Arc<Shared>, Launch) {
178 let mut cores = vec![];
179 let mut remotes = vec![];
180
181 // Create the local queues
182 for _ in 0..size {
183 let (steal, run_queue) = queue::local();
184
185 let park = park.clone();
186 let unpark = park.unpark();
187
188 cores.push(Box::new(Core {
189 tick: 0,
190 lifo_slot: None,
191 run_queue,
192 is_searching: false,
193 is_shutdown: false,
194 park: Some(park),
195 rand: FastRand::new(seed()),
196 }));
197
198 remotes.push(Remote { steal, unpark });
199 }
200
201 let shared = Arc::new(Shared {
202 remotes: remotes.into_boxed_slice(),
203 inject: Inject::new(),
204 idle: Idle::new(size),
205 owned: OwnedTasks::new(),
206 shutdown_cores: Mutex::new(vec![]),
207 });
208
209 let mut launch = Launch(vec![]);
210
211 for (index, core) in cores.drain(..).enumerate() {
212 launch.0.push(Arc::new(Worker {
213 shared: shared.clone(),
214 index,
215 core: AtomicCell::new(Some(core)),
216 }));
217 }
218
219 (shared, launch)
220 }
221
block_in_place<F, R>(f: F) -> R where F: FnOnce() -> R,222 pub(crate) fn block_in_place<F, R>(f: F) -> R
223 where
224 F: FnOnce() -> R,
225 {
226 // Try to steal the worker core back
227 struct Reset(coop::Budget);
228
229 impl Drop for Reset {
230 fn drop(&mut self) {
231 CURRENT.with(|maybe_cx| {
232 if let Some(cx) = maybe_cx {
233 let core = cx.worker.core.take();
234 let mut cx_core = cx.core.borrow_mut();
235 assert!(cx_core.is_none());
236 *cx_core = core;
237
238 // Reset the task budget as we are re-entering the
239 // runtime.
240 coop::set(self.0);
241 }
242 });
243 }
244 }
245
246 let mut had_entered = false;
247
248 CURRENT.with(|maybe_cx| {
249 match (crate::runtime::enter::context(), maybe_cx.is_some()) {
250 (EnterContext::Entered { .. }, true) => {
251 // We are on a thread pool runtime thread, so we just need to
252 // set up blocking.
253 had_entered = true;
254 }
255 (EnterContext::Entered { allow_blocking }, false) => {
256 // We are on an executor, but _not_ on the thread pool. That is
257 // _only_ okay if we are in a thread pool runtime's block_on
258 // method:
259 if allow_blocking {
260 had_entered = true;
261 return;
262 } else {
263 // This probably means we are on the basic_scheduler or in a
264 // LocalSet, where it is _not_ okay to block.
265 panic!("can call blocking only when running on the multi-threaded runtime");
266 }
267 }
268 (EnterContext::NotEntered, true) => {
269 // This is a nested call to block_in_place (we already exited).
270 // All the necessary setup has already been done.
271 return;
272 }
273 (EnterContext::NotEntered, false) => {
274 // We are outside of the tokio runtime, so blocking is fine.
275 // We can also skip all of the thread pool blocking setup steps.
276 return;
277 }
278 }
279
280 let cx = maybe_cx.expect("no .is_some() == false cases above should lead here");
281
282 // Get the worker core. If none is set, then blocking is fine!
283 let core = match cx.core.borrow_mut().take() {
284 Some(core) => core,
285 None => return,
286 };
287
288 // The parker should be set here
289 assert!(core.park.is_some());
290
291 // In order to block, the core must be sent to another thread for
292 // execution.
293 //
294 // First, move the core back into the worker's shared core slot.
295 cx.worker.core.set(core);
296
297 // Next, clone the worker handle and send it to a new thread for
298 // processing.
299 //
300 // Once the blocking task is done executing, we will attempt to
301 // steal the core back.
302 let worker = cx.worker.clone();
303 runtime::spawn_blocking(move || run(worker));
304 });
305
306 if had_entered {
307 // Unset the current task's budget. Blocking sections are not
308 // constrained by task budgets.
309 let _reset = Reset(coop::stop());
310
311 crate::runtime::enter::exit(f)
312 } else {
313 f()
314 }
315 }
316
/// After how many ticks the global queue is polled. This helps to ensure
/// fairness.
///
/// On ticks that are a multiple of this value the worker checks the inject
/// queue before its local queue and runs its periodic maintenance.
///
/// The number is fairly arbitrary. I believe this value was copied from golang.
const GLOBAL_POLL_INTERVAL: u8 = 61;
322
323 impl Launch {
launch(mut self)324 pub(crate) fn launch(mut self) {
325 for worker in self.0.drain(..) {
326 runtime::spawn_blocking(move || run(worker));
327 }
328 }
329 }
330
run(worker: Arc<Worker>)331 fn run(worker: Arc<Worker>) {
332 // Acquire a core. If this fails, then another thread is running this
333 // worker and there is nothing further to do.
334 let core = match worker.core.take() {
335 Some(core) => core,
336 None => return,
337 };
338
339 // Set the worker context.
340 let cx = Context {
341 worker,
342 core: RefCell::new(None),
343 };
344
345 let _enter = crate::runtime::enter(true);
346
347 CURRENT.set(&cx, || {
348 // This should always be an error. It only returns a `Result` to support
349 // using `?` to short circuit.
350 assert!(cx.run(core).is_err());
351 });
352 }
353
354 impl Context {
run(&self, mut core: Box<Core>) -> RunResult355 fn run(&self, mut core: Box<Core>) -> RunResult {
356 while !core.is_shutdown {
357 // Increment the tick
358 core.tick();
359
360 // Run maintenance, if needed
361 core = self.maintenance(core);
362
363 // First, check work available to the current worker.
364 if let Some(task) = core.next_task(&self.worker) {
365 core = self.run_task(task, core)?;
366 continue;
367 }
368
369 // There is no more **local** work to process, try to steal work
370 // from other workers.
371 if let Some(task) = core.steal_work(&self.worker) {
372 core = self.run_task(task, core)?;
373 } else {
374 // Wait for work
375 core = self.park(core);
376 }
377 }
378
379 core.pre_shutdown(&self.worker);
380
381 // Signal shutdown
382 self.worker.shared.shutdown(core);
383 Err(())
384 }
385
run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult386 fn run_task(&self, task: Notified, mut core: Box<Core>) -> RunResult {
387 let task = self.worker.shared.owned.assert_owner(task);
388
389 // Make sure the worker is not in the **searching** state. This enables
390 // another idle worker to try to steal work.
391 core.transition_from_searching(&self.worker);
392
393 // Make the core available to the runtime context
394 *self.core.borrow_mut() = Some(core);
395
396 // Run the task
397 coop::budget(|| {
398 task.run();
399
400 // As long as there is budget remaining and a task exists in the
401 // `lifo_slot`, then keep running.
402 loop {
403 // Check if we still have the core. If not, the core was stolen
404 // by another worker.
405 let mut core = match self.core.borrow_mut().take() {
406 Some(core) => core,
407 None => return Err(()),
408 };
409
410 // Check for a task in the LIFO slot
411 let task = match core.lifo_slot.take() {
412 Some(task) => task,
413 None => return Ok(core),
414 };
415
416 if coop::has_budget_remaining() {
417 // Run the LIFO task, then loop
418 *self.core.borrow_mut() = Some(core);
419 let task = self.worker.shared.owned.assert_owner(task);
420 task.run();
421 } else {
422 // Not enough budget left to run the LIFO task, push it to
423 // the back of the queue and return.
424 core.run_queue.push_back(task, self.worker.inject());
425 return Ok(core);
426 }
427 }
428 })
429 }
430
maintenance(&self, mut core: Box<Core>) -> Box<Core>431 fn maintenance(&self, mut core: Box<Core>) -> Box<Core> {
432 if core.tick % GLOBAL_POLL_INTERVAL == 0 {
433 // Call `park` with a 0 timeout. This enables the I/O driver, timer, ...
434 // to run without actually putting the thread to sleep.
435 core = self.park_timeout(core, Some(Duration::from_millis(0)));
436
437 // Run regularly scheduled maintenance
438 core.maintenance(&self.worker);
439 }
440
441 core
442 }
443
park(&self, mut core: Box<Core>) -> Box<Core>444 fn park(&self, mut core: Box<Core>) -> Box<Core> {
445 core.transition_to_parked(&self.worker);
446
447 while !core.is_shutdown {
448 core = self.park_timeout(core, None);
449
450 // Run regularly scheduled maintenance
451 core.maintenance(&self.worker);
452
453 if core.transition_from_parked(&self.worker) {
454 return core;
455 }
456 }
457
458 core
459 }
460
park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core>461 fn park_timeout(&self, mut core: Box<Core>, duration: Option<Duration>) -> Box<Core> {
462 // Take the parker out of core
463 let mut park = core.park.take().expect("park missing");
464
465 // Store `core` in context
466 *self.core.borrow_mut() = Some(core);
467
468 // Park thread
469 if let Some(timeout) = duration {
470 park.park_timeout(timeout).expect("park failed");
471 } else {
472 park.park().expect("park failed");
473 }
474
475 // Remove `core` from context
476 core = self.core.borrow_mut().take().expect("core missing");
477
478 // Place `park` back in `core`
479 core.park = Some(park);
480
481 // If there are tasks available to steal, notify a worker
482 if core.run_queue.is_stealable() {
483 self.worker.shared.notify_parked();
484 }
485
486 core
487 }
488 }
489
490 impl Core {
491 /// Increment the tick
tick(&mut self)492 fn tick(&mut self) {
493 self.tick = self.tick.wrapping_add(1);
494 }
495
496 /// Return the next notified task available to this worker.
next_task(&mut self, worker: &Worker) -> Option<Notified>497 fn next_task(&mut self, worker: &Worker) -> Option<Notified> {
498 if self.tick % GLOBAL_POLL_INTERVAL == 0 {
499 worker.inject().pop().or_else(|| self.next_local_task())
500 } else {
501 self.next_local_task().or_else(|| worker.inject().pop())
502 }
503 }
504
next_local_task(&mut self) -> Option<Notified>505 fn next_local_task(&mut self) -> Option<Notified> {
506 self.lifo_slot.take().or_else(|| self.run_queue.pop())
507 }
508
steal_work(&mut self, worker: &Worker) -> Option<Notified>509 fn steal_work(&mut self, worker: &Worker) -> Option<Notified> {
510 if !self.transition_to_searching(worker) {
511 return None;
512 }
513
514 let num = worker.shared.remotes.len();
515 // Start from a random worker
516 let start = self.rand.fastrand_n(num as u32) as usize;
517
518 for i in 0..num {
519 let i = (start + i) % num;
520
521 // Don't steal from ourself! We know we don't have work.
522 if i == worker.index {
523 continue;
524 }
525
526 let target = &worker.shared.remotes[i];
527 if let Some(task) = target.steal.steal_into(&mut self.run_queue) {
528 return Some(task);
529 }
530 }
531
532 // Fallback on checking the global queue
533 worker.shared.inject.pop()
534 }
535
transition_to_searching(&mut self, worker: &Worker) -> bool536 fn transition_to_searching(&mut self, worker: &Worker) -> bool {
537 if !self.is_searching {
538 self.is_searching = worker.shared.idle.transition_worker_to_searching();
539 }
540
541 self.is_searching
542 }
543
transition_from_searching(&mut self, worker: &Worker)544 fn transition_from_searching(&mut self, worker: &Worker) {
545 if !self.is_searching {
546 return;
547 }
548
549 self.is_searching = false;
550 worker.shared.transition_worker_from_searching();
551 }
552
553 /// Prepare the worker state for parking
transition_to_parked(&mut self, worker: &Worker)554 fn transition_to_parked(&mut self, worker: &Worker) {
555 // When the final worker transitions **out** of searching to parked, it
556 // must check all the queues one last time in case work materialized
557 // between the last work scan and transitioning out of searching.
558 let is_last_searcher = worker
559 .shared
560 .idle
561 .transition_worker_to_parked(worker.index, self.is_searching);
562
563 // The worker is no longer searching. Setting this is the local cache
564 // only.
565 self.is_searching = false;
566
567 if is_last_searcher {
568 worker.shared.notify_if_work_pending();
569 }
570 }
571
572 /// Returns `true` if the transition happened.
transition_from_parked(&mut self, worker: &Worker) -> bool573 fn transition_from_parked(&mut self, worker: &Worker) -> bool {
574 // If a task is in the lifo slot, then we must unpark regardless of
575 // being notified
576 if self.lifo_slot.is_some() {
577 worker.shared.idle.unpark_worker_by_id(worker.index);
578 self.is_searching = true;
579 return true;
580 }
581
582 if worker.shared.idle.is_parked(worker.index) {
583 return false;
584 }
585
586 // When unparked, the worker is in the searching state.
587 self.is_searching = true;
588 true
589 }
590
591 /// Runs maintenance work such as checking the pool's state.
maintenance(&mut self, worker: &Worker)592 fn maintenance(&mut self, worker: &Worker) {
593 if !self.is_shutdown {
594 // Check if the scheduler has been shutdown
595 self.is_shutdown = worker.inject().is_closed();
596 }
597 }
598
599 /// Signals all tasks to shut down, and waits for them to complete. Must run
600 /// before we enter the single-threaded phase of shutdown processing.
pre_shutdown(&mut self, worker: &Worker)601 fn pre_shutdown(&mut self, worker: &Worker) {
602 // Signal to all tasks to shut down.
603 worker.shared.owned.close_and_shutdown_all();
604 }
605
606 /// Shutdown the core
shutdown(&mut self)607 fn shutdown(&mut self) {
608 // Take the core
609 let mut park = self.park.take().expect("park missing");
610
611 // Drain the queue
612 while self.next_local_task().is_some() {}
613
614 park.shutdown();
615 }
616 }
617
impl Worker {
    /// Returns a reference to the scheduler's injection queue, which lives in
    /// the shared state.
    fn inject(&self) -> &Inject<Arc<Shared>> {
        &self.shared.inject
    }
}
624
625 impl task::Schedule for Arc<Shared> {
release(&self, task: &Task) -> Option<Task>626 fn release(&self, task: &Task) -> Option<Task> {
627 self.owned.remove(task)
628 }
629
schedule(&self, task: Notified)630 fn schedule(&self, task: Notified) {
631 (**self).schedule(task, false);
632 }
633
yield_now(&self, task: Notified)634 fn yield_now(&self, task: Notified) {
635 (**self).schedule(task, true);
636 }
637 }
638
639 impl Shared {
bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output> where T: Future + Send + 'static, T::Output: Send + 'static,640 pub(super) fn bind_new_task<T>(me: &Arc<Self>, future: T) -> JoinHandle<T::Output>
641 where
642 T: Future + Send + 'static,
643 T::Output: Send + 'static,
644 {
645 let (handle, notified) = me.owned.bind(future, me.clone());
646
647 if let Some(notified) = notified {
648 me.schedule(notified, false);
649 }
650
651 handle
652 }
653
schedule(&self, task: Notified, is_yield: bool)654 pub(super) fn schedule(&self, task: Notified, is_yield: bool) {
655 CURRENT.with(|maybe_cx| {
656 if let Some(cx) = maybe_cx {
657 // Make sure the task is part of the **current** scheduler.
658 if self.ptr_eq(&cx.worker.shared) {
659 // And the current thread still holds a core
660 if let Some(core) = cx.core.borrow_mut().as_mut() {
661 self.schedule_local(core, task, is_yield);
662 return;
663 }
664 }
665 }
666
667 // Otherwise, use the inject queue.
668 self.inject.push(task);
669 self.notify_parked();
670 })
671 }
672
schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool)673 fn schedule_local(&self, core: &mut Core, task: Notified, is_yield: bool) {
674 // Spawning from the worker thread. If scheduling a "yield" then the
675 // task must always be pushed to the back of the queue, enabling other
676 // tasks to be executed. If **not** a yield, then there is more
677 // flexibility and the task may go to the front of the queue.
678 let should_notify = if is_yield {
679 core.run_queue.push_back(task, &self.inject);
680 true
681 } else {
682 // Push to the LIFO slot
683 let prev = core.lifo_slot.take();
684 let ret = prev.is_some();
685
686 if let Some(prev) = prev {
687 core.run_queue.push_back(prev, &self.inject);
688 }
689
690 core.lifo_slot = Some(task);
691
692 ret
693 };
694
695 // Only notify if not currently parked. If `park` is `None`, then the
696 // scheduling is from a resource driver. As notifications often come in
697 // batches, the notification is delayed until the park is complete.
698 if should_notify && core.park.is_some() {
699 self.notify_parked();
700 }
701 }
702
close(&self)703 pub(super) fn close(&self) {
704 if self.inject.close() {
705 self.notify_all();
706 }
707 }
708
notify_parked(&self)709 fn notify_parked(&self) {
710 if let Some(index) = self.idle.worker_to_notify() {
711 self.remotes[index].unpark.unpark();
712 }
713 }
714
notify_all(&self)715 fn notify_all(&self) {
716 for remote in &self.remotes[..] {
717 remote.unpark.unpark();
718 }
719 }
720
notify_if_work_pending(&self)721 fn notify_if_work_pending(&self) {
722 for remote in &self.remotes[..] {
723 if !remote.steal.is_empty() {
724 self.notify_parked();
725 return;
726 }
727 }
728
729 if !self.inject.is_empty() {
730 self.notify_parked();
731 }
732 }
733
transition_worker_from_searching(&self)734 fn transition_worker_from_searching(&self) {
735 if self.idle.transition_worker_from_searching() {
736 // We are the final searching worker. Because work was found, we
737 // need to notify another worker.
738 self.notify_parked();
739 }
740 }
741
742 /// Signals that a worker has observed the shutdown signal and has replaced
743 /// its core back into its handle.
744 ///
745 /// If all workers have reached this point, the final cleanup is performed.
shutdown(&self, core: Box<Core>)746 fn shutdown(&self, core: Box<Core>) {
747 let mut cores = self.shutdown_cores.lock();
748 cores.push(core);
749
750 if cores.len() != self.remotes.len() {
751 return;
752 }
753
754 debug_assert!(self.owned.is_empty());
755
756 for mut core in cores.drain(..) {
757 core.shutdown();
758 }
759
760 // Drain the injection queue
761 //
762 // We already shut down every task, so we can simply drop the tasks.
763 while let Some(task) = self.inject.pop() {
764 drop(task);
765 }
766 }
767
ptr_eq(&self, other: &Shared) -> bool768 fn ptr_eq(&self, other: &Shared) -> bool {
769 std::ptr::eq(self, other)
770 }
771 }
772