1 use crate::Task;
2 use async_channel::{Receiver, Sender};
3 use async_mutex::Mutex;
4 use futures_lite::future;
5 use once_cell::sync::OnceCell;
6 use std::{io, thread};
7
8 // The current number of threads (some might be shutting down and not in the pool anymore)
9 static GLOBAL_EXECUTOR_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
10 // The expected number of threads (excluding the one that are shutting down)
11 static GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER: Mutex<usize> = Mutex::new(0);
12
13 thread_local! {
14 // Used to shutdown a thread when we receive a message from the Sender.
15 // We send an ack using to the Receiver once we're finished shutting down.
16 static THREAD_SHUTDOWN: OnceCell<(Sender<()>, Receiver<()>)> = OnceCell::new();
17 }
18
19 /// Spawn more executor threads, up to configured max value.
20 ///
21 /// Returns how many threads we spawned.
22 ///
23 /// # Examples
24 ///
25 /// ```
26 /// async_global_executor::spawn_more_threads(2);
27 /// ```
spawn_more_threads(count: usize) -> io::Result<usize>28 pub async fn spawn_more_threads(count: usize) -> io::Result<usize> {
29 // Get the current configuration, or initialize the thread pool.
30 let config = crate::config::GLOBAL_EXECUTOR_CONFIG
31 .get()
32 .unwrap_or_else(|| {
33 crate::init();
34 crate::config::GLOBAL_EXECUTOR_CONFIG.get().unwrap()
35 });
36 // How many threads do we have (including shutting down)
37 let mut threads_number = GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await;
38 // How many threads are we supposed to have (when all shutdowns are complete)
39 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
40 // Ensure we don't exceed configured max threads (including shutting down)
41 let count = count.min(config.max_threads - *threads_number);
42 for _ in 0..count {
43 thread::Builder::new()
44 .name((config.thread_name_fn)())
45 .spawn(thread_main_loop)?;
46 *threads_number += 1;
47 *expected_threads_number += 1;
48 }
49 Ok(count)
50 }
51
52 /// Stop one of the executor threads, down to configured min value
53 ///
54 /// Returns whether a thread has been stopped.
55 ///
56 /// # Examples
57 ///
58 /// ```
59 /// async_global_executor::stop_thread();
60 /// ```
stop_thread() -> Task<bool>61 pub fn stop_thread() -> Task<bool> {
62 crate::spawn(stop_current_executor_thread())
63 }
64
65 /// Stop the current executor thread, if we exceed the configured min value
66 ///
67 /// Returns whether the thread has been stopped.
68 ///
69 /// # Examples
70 ///
71 /// ```
72 /// async_global_executor::stop_current_thread();
73 /// ```
stop_current_thread() -> Task<bool>74 pub fn stop_current_thread() -> Task<bool> {
75 crate::spawn_local(stop_current_executor_thread())
76 }
77
thread_main_loop()78 fn thread_main_loop() {
79 // This will be used to ask for shutdown.
80 let (s, r) = async_channel::bounded(1);
81 // This wil be used to ack once shutdown is complete.
82 let (s_ack, r_ack) = async_channel::bounded(1);
83 THREAD_SHUTDOWN.with(|thread_shutdown| drop(thread_shutdown.set((s, r_ack))));
84
85 // Main loop
86 loop {
87 #[allow(clippy::blocks_in_if_conditions)]
88 if std::panic::catch_unwind(|| {
89 crate::executor::LOCAL_EXECUTOR.with(|executor| {
90 let local = executor.run(async {
91 // Wait until we're asked to shutdown.
92 let _ = r.recv().await;
93 });
94 let global = crate::executor::GLOBAL_EXECUTOR.run(future::pending::<()>());
95 crate::reactor::block_on(future::or(local, global));
96 });
97 })
98 .is_ok()
99 {
100 break;
101 }
102 }
103
104 wait_for_local_executor_completion();
105
106 // Ack that we're done shutting down.
107 crate::reactor::block_on(async {
108 let _ = s_ack.send(()).await;
109 });
110 }
111
wait_for_local_executor_completion()112 fn wait_for_local_executor_completion() {
113 loop {
114 #[allow(clippy::blocks_in_if_conditions)]
115 if std::panic::catch_unwind(|| {
116 crate::executor::LOCAL_EXECUTOR.with(|executor| {
117 crate::reactor::block_on(async {
118 // Wait for spawned tasks completion
119 while !executor.is_empty() {
120 executor.tick().await;
121 }
122 });
123 });
124 })
125 .is_ok()
126 {
127 break;
128 }
129 }
130 }
131
stop_current_executor_thread() -> bool132 async fn stop_current_executor_thread() -> bool {
133 // How many threads are we supposed to have (when all shutdowns are complete)
134 let mut expected_threads_number = GLOBAL_EXECUTOR_EXPECTED_THREADS_NUMBER.lock().await;
135 // Ensure we don't go below the configured min_threads (ignoring shutting down)
136 if *expected_threads_number
137 > crate::config::GLOBAL_EXECUTOR_CONFIG
138 .get()
139 .unwrap()
140 .min_threads
141 {
142 let (s, r_ack) =
143 THREAD_SHUTDOWN.with(|thread_shutdown| thread_shutdown.get().unwrap().clone());
144 let _ = s.send(()).await;
145 // We now expect to have one less thread (this one is shutting down)
146 *expected_threads_number -= 1;
147 // Unlock the Mutex
148 drop(expected_threads_number);
149 let _ = r_ack.recv().await;
150 // This thread is done shutting down
151 *GLOBAL_EXECUTOR_THREADS_NUMBER.lock().await -= 1;
152 true
153 } else {
154 false
155 }
156 }
157