1 use crate::Task;
2 use async_channel::{Receiver, Sender};
3 use async_mutex::Mutex;
4 use futures_lite::future;
5 use once_cell::sync::OnceCell;
6 use std::{io, thread};
7 
8 // The current number of threads (some might be shutting down and not in the pool anymore)
9 static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
10 // The expected number of threads (excluding the one that are shutting down)
11 static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
12 
13 thread_local! {
14     // Used to shutdown a thread when we receive a message from the Sender.
15     // We send an ack using to the Receiver once we're finished shutting down.
16     static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
17 }
18 
19 /// Spawn more executor threads, up to configured max value.
20 ///
21 /// Returns how many threads we spawned.
22 ///
23 /// # Examples
24 ///
25 /// ```
26 /// async_global_executor::spawn_more_threads(2);
27 /// ```
spawn_more_threads(count: usize) -> io::Result<usize>28 pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
29     // Get the current configuration, or initialize the thread pool.
30     let config = crate::config::GLOBAL_EXECUTOR_CONFIG
31         .get()
32         .unwrap_or_else(|| {
33             crate::init();
34             crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
35         });
36     // How many threads do we have (including shutting down)
37     let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
38     // How many threads are we supposed to have (when all shutdowns are complete)
39     let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
40     // Ensure we don't exceed configured max threads (including shutting down)
41     let count = count.min(config.max_threads - *threads_number);
42     for _ in 0..count {
43         thread::Builder::new()
44             .name((config.thread_name_fn)())
45             .spawn(thread_main_loop)?;
46         *threads_number += 1;
47         *expected_threads_number += 1;
48     }
49     Ok(count)
50 }
51 
52 /// Stop one of the executor threads, down to configured min value
53 ///
54 /// Returns whether a thread has been stopped.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// async_global_executor::stop_thread();
60 /// ```
stop_thread() -> Task<bool>61 pub fn stop_thread() -> Task<bool> {
62     crate::spawn(stop_current_executor_thread())
63 }
64 
65 /// Stop the current executor thread, if we exceed the configured min value
66 ///
67 /// Returns whether the thread has been stopped.
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// async_global_executor::stop_current_thread();
73 /// ```
stop_current_thread() -> Task<bool>74 pub fn stop_current_thread() -> Task<bool> {
75     crate::spawn_local(stop_current_executor_thread())
76 }
77 
thread_main_loop()78 fn thread_main_loop() {
79     // This will be used to ask for shutdown.
80     let (s, r) = async_channel::bounded(1);
81     // This wil be used to ack once shutdown is complete.
82     let (s_ack, r_ack) = async_channel::bounded(1);
83     THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
84 
85     // Main loop
86     loop {
87         #[allow(clippy::blocks_in_if_conditions)]
88         if std::panic::catch_unwind(|| {
89             crate::executor::LOCAL_EXECUTOR.with(|executor| {
90                 let local = executor.run(async {
91                     // Wait until we're asked to shutdown.
92                     let _ = r.recv().await;
93                 });
94                 let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
95                 crate::reactor::block_on(future::or(local, global));
96             });
97         })
98         .is_ok()
99         {
100             break;
101         }
102     }
103 
104     wait_for_local_executor_completion();
105 
106     // Ack that we're done shutting down.
107     crate::reactor::block_on(async {
108         let _ = s_ack.send(()).await;
109     });
110 }
111 
wait_for_local_executor_completion()112 fn wait_for_local_executor_completion() {
113     loop {
114         #[allow(clippy::blocks_in_if_conditions)]
115         if std::panic::catch_unwind(|| {
116             crate::executor::LOCAL_EXECUTOR.with(|executor| {
117                 crate::reactor::block_on(async {
118                     // Wait for spawned tasks completion
119                     while !executor.is_empty() {
120                         executor.tick().await;
121                     }
122                 });
123             });
124         })
125         .is_ok()
126         {
127             break;
128         }
129     }
130 }
131 
stop_current_executor_thread() -> bool132 async fn stop_current_executor_thread() -> bool {
133     // How many threads are we supposed to have (when all shutdowns are complete)
134     let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
135     // Ensure we don't go below the configured min_threads (ignoring shutting down)
136     if *expected_threads_number
137         > crate::config::GLOBAL_EXECUTOR_CONFIG
138             .get()
139             .unwrap()
140             .min_threads
141     {
142         let (s, r_ack) =
143             THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
144         let _ = s.send(()).await;
145         // We now expect to have one less thread (this one is shutting down)
146         *expected_threads_number -= 1;
147         // Unlock the Mutex
148         drop(expected_threads_number);
149         let _ = r_ack.recv().await;
150         // This thread is done shutting down
151         *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
152         true
153     } else {
154         false
155     }
156 }
157