1 //! Thread pool for blocking operations
2
3 use crate::loom::sync::{Arc, Condvar, Mutex};
4 use crate::loom::thread;
5 use crate::runtime::blocking::schedule::NoopSchedule;
6 use crate::runtime::blocking::shutdown;
7 use crate::runtime::blocking::task::BlockingTask;
8 use crate::runtime::task::{self, JoinHandle};
9 use crate::runtime::{Builder, Callback, Handle};
10
11 use slab::Slab;
12
13 use std::collections::VecDeque;
14 use std::fmt;
15 use std::time::Duration;
16
/// Owning handle to the blocking thread pool.
///
/// Pairs the `Spawner` used to submit work with the receiving half of the
/// shutdown channel that `shutdown` waits on.
pub(crate) struct BlockingPool {
    /// Handle through which tasks are submitted to the pool.
    spawner: Spawner,

    /// Receiving half of the shutdown channel; `shutdown` waits on it before
    /// joining the worker threads.
    shutdown_rx: shutdown::Receiver,
}
21
/// Cloneable handle used to submit tasks to the blocking pool.
#[derive(Clone)]
pub(crate) struct Spawner {
    /// Pool state shared between every clone of this spawner and the worker
    /// threads, which reach it through the runtime handle.
    inner: Arc<Inner>,
}
26
/// Pool-wide state and configuration shared by spawners and worker threads.
struct Inner {
    /// State shared between worker threads
    shared: Mutex<Shared>,

    /// Pool threads wait on this.
    condvar: Condvar,

    /// Spawned threads use this name
    thread_name: String,

    /// Spawned thread stack size; when `None` no explicit stack size is
    /// requested from the thread builder.
    stack_size: Option<usize>,

    /// Call after a thread starts
    after_start: Option<Callback>,

    /// Call before a thread stops
    before_stop: Option<Callback>,

    /// Maximum number of threads; `spawn` does not create a new worker once
    /// `Shared::num_th` equals this value.
    thread_cap: usize,
}
49
/// Mutable pool state, always accessed under `Inner::shared`'s mutex.
struct Shared {
    /// Tasks waiting to be picked up by a worker, in FIFO order.
    queue: VecDeque<Task>,

    /// Number of worker threads accounted for. Incremented by `spawn` just
    /// before a thread is created and decremented by the worker on exit.
    num_th: usize,

    /// Number of workers considered idle. A worker increments it when it runs
    /// out of queued work; `spawn` decrements it when it hands a notification
    /// to an idle worker.
    num_idle: u32,

    /// Notifications issued by `spawn` that no worker has acknowledged yet.
    /// Used to tell real wakeups apart from spurious ones.
    num_notify: u32,

    /// Set to `true` once `BlockingPool::shutdown` has started.
    shutdown: bool,

    /// Sender half of the shutdown channel. A clone is handed to every worker
    /// thread; the pool's own copy is cleared when shutdown starts.
    shutdown_tx: Option<shutdown::Sender>,

    /// Join handles of the worker threads, keyed by the slab index that is
    /// passed to the worker as its `worker_id`.
    worker_threads: Slab<thread::JoinHandle<()>>,
}
59
/// Unit of work queued in the pool: a notified task bound to `NoopSchedule`.
type Task = task::Notified<NoopSchedule>;

/// Timeout passed to each condvar wait of an idle worker. A wait that times
/// out without a pending notification makes the worker exit, unless the pool
/// is shutting down.
const KEEP_ALIVE: Duration = Duration::from_secs(10);
63
64 /// Run the provided function on an executor dedicated to blocking operations.
spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static,65 pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
66 where
67 F: FnOnce() -> R + Send + 'static,
68 {
69 let rt = Handle::current();
70
71 let (task, handle) = task::joinable(BlockingTask::new(func));
72 let _ = rt.blocking_spawner.spawn(task, &rt);
73 handle
74 }
75
76 #[allow(dead_code)]
try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static,77 pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
78 where
79 F: FnOnce() -> R + Send + 'static,
80 {
81 let rt = Handle::current();
82
83 let (task, _handle) = task::joinable(BlockingTask::new(func));
84 rt.blocking_spawner.spawn(task, &rt)
85 }
86
87 // ===== impl BlockingPool =====
88
89 impl BlockingPool {
new(builder: &Builder, thread_cap: usize) -> BlockingPool90 pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
91 let (shutdown_tx, shutdown_rx) = shutdown::channel();
92
93 BlockingPool {
94 spawner: Spawner {
95 inner: Arc::new(Inner {
96 shared: Mutex::new(Shared {
97 queue: VecDeque::new(),
98 num_th: 0,
99 num_idle: 0,
100 num_notify: 0,
101 shutdown: false,
102 shutdown_tx: Some(shutdown_tx),
103 worker_threads: Slab::new(),
104 }),
105 condvar: Condvar::new(),
106 thread_name: builder.thread_name.clone(),
107 stack_size: builder.thread_stack_size,
108 after_start: builder.after_start.clone(),
109 before_stop: builder.before_stop.clone(),
110 thread_cap,
111 }),
112 },
113 shutdown_rx,
114 }
115 }
116
spawner(&self) -> &Spawner117 pub(crate) fn spawner(&self) -> &Spawner {
118 &self.spawner
119 }
120
shutdown(&mut self, timeout: Option<Duration>)121 pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
122 let mut shared = self.spawner.inner.shared.lock().unwrap();
123
124 // The function can be called multiple times. First, by explicitly
125 // calling `shutdown` then by the drop handler calling `shutdown`. This
126 // prevents shutting down twice.
127 if shared.shutdown {
128 return;
129 }
130
131 shared.shutdown = true;
132 shared.shutdown_tx = None;
133 self.spawner.inner.condvar.notify_all();
134 let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());
135
136 drop(shared);
137
138 if self.shutdown_rx.wait(timeout) {
139 for handle in workers.drain() {
140 let _ = handle.join();
141 }
142 }
143 }
144 }
145
146 impl Drop for BlockingPool {
drop(&mut self)147 fn drop(&mut self) {
148 self.shutdown(None);
149 }
150 }
151
152 impl fmt::Debug for BlockingPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result153 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
154 fmt.debug_struct("BlockingPool").finish()
155 }
156 }
157
158 // ===== impl Spawner =====
159
160 impl Spawner {
spawn(&self, task: Task, rt: &Handle) -> Result<(), ()>161 pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
162 let shutdown_tx = {
163 let mut shared = self.inner.shared.lock().unwrap();
164
165 if shared.shutdown {
166 // Shutdown the task
167 task.shutdown();
168
169 // no need to even push this task; it would never get picked up
170 return Err(());
171 }
172
173 shared.queue.push_back(task);
174
175 if shared.num_idle == 0 {
176 // No threads are able to process the task.
177
178 if shared.num_th == self.inner.thread_cap {
179 // At max number of threads
180 None
181 } else {
182 shared.num_th += 1;
183 assert!(shared.shutdown_tx.is_some());
184 shared.shutdown_tx.clone()
185 }
186 } else {
187 // Notify an idle worker thread. The notification counter
188 // is used to count the needed amount of notifications
189 // exactly. Thread libraries may generate spurious
190 // wakeups, this counter is used to keep us in a
191 // consistent state.
192 shared.num_idle -= 1;
193 shared.num_notify += 1;
194 self.inner.condvar.notify_one();
195 None
196 }
197 };
198
199 if let Some(shutdown_tx) = shutdown_tx {
200 let mut shared = self.inner.shared.lock().unwrap();
201 let entry = shared.worker_threads.vacant_entry();
202
203 let handle = self.spawn_thread(shutdown_tx, rt, entry.key());
204
205 entry.insert(handle);
206 }
207
208 Ok(())
209 }
210
spawn_thread( &self, shutdown_tx: shutdown::Sender, rt: &Handle, worker_id: usize, ) -> thread::JoinHandle<()>211 fn spawn_thread(
212 &self,
213 shutdown_tx: shutdown::Sender,
214 rt: &Handle,
215 worker_id: usize,
216 ) -> thread::JoinHandle<()> {
217 let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
218
219 if let Some(stack_size) = self.inner.stack_size {
220 builder = builder.stack_size(stack_size);
221 }
222
223 let rt = rt.clone();
224
225 builder
226 .spawn(move || {
227 // Only the reference should be moved into the closure
228 let rt = &rt;
229 rt.enter(move || {
230 rt.blocking_spawner.inner.run(worker_id);
231 drop(shutdown_tx);
232 })
233 })
234 .unwrap()
235 }
236 }
237
impl Inner {
    /// Worker loop executed by every pool thread.
    ///
    /// Calls the `after_start` callback (if any), then alternates between
    /// running queued tasks and waiting on the condvar for new work. The loop
    /// ends when a wait times out without a pending notification, or when the
    /// pool is shutting down; in the latter case the remaining queued tasks
    /// are shut down instead of run. Finally the thread counters are updated
    /// and the `before_stop` callback (if any) is called.
    ///
    /// `worker_id` is the key of this thread's join handle in
    /// `Shared::worker_threads`.
    ///
    /// # Panics
    ///
    /// Panics if the shared mutex is poisoned or if `num_idle` would underflow
    /// on exit.
    fn run(&self, worker_id: usize) {
        if let Some(f) = &self.after_start {
            f()
        }

        let mut shared = self.shared.lock().unwrap();

        'main: loop {
            // BUSY
            // Run queued tasks one at a time, releasing the lock while each
            // task executes.
            while let Some(task) = shared.queue.pop_front() {
                drop(shared);
                task.run();

                shared = self.shared.lock().unwrap();
            }

            // IDLE
            shared.num_idle += 1;

            while !shared.shutdown {
                // `wait_timeout` takes the guard and hands it back together
                // with the timeout status.
                let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();

                shared = lock_result.0;
                let timeout_result = lock_result.1;

                if shared.num_notify != 0 {
                    // We have received a legitimate wakeup,
                    // acknowledge it by decrementing the counter
                    // and transition to the BUSY state.
                    shared.num_notify -= 1;
                    break;
                }

                // Even if the condvar "timed out", if the pool is entering the
                // shutdown phase, we want to perform the cleanup logic.
                if !shared.shutdown && timeout_result.timed_out() {
                    // Idle for a full KEEP_ALIVE with nothing to do: remove
                    // this worker's join handle (the removed handle is
                    // discarded) and leave the main loop.
                    shared.worker_threads.remove(worker_id);

                    break 'main;
                }

                // Spurious wakeup detected, go back to sleep.
            }

            if shared.shutdown {
                // Drain the queue
                // Each remaining task is shut down rather than run, with the
                // lock released around the call.
                while let Some(task) = shared.queue.pop_front() {
                    drop(shared);
                    task.shutdown();

                    shared = self.shared.lock().unwrap();
                }

                // Work was produced, and we "took" it (by decrementing num_notify).
                // This means that num_idle was decremented once for our wakeup.
                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
                shared.num_idle += 1;
                // NOTE: Technically we should also do num_notify++ and notify again,
                // but since we're shutting down anyway, that won't be necessary.
                break;
            }
        }

        // Thread exit
        shared.num_th -= 1;

        // num_idle should now be tracked exactly, panic
        // with a descriptive message if it is not the
        // case.
        shared.num_idle = shared
            .num_idle
            .checked_sub(1)
            .expect("num_idle underflowed on thread exit");

        // Last worker leaving during shutdown: wake one waiter on the condvar.
        if shared.shutdown && shared.num_th == 0 {
            self.condvar.notify_one();
        }

        // Release the lock before invoking the user-supplied callback.
        drop(shared);

        if let Some(f) = &self.before_stop {
            f()
        }
    }
}
324
325 impl fmt::Debug for Spawner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result326 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
327 fmt.debug_struct("blocking::Spawner").finish()
328 }
329 }
330