1 //! Thread pool for blocking operations
2 
3 use crate::loom::sync::{Arc, Condvar, Mutex};
4 use crate::loom::thread;
5 use crate::runtime::blocking::schedule::NoopSchedule;
6 use crate::runtime::blocking::shutdown;
7 use crate::runtime::blocking::task::BlockingTask;
8 use crate::runtime::task::{self, JoinHandle};
9 use crate::runtime::{Builder, Callback, Handle};
10 
11 use std::collections::VecDeque;
12 use std::fmt;
13 use std::time::Duration;
14 
/// Owner of the blocking thread pool.
///
/// Holds the `Spawner` used to submit work together with the receiving half
/// of the shutdown channel. Shutting down (explicitly via `shutdown`, or
/// implicitly on drop) flags the shared state, wakes the waiting workers and
/// then waits on `shutdown_rx`.
pub(crate) struct BlockingPool {
    /// Handle used to submit tasks to the pool's worker threads.
    spawner: Spawner,
    /// Receiving half of the shutdown channel; `shutdown` waits on it after
    /// signalling the workers.
    // NOTE(review): presumably the wait completes once every `Sender` clone
    // held by a worker thread has been dropped — confirm in `shutdown.rs`.
    shutdown_rx: shutdown::Receiver,
}
19 
/// Cheaply cloneable handle for submitting tasks to the blocking pool.
#[derive(Clone)]
pub(crate) struct Spawner {
    /// Pool state shared by every `Spawner` clone; worker threads reach it
    /// through the runtime handle and execute `Inner::run` on it.
    inner: Arc<Inner>,
}
24 
/// Pool-wide state and configuration shared by all spawners and worker
/// threads.
struct Inner {
    /// Mutable state shared between worker threads and spawners; every
    /// access goes through this mutex.
    shared: Mutex<Shared>,

    /// Idle pool threads wait on this. `spawn` signals it when a task is
    /// queued for an idle worker, and `shutdown` signals every waiter.
    condvar: Condvar,

    /// Name given to every thread spawned by the pool.
    thread_name: String,

    /// Stack size for spawned threads; when `None`, no explicit size is set
    /// on the thread builder.
    stack_size: Option<usize>,

    /// Callback invoked by each worker thread right after it starts, before
    /// it processes any task.
    after_start: Option<Callback>,

    /// Callback invoked by each worker thread just before it stops, after it
    /// has released the shared lock.
    before_stop: Option<Callback>,

    /// Maximum number of worker threads. `spawn` does not start another
    /// thread once `Shared::num_th` equals this value.
    thread_cap: usize,
}
46 
/// Mutable pool state, always accessed under `Inner::shared`'s mutex.
struct Shared {
    /// Tasks waiting to run. `spawn` pushes to the back; worker threads pop
    /// from the front.
    queue: VecDeque<Task>,
    /// Number of worker threads. Incremented by `spawn` just before it
    /// starts a thread and decremented by each worker as it exits.
    num_th: usize,
    /// Number of workers counted as idle. A worker increments it before
    /// waiting on the condvar; `spawn` decrements it when it notifies a
    /// worker, and an exiting worker decrements it for itself.
    num_idle: u32,
    /// Notifications issued by `spawn` that no worker has consumed yet. A
    /// woken worker only leaves the idle state when this is non-zero, which
    /// filters out spurious condvar wakeups.
    num_notify: u32,
    /// Set once by `BlockingPool::shutdown`. After that `spawn` rejects new
    /// tasks and workers shut down the queued tasks and exit.
    shutdown: bool,
    /// Sender cloned into every worker thread and dropped by that thread
    /// when its run loop returns. The pool's own copy is cleared to `None`
    /// on shutdown.
    shutdown_tx: Option<shutdown::Sender>,
}
55 
/// A task that is ready to run on the blocking pool; the element type of
/// the pool's queue.
type Task = task::Notified<NoopSchedule>;

/// How long an idle worker waits on the condvar for new work. A worker whose
/// wait times out without a pending notification exits, unless the pool is
/// shutting down.
const KEEP_ALIVE: Duration = Duration::from_secs(10);
59 
60 /// Run the provided function on an executor dedicated to blocking operations.
spawn_blocking<F, R>(func: F) -> JoinHandle<R> where F: FnOnce() -> R + Send + 'static,61 pub(crate) fn spawn_blocking<F, R>(func: F) -> JoinHandle<R>
62 where
63     F: FnOnce() -> R + Send + 'static,
64 {
65     let rt = Handle::current();
66 
67     let (task, handle) = task::joinable(BlockingTask::new(func));
68     let _ = rt.blocking_spawner.spawn(task, &rt);
69     handle
70 }
71 
72 #[allow(dead_code)]
try_spawn_blocking<F, R>(func: F) -> Result<(), ()> where F: FnOnce() -> R + Send + 'static,73 pub(crate) fn try_spawn_blocking<F, R>(func: F) -> Result<(), ()>
74 where
75     F: FnOnce() -> R + Send + 'static,
76 {
77     let rt = Handle::current();
78 
79     let (task, _handle) = task::joinable(BlockingTask::new(func));
80     rt.blocking_spawner.spawn(task, &rt)
81 }
82 
83 // ===== impl BlockingPool =====
84 
85 impl BlockingPool {
new(builder: &Builder, thread_cap: usize) -> BlockingPool86     pub(crate) fn new(builder: &Builder, thread_cap: usize) -> BlockingPool {
87         let (shutdown_tx, shutdown_rx) = shutdown::channel();
88 
89         BlockingPool {
90             spawner: Spawner {
91                 inner: Arc::new(Inner {
92                     shared: Mutex::new(Shared {
93                         queue: VecDeque::new(),
94                         num_th: 0,
95                         num_idle: 0,
96                         num_notify: 0,
97                         shutdown: false,
98                         shutdown_tx: Some(shutdown_tx),
99                     }),
100                     condvar: Condvar::new(),
101                     thread_name: builder.thread_name.clone(),
102                     stack_size: builder.thread_stack_size,
103                     after_start: builder.after_start.clone(),
104                     before_stop: builder.before_stop.clone(),
105                     thread_cap,
106                 }),
107             },
108             shutdown_rx,
109         }
110     }
111 
spawner(&self) -> &Spawner112     pub(crate) fn spawner(&self) -> &Spawner {
113         &self.spawner
114     }
115 
shutdown(&mut self, timeout: Option<Duration>)116     pub(crate) fn shutdown(&mut self, timeout: Option<Duration>) {
117         let mut shared = self.spawner.inner.shared.lock().unwrap();
118 
119         // The function can be called multiple times. First, by explicitly
120         // calling `shutdown` then by the drop handler calling `shutdown`. This
121         // prevents shutting down twice.
122         if shared.shutdown {
123             return;
124         }
125 
126         shared.shutdown = true;
127         shared.shutdown_tx = None;
128         self.spawner.inner.condvar.notify_all();
129 
130         drop(shared);
131 
132         self.shutdown_rx.wait(timeout);
133     }
134 }
135 
136 impl Drop for BlockingPool {
drop(&mut self)137     fn drop(&mut self) {
138         self.shutdown(None);
139     }
140 }
141 
142 impl fmt::Debug for BlockingPool {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result143     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
144         fmt.debug_struct("BlockingPool").finish()
145     }
146 }
147 
148 // ===== impl Spawner =====
149 
150 impl Spawner {
spawn(&self, task: Task, rt: &Handle) -> Result<(), ()>151     fn spawn(&self, task: Task, rt: &Handle) -> Result<(), ()> {
152         let shutdown_tx = {
153             let mut shared = self.inner.shared.lock().unwrap();
154 
155             if shared.shutdown {
156                 // Shutdown the task
157                 task.shutdown();
158 
159                 // no need to even push this task; it would never get picked up
160                 return Err(());
161             }
162 
163             shared.queue.push_back(task);
164 
165             if shared.num_idle == 0 {
166                 // No threads are able to process the task.
167 
168                 if shared.num_th == self.inner.thread_cap {
169                     // At max number of threads
170                     None
171                 } else {
172                     shared.num_th += 1;
173                     assert!(shared.shutdown_tx.is_some());
174                     shared.shutdown_tx.clone()
175                 }
176             } else {
177                 // Notify an idle worker thread. The notification counter
178                 // is used to count the needed amount of notifications
179                 // exactly. Thread libraries may generate spurious
180                 // wakeups, this counter is used to keep us in a
181                 // consistent state.
182                 shared.num_idle -= 1;
183                 shared.num_notify += 1;
184                 self.inner.condvar.notify_one();
185                 None
186             }
187         };
188 
189         if let Some(shutdown_tx) = shutdown_tx {
190             self.spawn_thread(shutdown_tx, rt);
191         }
192 
193         Ok(())
194     }
195 
spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle)196     fn spawn_thread(&self, shutdown_tx: shutdown::Sender, rt: &Handle) {
197         let mut builder = thread::Builder::new().name(self.inner.thread_name.clone());
198 
199         if let Some(stack_size) = self.inner.stack_size {
200             builder = builder.stack_size(stack_size);
201         }
202 
203         let rt = rt.clone();
204 
205         builder
206             .spawn(move || {
207                 // Only the reference should be moved into the closure
208                 let rt = &rt;
209                 rt.enter(move || {
210                     rt.blocking_spawner.inner.run();
211                     drop(shutdown_tx);
212                 })
213             })
214             .unwrap();
215     }
216 }
217 
218 impl Inner {
run(&self)219     fn run(&self) {
220         if let Some(f) = &self.after_start {
221             f()
222         }
223 
224         let mut shared = self.shared.lock().unwrap();
225 
226         'main: loop {
227             // BUSY
228             while let Some(task) = shared.queue.pop_front() {
229                 drop(shared);
230                 task.run();
231 
232                 shared = self.shared.lock().unwrap();
233             }
234 
235             // IDLE
236             shared.num_idle += 1;
237 
238             while !shared.shutdown {
239                 let lock_result = self.condvar.wait_timeout(shared, KEEP_ALIVE).unwrap();
240 
241                 shared = lock_result.0;
242                 let timeout_result = lock_result.1;
243 
244                 if shared.num_notify != 0 {
245                     // We have received a legitimate wakeup,
246                     // acknowledge it by decrementing the counter
247                     // and transition to the BUSY state.
248                     shared.num_notify -= 1;
249                     break;
250                 }
251 
252                 // Even if the condvar "timed out", if the pool is entering the
253                 // shutdown phase, we want to perform the cleanup logic.
254                 if !shared.shutdown && timeout_result.timed_out() {
255                     break 'main;
256                 }
257 
258                 // Spurious wakeup detected, go back to sleep.
259             }
260 
261             if shared.shutdown {
262                 // Drain the queue
263                 while let Some(task) = shared.queue.pop_front() {
264                     drop(shared);
265                     task.shutdown();
266 
267                     shared = self.shared.lock().unwrap();
268                 }
269 
270                 // Work was produced, and we "took" it (by decrementing num_notify).
271                 // This means that num_idle was decremented once for our wakeup.
272                 // But, since we are exiting, we need to "undo" that, as we'll stay idle.
273                 shared.num_idle += 1;
274                 // NOTE: Technically we should also do num_notify++ and notify again,
275                 // but since we're shutting down anyway, that won't be necessary.
276                 break;
277             }
278         }
279 
280         // Thread exit
281         shared.num_th -= 1;
282 
283         // num_idle should now be tracked exactly, panic
284         // with a descriptive message if it is not the
285         // case.
286         shared.num_idle = shared
287             .num_idle
288             .checked_sub(1)
289             .expect("num_idle underflowed on thread exit");
290 
291         if shared.shutdown && shared.num_th == 0 {
292             self.condvar.notify_one();
293         }
294 
295         drop(shared);
296 
297         if let Some(f) = &self.before_stop {
298             f()
299         }
300     }
301 }
302 
303 impl fmt::Debug for Spawner {
fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result304     fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
305         fmt.debug_struct("blocking::Spawner").finish()
306     }
307 }
308