//! Work-stealing thread pool for executing futures.
2 
3 mod atomic_cell;
4 use atomic_cell::AtomicCell;
5 
6 mod idle;
7 use self::idle::Idle;
8 
9 mod worker;
10 pub(crate) use worker::Launch;
11 
12 pub(crate) use worker::block_in_place;
13 
14 use crate::loom::sync::Arc;
15 use crate::runtime::stats::RuntimeStats;
16 use crate::runtime::task::JoinHandle;
17 use crate::runtime::{Callback, Parker};
18 
19 use std::fmt;
20 use std::future::Future;
21 
/// Work-stealing based thread pool for executing futures.
///
/// The pool owns a single [`Spawner`]; dropping the pool calls
/// `Spawner::shutdown` on it.
pub(crate) struct ThreadPool {
    // Handle exposed through `ThreadPool::spawner` and shut down when the
    // pool is dropped.
    spawner: Spawner,
}
26 
/// Submits futures to the associated thread pool for execution.
///
/// A `Spawner` instance is a handle to a single thread pool that allows the owner
/// of the handle to spawn futures onto the thread pool.
///
/// The `Spawner` handle is *only* used for spawning new futures. It does not
/// impact the lifecycle of the thread pool in any way. The thread pool may
/// shut down while there are outstanding `Spawner` instances.
///
/// `Spawner` instances are obtained by calling [`ThreadPool::spawner`].
///
/// [`ThreadPool::spawner`]: method@ThreadPool::spawner
#[derive(Clone)]
pub(crate) struct Spawner {
    // Reference-counted worker state returned by `worker::create`; cloning a
    // `Spawner` clones this `Arc`, so every clone refers to the same state.
    shared: Arc<worker::Shared>,
}
43 
44 // ===== impl ThreadPool =====
45 
46 impl ThreadPool {
new( size: usize, parker: Parker, before_park: Option<Callback>, after_unpark: Option<Callback>, ) -> (ThreadPool, Launch)47     pub(crate) fn new(
48         size: usize,
49         parker: Parker,
50         before_park: Option<Callback>,
51         after_unpark: Option<Callback>,
52     ) -> (ThreadPool, Launch) {
53         let (shared, launch) = worker::create(size, parker, before_park, after_unpark);
54         let spawner = Spawner { shared };
55         let thread_pool = ThreadPool { spawner };
56 
57         (thread_pool, launch)
58     }
59 
60     /// Returns reference to `Spawner`.
61     ///
62     /// The `Spawner` handle can be cloned and enables spawning tasks from other
63     /// threads.
spawner(&self) -> &Spawner64     pub(crate) fn spawner(&self) -> &Spawner {
65         &self.spawner
66     }
67 
68     /// Blocks the current thread waiting for the future to complete.
69     ///
70     /// The future will execute on the current thread, but all spawned tasks
71     /// will be executed on the thread pool.
block_on<F>(&self, future: F) -> F::Output where F: Future,72     pub(crate) fn block_on<F>(&self, future: F) -> F::Output
73     where
74         F: Future,
75     {
76         let mut enter = crate::runtime::enter(true);
77         enter.block_on(future).expect("failed to park thread")
78     }
79 }
80 
81 impl fmt::Debug for ThreadPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result82     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
83         fmt.debug_struct("ThreadPool").finish()
84     }
85 }
86 
87 impl Drop for ThreadPool {
drop(&mut self)88     fn drop(&mut self) {
89         self.spawner.shutdown();
90     }
91 }
92 
// ===== impl Spawner =====
94 
95 impl Spawner {
96     /// Spawns a future onto the thread pool
spawn<F>(&self, future: F) -> JoinHandle<F::Output> where F: crate::future::Future + Send + 'static, F::Output: Send + 'static,97     pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
98     where
99         F: crate::future::Future + Send + 'static,
100         F::Output: Send + 'static,
101     {
102         worker::Shared::bind_new_task(&self.shared, future)
103     }
104 
shutdown(&mut self)105     pub(crate) fn shutdown(&mut self) {
106         self.shared.close();
107     }
108 
stats(&self) -> &RuntimeStats109     pub(crate) fn stats(&self) -> &RuntimeStats {
110         self.shared.stats()
111     }
112 }
113 
114 impl fmt::Debug for Spawner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result115     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
116         fmt.debug_struct("Spawner").finish()
117     }
118 }
119