1 //! Thread pool for blocking operations
2
3 use crate::loom::sync::{Arc, Condvar, Mutex};
4 use crate::loom::thread;
5 use crate::runtime::blocking::schedule::NoopSchedule;
6 use crate::runtime::blocking::shutdown;
7 use crate::runtime::blocking::task::BlockingTask;
8 use crate::runtime::task::{self, JoinHandle};
9 use crate::runtime::{Builder, Callback, Handle};
10
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::time::Duration;
14
15 pub(crate) struct BlockingPool {
16 spawner: Spawner,
17 shutdown_rx: shutdown::Receiver,
18 }
19
20 #[derive(Clone)]
21 pub(crate) struct Spawner {
22 inner: Arc<Inner>,
23 }
24
25 struct Inner {
26 /// State shared between worker threads
27 shared: Mutex<Shared>,
28
29 /// Pool threads wait on this.
30 condvar: Condvar,
31
32 /// Spawned threads use this name
33 thread_name: String,
34
35 /// Spawned thread stack size
36 stack_size: Option<usize>,
37
38 /// Call after a thread starts
39 after_start: Option<Callback>,
40
41 /// Call before a thread stops
42 before_stop: Option<Callback>,
43
44 thread_cap: usize,
45 }
46
47 struct Shared {
48 queue: VecDeque<Task>,
49 num_th: usize,
50 num_idle: u32,
51 num_notify: u32,
52 shutdown: bool,
53 shutdown_tx: Option<shutdown::Sender>,
54 }
55
56 type Task = task::Notified<NoopSchedule>;
57
58 const KEEP_ALIVE: Duration = Duration::from_secs(10);
59
60 /// Run the provided function on an executor dedicated to blocking operations.
spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static,61 pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
62 where
63 F: FnOnce() -> R + Send + 'static,
64 {
65 let rt = Handle::current();
66
67 let (task, handle) = task::joinable(BlockingTask::new(func));
68 let _ = rt.blocking_spawner.spawn(task, &rt);
69 handle
70 }
71
72 #[allow(dead_code)]
try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static,73 pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
74 where
75 F: FnOnce() -> R + Send + 'static,
76 {
77 let rt = Handle::current();
78
79 let (task, _handle) = task::joinable(BlockingTask::new(func));
80 rt.blocking_spawner.spawn(task, &rt)
81 }
82
83 // ===== impl BlockingPool =====
84
85 impl BlockingPool {
new(builder: &Builder, thread_cap: usize) -> BlockingPool86 pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
87 let (shutdown_tx, shutdown_rx) = shutdown::channel();
88
89 BlockingPool {
90 spawner: Spawner {
91 inner: Arc::new(Inner {
92 shared: Mutex::new(Shared {
93 queue: VecDeque::new(),
94 num_th: 0,
95 num_idle: 0,
96 num_notify: 0,
97 shutdown: false,
98 shutdown_tx: Some(shutdown_tx),
99 }),
100 condvar: Condvar::new(),
101 thread_name: builder.thread_name.clone(),
102 stack_size: builder.thread_stack_size,
103 after_start: builder.after_start.clone(),
104 before_stop: builder.before_stop.clone(),
105 thread_cap,
106 }),
107 },
108 shutdown_rx,
109 }
110 }
111
spawner(&self) -> &Spawner112 pub(crate) fn spawner(&self) -> &Spawner {
113 &self.spawner
114 }
115
shutdown(&mut self, timeout: Option<Duration>)116 pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
117 let mut shared = self.spawner.inner.shared.lock().unwrap();
118
119 // The function can be called multiple times. First, by explicitly
120 // calling `shutdown` then by the drop handler calling `shutdown`. This
121 // prevents shutting down twice.
122 if shared.shutdown {
123 return;
124 }
125
126 shared.shutdown = true;
127 shared.shutdown_tx = None;
128 self.spawner.inner.condvar.notify_all();
129
130 drop(shared);
131
132 self.shutdown_rx.wait(timeout);
133 }
134 }
135
136 impl Drop for BlockingPool {
drop(&mut self)137 fn drop(&mut self) {
138 self.shutdown(None);
139 }
140 }
141
142 impl fmt::Debug for BlockingPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result143 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
144 fmt.debug_struct("BlockingPool").finish()
145 }
146 }
147
148 // ===== impl Spawner =====
149
150 impl Spawner {
spawn(&self, task: Task, rt: &Handle) -> Result<(), ()>151 fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
152 let shutdown_tx = {
153 let mut shared = self.inner.shared.lock().unwrap();
154
155 if shared.shutdown {
156 // Shutdown the task
157 task.shutdown();
158
159 // no need to even push this task; it would never get picked up
160 return Err(());
161 }
162
163 shared.queue.push_back(task);
164
165 if shared.num_idle == 0 {
166 // No threads are able to process the task.
167
168 if shared.num_th == self.inner.thread_cap {
169 // At max number of threads
170 None
171 } else {
172 shared.num_th += 1;
173 assert!(shared.shutdown_tx.is_some());
174 shared.shutdown_tx.clone()
175 }
176 } else {
177 // Notify an idle worker thread. The notification counter
178 // is used to count the needed amount of notifications
179 // exactly. Thread libraries may generate spurious
180 // wakeups, this counter is used to keep us in a
181 // consistent state.
182 shared.num_idle -= 1;
183 shared.num_notify += 1;
184 self.inner.condvar.notify_one();
185 None
186 }
187 };
188
189 if let Some(shutdown_tx) = shutdown_tx {
190 self.spawn_thread(shutdown_tx, rt);
191 }
192
193 Ok(())
194 }
195
spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle)196 fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
197 let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
198
199 if let Some(stack_size) = self.inner.stack_size {
200 builder = builder.stack_size(stack_size);
201 }
202
203 let rt = rt.clone();
204
205 builder
206 .spawn(move || {
207 // Only the reference should be moved into the closure
208 let rt = &rt;
209 rt.enter(move || {
210 rt.blocking_spawner.inner.run();
211 drop(shutdown_tx);
212 })
213 })
214 .unwrap();
215 }
216 }
217
218 impl Inner {
run(&self)219 fn run(&self) {
220 if let Some(f) = &self.after_start {
221 f()
222 }
223
224 let mut shared = self.shared.lock().unwrap();
225
226 'main: loop {
227 // BUSY
228 while let Some(task) = shared.queue.pop_front() {
229 drop(shared);
230 task.run();
231
232 shared = self.shared.lock().unwrap();
233 }
234
235 // IDLE
236 shared.num_idle += 1;
237
238 while !shared.shutdown {
239 let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
240
241 shared = lock_result.0;
242 let timeout_result = lock_result.1;
243
244 if shared.num_notify != 0 {
245 // We have received a legitimate wakeup,
246 // acknowledge it by decrementing the counter
247 // and transition to the BUSY state.
248 shared.num_notify -= 1;
249 break;
250 }
251
252 // Even if the condvar "timed out", if the pool is entering the
253 // shutdown phase, we want to perform the cleanup logic.
254 if !shared.shutdown && timeout_result.timed_out() {
255 break 'main;
256 }
257
258 // Spurious wakeup detected, go back to sleep.
259 }
260
261 if shared.shutdown {
262 // Drain the queue
263 while let Some(task) = shared.queue.pop_front() {
264 drop(shared);
265 task.shutdown();
266
267 shared = self.shared.lock().unwrap();
268 }
269
270 // Work was produced, and we "took" it (by decrementing num_notify).
271 // This means that num_idle was decremented once for our wakeup.
272 // But, since we are exiting, we need to "undo" that, as we'll stay idle.
273 shared.num_idle += 1;
274 // NOTE: Technically we should also do num_notify++ and notify again,
275 // but since we're shutting down anyway, that won't be necessary.
276 break;
277 }
278 }
279
280 // Thread exit
281 shared.num_th -= 1;
282
283 // num_idle should now be tracked exactly, panic
284 // with a descriptive message if it is not the
285 // case.
286 shared.num_idle = shared
287 .num_idle
288 .checked_sub(1)
289 .expect("num_idle underflowed on thread exit");
290
291 if shared.shutdown && shared.num_th == 0 {
292 self.condvar.notify_one();
293 }
294
295 drop(shared);
296
297 if let Some(f) = &self.before_stop {
298 f()
299 }
300 }
301 }
302
303 impl fmt::Debug for Spawner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result304 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
305 fmt.debug_struct("blocking::Spawner").finish()
306 }
307 }
308