//! Thread pool for blocking operations

use crate::loom::sync::{Arc, Condvar, Mutex};
use crate::loom::thread;
use crate::runtime::blocking::schedule::NoopSchedule;
use crate::runtime::blocking::shutdown;
use crate::runtime::blocking::task::BlockingTask;
use crate::runtime::task::{self, JoinHandle};
use crate::runtime::{Builder, Callback, Handle};

use slab::Slab;

use std::collections::VecDeque;
use std::fmt;
use std::time::Duration;

pub(crate) struct BlockingPool {
    spawner: Spawner,
    shutdown_rx: shutdown::Receiver,
}

#[derive(Clone)]
pub(crate) struct Spawner {
    inner: Arc<Inner>,
}

struct Inner {
    /// State shared between worker threads
    shared: Mutex<Shared>,

    /// Pool threads wait on this.
    condvar: Condvar,

    /// Spawned threads use this name
    thread_name: String,

    /// Spawned thread stack size
    stack_size: Option<usize>,

    /// Call after a thread starts
    after_start: Option<Callback>,

    /// Call before a thread stops
    before_stop: Option<Callback>,

    /// Maximum number of threads
    thread_cap: usize,
}

struct Shared {
    /// Tasks waiting to be run by a worker thread.
    queue: VecDeque<Task>,
    /// Number of worker threads currently alive.
    num_th: usize,
    /// Number of worker threads parked on the condvar waiting for work.
    num_idle: u32,
    /// Number of pending wakeups; used to tell real notifications apart
    /// from spurious condvar wakeups.
    num_notify: u32,
    /// Set once the pool starts shutting down.
    shutdown: bool,
    /// Pool-side handle to the shutdown channel; dropped (set to `None`)
    /// when shutdown begins. Worker threads hold clones of this sender.
    shutdown_tx: Option<shutdown::Sender>,
    /// Join handles of spawned worker threads, keyed by worker id.
    worker_threads: Slab<thread::JoinHandle<()>>,
}

type Task = task::Notified<NoopSchedule>;

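/// How long an idle worker thread waits for new work before exiting.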
const KEEP_ALIVE: Duration = Duration::from_secs(10);

/// Run the provided function on an executor dedicated to blocking operations.
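///
/// A rough usage sketch from application code, going through the public
/// `tokio::task::spawn_blocking` wrapper; `load_bytes` and `hash_bytes` are
/// placeholder helpers, not part of this crate:
///
/// ```ignore
/// let bytes = load_bytes();
/// let digest = tokio::task::spawn_blocking(move || {
///     // Runs on the blocking pool, keeping the async worker threads free.
///     hash_bytes(&bytes)
/// })
/// .await
/// .expect("blocking task panicked");
/// ```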
pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
where
    F: FnOnce() -> R + Send + 'static,
{
    let rt = Handle::current();

    let (task, handle) = task::joinable(BlockingTask::new(func));
    let _ = rt.blocking_spawner.spawn(task, &rt);
    handle
}

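/// Like `spawn_blocking`, but discards the join handle and only reports
/// whether the task was accepted; fails if the pool is shutting down.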
#[allow(dead_code)]
pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
where
    F: FnOnce() -> R + Send + 'static,
{
    let rt = Handle::current();

    let (task, _handle) = task::joinable(BlockingTask::new(func));
    rt.blocking_spawner.spawn(task, &rt)
}

// ===== impl BlockingPool =====

impl BlockingPool {
    pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
        let (shutdown_tx, shutdown_rx) = shutdown::channel();

        BlockingPool {
            spawner: Spawner {
                inner: Arc::new(Inner {
                    shared: Mutex::new(Shared {
                        queue: VecDeque::new(),
                        num_th: 0,
                        num_idle: 0,
                        num_notify: 0,
                        shutdown: false,
                        shutdown_tx: Some(shutdown_tx),
                        worker_threads: Slab::new(),
                    }),
                    condvar: Condvar::new(),
                    thread_name: builder.thread_name.clone(),
                    stack_size: builder.thread_stack_size,
                    after_start: builder.after_start.clone(),
                    before_stop: builder.before_stop.clone(),
                    thread_cap,
                }),
            },
            shutdown_rx,
        }
    }

    pub(crate) fn spawner(&self) -> &Spawner {
        &self.spawner
    }

    pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
        let mut shared = self.spawner.inner.shared.lock().unwrap();

        // This method may be called more than once: first by an explicit call
        // to `shutdown`, then again by the drop handler. The flag check below
        // prevents shutting the pool down twice.
        if shared.shutdown {
            return;
        }

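        // Wake every worker so it observes the shutdown flag, and drop this
        // pool's shutdown sender; `shutdown_rx.wait` below completes once every
        // sender held by a worker thread has been dropped (or the timeout
        // elapses).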
        shared.shutdown = true;
        shared.shutdown_tx = None;
        self.spawner.inner.condvar.notify_all();
        let mut workers = std::mem::replace(&mut shared.worker_threads, Slab::new());

        drop(shared);

        if self.shutdown_rx.wait(timeout) {
            for handle in workers.drain() {
                let _ = handle.join();
            }
        }
    }
}

impl Drop for BlockingPool {
    fn drop(&mut self) {
        self.shutdown(None);
    }
}

impl fmt::Debug for BlockingPool {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("BlockingPool").finish()
    }
}

// ===== impl Spawner =====

impl Spawner {
    pub(crate) fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
        let shutdown_tx = {
            let mut shared = self.inner.shared.lock().unwrap();

            if shared.shutdown {
                // Shut down the task immediately.
                task.shutdown();

                // There is no need to even push the task; it would never get
                // picked up.
                return Err(());
            }

            shared.queue.push_back(task);

            if shared.num_idle == 0 {
                // No threads are able to process the task.

                if shared.num_th == self.inner.thread_cap {
                    // At max number of threads
                    None
                } else {
                    shared.num_th += 1;
                    assert!(shared.shutdown_tx.is_some());
                    shared.shutdown_tx.clone()
                }
            } else {
                // Notify an idle worker thread. The notification counter
                // tracks exactly how many notifications are still owed; the
                // thread library may generate spurious wakeups, and the
                // counter keeps the pool in a consistent state despite them.
                shared.num_idle -= 1;
                shared.num_notify += 1;
                self.inner.condvar.notify_one();
                None
            }
        };

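        // A `Some` here means the thread cap has not been reached and a new
        // worker must be spawned; the shutdown sender travels with the thread
        // so pool shutdown waits for it to exit.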
        if let Some(shutdown_tx) = shutdown_tx {
            let mut shared = self.inner.shared.lock().unwrap();
            let entry = shared.worker_threads.vacant_entry();

            let handle = self.spawn_thread(shutdown_tx, rt, entry.key());

            entry.insert(handle);
        }

        Ok(())
    }

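    /// Spawn a worker thread registered under `worker_id`, running the pool's
    /// worker loop inside the runtime context.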
    fn spawn_thread(
        &self,
        shutdown_tx: shutdown::Sender,
        rt: &Handle,
        worker_id: usize,
    ) -> thread::JoinHandle<()> {
        let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());

        if let Some(stack_size) = self.inner.stack_size {
            builder = builder.stack_size(stack_size);
        }

        let rt = rt.clone();

        builder
            .spawn(move || {
                // Only the reference should be moved into the closure
                let rt = &rt;
                rt.enter(move || {
                    rt.blocking_spawner.inner.run(worker_id);
                    drop(shutdown_tx);
                })
            })
            .unwrap()
    }
}

impl Inner {
    fn run(&self, worker_id: usize) {
        if let Some(f) = &self.after_start {
            f()
        }

        let mut shared = self.shared.lock().unwrap();

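        // Worker loop: run queued tasks (BUSY), then park on the condvar
        // (IDLE). A worker exits after `KEEP_ALIVE` without work, or when the
        // pool shuts down.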
        'main: loop {
            // BUSY
            while let Some(task) = shared.queue.pop_front() {
                drop(shared);
                task.run();

                shared = self.shared.lock().unwrap();
            }

            // IDLE
            shared.num_idle += 1;

            while !shared.shutdown {
                let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();

                shared = lock_result.0;
                let timeout_result = lock_result.1;

                if shared.num_notify != 0 {
                    // We have received a legitimate wakeup,
                    // acknowledge it by decrementing the counter
                    // and transition to the BUSY state.
                    shared.num_notify -= 1;
                    break;
                }

                // Only exit on an idle timeout while the pool is not shutting
                // down; during shutdown we fall through to the cleanup logic
                // below even if the condvar "timed out".
                if !shared.shutdown && timeout_result.timed_out() {
                    shared.worker_threads.remove(worker_id);

                    break 'main;
                }

                // Spurious wakeup detected, go back to sleep.
            }

            if shared.shutdown {
                // Drain the queue
                while let Some(task) = shared.queue.pop_front() {
                    drop(shared);
                    task.shutdown();

                    shared = self.shared.lock().unwrap();
                }

                // Work was produced, and we "took" it (by decrementing num_notify).
                // This means that num_idle was decremented once for our wakeup.
                // But, since we are exiting, we need to "undo" that, as we'll stay idle.
                shared.num_idle += 1;
                // NOTE: Technically we should also do num_notify++ and notify again,
                // but since we're shutting down anyway, that won't be necessary.
                break;
            }
        }

        // Thread exit
        shared.num_th -= 1;

        // num_idle should now be tracked exactly; panic with a descriptive
        // message if it is not.
        shared.num_idle = shared
            .num_idle
            .checked_sub(1)
            .expect("num_idle underflowed on thread exit");

        if shared.shutdown && shared.num_th == 0 {
            self.condvar.notify_one();
        }

        drop(shared);

        if let Some(f) = &self.before_stop {
            f()
        }
    }
}

impl fmt::Debug for Spawner {
    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
        fmt.debug_struct("blocking::Spawner").finish()
    }
}