1 #![cfg(test)]
2 
3 use std::sync::atomic::{AtomicUsize, Ordering};
4 use std::sync::mpsc::channel;
5 use std::sync::{Arc, Mutex};
6 
7 #[allow(deprecated)]
8 use crate::Configuration;
9 use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};
10 
11 #[test]
12 #[should_panic(expected = "Hello, world!")]
panic_propagate()13 fn panic_propagate() {
14     let thread_pool = ThreadPoolBuilder::new().build().unwrap();
15     thread_pool.install(|| {
16         panic!("Hello, world!");
17     });
18 }
19 
20 #[test]
workers_stop()21 fn workers_stop() {
22     let registry;
23 
24     {
25         // once we exit this block, thread-pool will be dropped
26         let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
27         registry = thread_pool.install(|| {
28             // do some work on these threads
29             join_a_lot(22);
30 
31             thread_pool.registry.clone()
32         });
33         assert_eq!(registry.num_threads(), 22);
34     }
35 
36     // once thread-pool is dropped, registry should terminate, which
37     // should lead to worker threads stopping
38     registry.wait_until_stopped();
39 }
40 
join_a_lot(n: usize)41 fn join_a_lot(n: usize) {
42     if n > 0 {
43         join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
44     }
45 }
46 
47 #[test]
sleeper_stop()48 fn sleeper_stop() {
49     use std::{thread, time};
50 
51     let registry;
52 
53     {
54         // once we exit this block, thread-pool will be dropped
55         let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
56         registry = thread_pool.registry.clone();
57 
58         // Give time for at least some of the thread pool to fall asleep.
59         thread::sleep(time::Duration::from_secs(1));
60     }
61 
62     // once thread-pool is dropped, registry should terminate, which
63     // should lead to worker threads stopping
64     registry.wait_until_stopped();
65 }
66 
67 /// Creates a start/exit handler that increments an atomic counter.
count_handler() -> (Arc<AtomicUsize>, impl Fn(usize))68 fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
69     let count = Arc::new(AtomicUsize::new(0));
70     (count.clone(), move |_| {
71         count.fetch_add(1, Ordering::SeqCst);
72     })
73 }
74 
75 /// Wait until a counter is no longer shared, then return its value.
wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize76 fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
77     use std::{thread, time};
78 
79     for _ in 0..60 {
80         counter = match Arc::try_unwrap(counter) {
81             Ok(counter) => return counter.into_inner(),
82             Err(counter) => {
83                 thread::sleep(time::Duration::from_secs(1));
84                 counter
85             }
86         };
87     }
88 
89     // That's too long!
90     panic!("Counter is still shared!");
91 }
92 
93 #[test]
failed_thread_stack()94 fn failed_thread_stack() {
95     // Note: we first tried to force failure with a `usize::MAX` stack, but
96     // macOS and Windows weren't fazed, or at least didn't fail the way we want.
97     // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
98     // 2GB stack, so it might not fail until the second thread.
99     let stack_size = ::std::isize::MAX as usize;
100 
101     let (start_count, start_handler) = count_handler();
102     let (exit_count, exit_handler) = count_handler();
103     let builder = ThreadPoolBuilder::new()
104         .num_threads(10)
105         .stack_size(stack_size)
106         .start_handler(start_handler)
107         .exit_handler(exit_handler);
108 
109     let pool = builder.build();
110     assert!(pool.is_err(), "thread stack should have failed!");
111 
112     // With such a huge stack, 64-bit will probably fail on the first thread;
113     // 32-bit might manage the first 2GB, but certainly fail the second.
114     let start_count = wait_for_counter(start_count);
115     assert!(start_count <= 1);
116     assert_eq!(start_count, wait_for_counter(exit_count));
117 }
118 
119 #[test]
panic_thread_name()120 fn panic_thread_name() {
121     let (start_count, start_handler) = count_handler();
122     let (exit_count, exit_handler) = count_handler();
123     let builder = ThreadPoolBuilder::new()
124         .num_threads(10)
125         .start_handler(start_handler)
126         .exit_handler(exit_handler)
127         .thread_name(|i| {
128             if i >= 5 {
129                 panic!();
130             }
131             format!("panic_thread_name#{}", i)
132         });
133 
134     let pool = crate::unwind::halt_unwinding(|| builder.build());
135     assert!(pool.is_err(), "thread-name panic should propagate!");
136 
137     // Assuming they're created in order, threads 0 through 4 should have
138     // been started already, and then terminated by the panic.
139     assert_eq!(5, wait_for_counter(start_count));
140     assert_eq!(5, wait_for_counter(exit_count));
141 }
142 
143 #[test]
self_install()144 fn self_install() {
145     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
146 
147     // If the inner `install` blocks, then nothing will actually run it!
148     assert!(pool.install(|| pool.install(|| true)));
149 }
150 
151 #[test]
mutual_install()152 fn mutual_install() {
153     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
154     let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
155 
156     let ok = pool1.install(|| {
157         // This creates a dependency from `pool1` -> `pool2`
158         pool2.install(|| {
159             // This creates a dependency from `pool2` -> `pool1`
160             pool1.install(|| {
161                 // If they blocked on inter-pool installs, there would be no
162                 // threads left to run this!
163                 true
164             })
165         })
166     });
167     assert!(ok);
168 }
169 
170 #[test]
mutual_install_sleepy()171 fn mutual_install_sleepy() {
172     use std::{thread, time};
173 
174     let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
175     let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
176 
177     let ok = pool1.install(|| {
178         // This creates a dependency from `pool1` -> `pool2`
179         pool2.install(|| {
180             // Give `pool1` time to fall asleep.
181             thread::sleep(time::Duration::from_secs(1));
182 
183             // This creates a dependency from `pool2` -> `pool1`
184             pool1.install(|| {
185                 // Give `pool2` time to fall asleep.
186                 thread::sleep(time::Duration::from_secs(1));
187 
188                 // If they blocked on inter-pool installs, there would be no
189                 // threads left to run this!
190                 true
191             })
192         })
193     });
194     assert!(ok);
195 }
196 
197 #[test]
198 #[allow(deprecated)]
check_thread_pool_new()199 fn check_thread_pool_new() {
200     let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
201     assert_eq!(pool.current_num_threads(), 22);
202 }
203 
204 macro_rules! test_scope_order {
205     ($scope:ident => $spawn:ident) => {{
206         let builder = ThreadPoolBuilder::new().num_threads(1);
207         let pool = builder.build().unwrap();
208         pool.install(|| {
209             let vec = Mutex::new(vec![]);
210             pool.$scope(|scope| {
211                 let vec = &vec;
212                 for i in 0..10 {
213                     scope.$spawn(move |_| {
214                         vec.lock().unwrap().push(i);
215                     });
216                 }
217             });
218             vec.into_inner().unwrap()
219         })
220     }};
221 }
222 
223 #[test]
scope_lifo_order()224 fn scope_lifo_order() {
225     let vec = test_scope_order!(scope => spawn);
226     let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
227     assert_eq!(vec, expected);
228 }
229 
230 #[test]
scope_fifo_order()231 fn scope_fifo_order() {
232     let vec = test_scope_order!(scope_fifo => spawn_fifo);
233     let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
234     assert_eq!(vec, expected);
235 }
236 
237 macro_rules! test_spawn_order {
238     ($spawn:ident) => {{
239         let builder = ThreadPoolBuilder::new().num_threads(1);
240         let pool = &builder.build().unwrap();
241         let (tx, rx) = channel();
242         pool.install(move || {
243             for i in 0..10 {
244                 let tx = tx.clone();
245                 pool.$spawn(move || {
246                     tx.send(i).unwrap();
247                 });
248             }
249         });
250         rx.iter().collect::<Vec<i32>>()
251     }};
252 }
253 
254 #[test]
spawn_lifo_order()255 fn spawn_lifo_order() {
256     let vec = test_spawn_order!(spawn);
257     let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
258     assert_eq!(vec, expected);
259 }
260 
261 #[test]
spawn_fifo_order()262 fn spawn_fifo_order() {
263     let vec = test_spawn_order!(spawn_fifo);
264     let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
265     assert_eq!(vec, expected);
266 }
267 
268 #[test]
nested_scopes()269 fn nested_scopes() {
270     // Create matching scopes for every thread pool.
271     fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
272     where
273         OP: FnOnce(&[&Scope<'scope>]) + Send,
274     {
275         if let Some((pool, tail)) = pools.split_first() {
276             pool.scope(move |s| {
277                 // This move reduces the reference lifetimes by variance to match s,
278                 // but the actual scopes are still tied to the invariant 'scope.
279                 let mut scopes = scopes;
280                 scopes.push(s);
281                 nest(tail, scopes, op)
282             })
283         } else {
284             (op)(&scopes)
285         }
286     }
287 
288     let pools: Vec<_> = (0..10)
289         .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
290         .collect();
291 
292     let counter = AtomicUsize::new(0);
293     nest(&pools, vec![], |scopes| {
294         for &s in scopes {
295             s.spawn(|_| {
296                 // Our 'scope lets us borrow the counter in every pool.
297                 counter.fetch_add(1, Ordering::Relaxed);
298             });
299         }
300     });
301     assert_eq!(counter.into_inner(), pools.len());
302 }
303 
304 #[test]
nested_fifo_scopes()305 fn nested_fifo_scopes() {
306     // Create matching fifo scopes for every thread pool.
307     fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
308     where
309         OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
310     {
311         if let Some((pool, tail)) = pools.split_first() {
312             pool.scope_fifo(move |s| {
313                 // This move reduces the reference lifetimes by variance to match s,
314                 // but the actual scopes are still tied to the invariant 'scope.
315                 let mut scopes = scopes;
316                 scopes.push(s);
317                 nest(tail, scopes, op)
318             })
319         } else {
320             (op)(&scopes)
321         }
322     }
323 
324     let pools: Vec<_> = (0..10)
325         .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
326         .collect();
327 
328     let counter = AtomicUsize::new(0);
329     nest(&pools, vec![], |scopes| {
330         for &s in scopes {
331             s.spawn_fifo(|_| {
332                 // Our 'scope lets us borrow the counter in every pool.
333                 counter.fetch_add(1, Ordering::Relaxed);
334             });
335         }
336     });
337     assert_eq!(counter.into_inner(), pools.len());
338 }
339 
340 #[test]
in_place_scope_no_deadlock()341 fn in_place_scope_no_deadlock() {
342     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
343     let (tx, rx) = channel();
344     let rx_ref = &rx;
345     pool.in_place_scope(move |s| {
346         // With regular scopes this closure would never run because this scope op
347         // itself would block the only worker thread.
348         s.spawn(move |_| {
349             tx.send(()).unwrap();
350         });
351         rx_ref.recv().unwrap();
352     });
353 }
354 
355 #[test]
in_place_scope_fifo_no_deadlock()356 fn in_place_scope_fifo_no_deadlock() {
357     let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
358     let (tx, rx) = channel();
359     let rx_ref = &rx;
360     pool.in_place_scope_fifo(move |s| {
361         // With regular scopes this closure would never run because this scope op
362         // itself would block the only worker thread.
363         s.spawn_fifo(move |_| {
364             tx.send(()).unwrap();
365         });
366         rx_ref.recv().unwrap();
367     });
368 }
369