#![cfg(test)]

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex};

#[allow(deprecated)]
use crate::Configuration;
use crate::{join, Scope, ScopeFifo, ThreadPool, ThreadPoolBuilder};

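// A panic raised inside `install` should unwind out of the pool to the caller.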
#[test]
#[should_panic(expected = "Hello, world!")]
fn panic_propagate() {
    let thread_pool = ThreadPoolBuilder::new().build().unwrap();
    thread_pool.install(|| {
        panic!("Hello, world!");
    });
}

#[test]
fn workers_stop() {
    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = thread_pool.install(|| {
            // do some work on these threads
            join_a_lot(22);

            thread_pool.registry.clone()
        });
        assert_eq!(registry.num_threads(), 22);
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}

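/// Recursively spawns a binary tree of `join` calls of depth `n`, just to give
/// the pool's worker threads some work to do.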
fn join_a_lot(n: usize) {
    if n > 0 {
        join(|| join_a_lot(n - 1), || join_a_lot(n - 1));
    }
}

#[test]
fn sleeper_stop() {
    use std::{thread, time};

    let registry;

    {
        // once we exit this block, thread-pool will be dropped
        let thread_pool = ThreadPoolBuilder::new().num_threads(22).build().unwrap();
        registry = thread_pool.registry.clone();

        // Give time for at least some of the thread pool to fall asleep.
        thread::sleep(time::Duration::from_secs(1));
    }

    // once thread-pool is dropped, registry should terminate, which
    // should lead to worker threads stopping
    registry.wait_until_stopped();
}

/// Creates a start/exit handler that increments an atomic counter.
fn count_handler() -> (Arc<AtomicUsize>, impl Fn(usize)) {
    let count = Arc::new(AtomicUsize::new(0));
    (count.clone(), move |_| {
        count.fetch_add(1, Ordering::SeqCst);
    })
}

/// Wait until a counter is no longer shared, then return its value.
fn wait_for_counter(mut counter: Arc<AtomicUsize>) -> usize {
    use std::{thread, time};

    for _ in 0..60 {
        counter = match Arc::try_unwrap(counter) {
            Ok(counter) => return counter.into_inner(),
            Err(counter) => {
                thread::sleep(time::Duration::from_secs(1));
                counter
            }
        };
    }

    // That's too long!
    panic!("Counter is still shared!");
}

#[test]
fn failed_thread_stack() {
    // Note: we first tried to force failure with a `usize::MAX` stack, but
    // macOS and Windows weren't fazed, or at least didn't fail the way we want.
    // They work with `isize::MAX`, but 32-bit platforms may feasibly allocate a
    // 2GB stack, so it might not fail until the second thread.
    let stack_size = ::std::isize::MAX as usize;

    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .stack_size(stack_size)
        .start_handler(start_handler)
        .exit_handler(exit_handler);

    let pool = builder.build();
    assert!(pool.is_err(), "thread stack should have failed!");

    // With such a huge stack, 64-bit will probably fail on the first thread;
    // 32-bit might manage the first 2GB, but certainly fail the second.
    let start_count = wait_for_counter(start_count);
    assert!(start_count <= 1);
    assert_eq!(start_count, wait_for_counter(exit_count));
}

#[test]
fn panic_thread_name() {
    let (start_count, start_handler) = count_handler();
    let (exit_count, exit_handler) = count_handler();
    let builder = ThreadPoolBuilder::new()
        .num_threads(10)
        .start_handler(start_handler)
        .exit_handler(exit_handler)
        .thread_name(|i| {
            if i >= 5 {
                panic!();
            }
            format!("panic_thread_name#{}", i)
        });

    let pool = crate::unwind::halt_unwinding(|| builder.build());
    assert!(pool.is_err(), "thread-name panic should propagate!");

    // Assuming they're created in order, threads 0 through 4 should have
    // been started already, and then terminated by the panic.
    assert_eq!(5, wait_for_counter(start_count));
    assert_eq!(5, wait_for_counter(exit_count));
}

#[test]
fn self_install() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    // If the inner `install` blocks, then nothing will actually run it!
    assert!(pool.install(|| pool.install(|| true)));
}

#[test]
fn mutual_install() {
    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}

#[test]
fn mutual_install_sleepy() {
    use std::{thread, time};

    let pool1 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let pool2 = ThreadPoolBuilder::new().num_threads(1).build().unwrap();

    let ok = pool1.install(|| {
        // This creates a dependency from `pool1` -> `pool2`
        pool2.install(|| {
            // Give `pool1` time to fall asleep.
            thread::sleep(time::Duration::from_secs(1));

            // This creates a dependency from `pool2` -> `pool1`
            pool1.install(|| {
                // Give `pool2` time to fall asleep.
                thread::sleep(time::Duration::from_secs(1));

                // If they blocked on inter-pool installs, there would be no
                // threads left to run this!
                true
            })
        })
    });
    assert!(ok);
}

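// The deprecated `Configuration`-based `ThreadPool::new` constructor should
// still honor the requested number of threads.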
#[test]
#[allow(deprecated)]
fn check_thread_pool_new() {
    let pool = ThreadPool::new(Configuration::new().num_threads(22)).unwrap();
    assert_eq!(pool.current_num_threads(), 22);
}

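// Spawns ten tasks into a scope on a single-threaded pool and collects the
// order in which they run, so the LIFO/FIFO tests below can assert on it.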
macro_rules! test_scope_order {
    ($scope:ident => $spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = builder.build().unwrap();
        pool.install(|| {
            let vec = Mutex::new(vec![]);
            pool.$scope(|scope| {
                let vec = &vec;
                for i in 0..10 {
                    scope.$spawn(move |_| {
                        vec.lock().unwrap().push(i);
                    });
                }
            });
            vec.into_inner().unwrap()
        })
    }};
}

#[test]
fn scope_lifo_order() {
    let vec = test_scope_order!(scope => spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}

#[test]
fn scope_fifo_order() {
    let vec = test_scope_order!(scope_fifo => spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}

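// Like `test_scope_order!`, but uses free-standing pool spawns and a channel
// to observe the order in which the tasks run.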
macro_rules! test_spawn_order {
    ($spawn:ident) => {{
        let builder = ThreadPoolBuilder::new().num_threads(1);
        let pool = &builder.build().unwrap();
        let (tx, rx) = channel();
        pool.install(move || {
            for i in 0..10 {
                let tx = tx.clone();
                pool.$spawn(move || {
                    tx.send(i).unwrap();
                });
            }
        });
        rx.iter().collect::<Vec<i32>>()
    }};
}

#[test]
fn spawn_lifo_order() {
    let vec = test_spawn_order!(spawn);
    let expected: Vec<i32> = (0..10).rev().collect(); // LIFO -> reversed
    assert_eq!(vec, expected);
}

#[test]
fn spawn_fifo_order() {
    let vec = test_spawn_order!(spawn_fifo);
    let expected: Vec<i32> = (0..10).collect(); // FIFO -> natural order
    assert_eq!(vec, expected);
}

#[test]
fn nested_scopes() {
    // Create matching scopes for every thread pool.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&Scope<'scope>>, op: OP)
    where
        OP: FnOnce(&[&Scope<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> = (0..10)
        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        .collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}

#[test]
fn nested_fifo_scopes() {
    // Create matching fifo scopes for every thread pool.
    fn nest<'scope, OP>(pools: &[ThreadPool], scopes: Vec<&ScopeFifo<'scope>>, op: OP)
    where
        OP: FnOnce(&[&ScopeFifo<'scope>]) + Send,
    {
        if let Some((pool, tail)) = pools.split_first() {
            pool.scope_fifo(move |s| {
                // This move reduces the reference lifetimes by variance to match s,
                // but the actual scopes are still tied to the invariant 'scope.
                let mut scopes = scopes;
                scopes.push(s);
                nest(tail, scopes, op)
            })
        } else {
            (op)(&scopes)
        }
    }

    let pools: Vec<_> = (0..10)
        .map(|_| ThreadPoolBuilder::new().num_threads(1).build().unwrap())
        .collect();

    let counter = AtomicUsize::new(0);
    nest(&pools, vec![], |scopes| {
        for &s in scopes {
            s.spawn_fifo(|_| {
                // Our 'scope lets us borrow the counter in every pool.
                counter.fetch_add(1, Ordering::Relaxed);
            });
        }
    });
    assert_eq!(counter.into_inner(), pools.len());
}

#[test]
fn in_place_scope_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    let rx_ref = &rx;
    pool.in_place_scope(move |s| {
        // With regular scopes this closure would never run because this scope op
        // itself would block the only worker thread.
        s.spawn(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}

#[test]
fn in_place_scope_fifo_no_deadlock() {
    let pool = ThreadPoolBuilder::new().num_threads(1).build().unwrap();
    let (tx, rx) = channel();
    let rx_ref = &rx;
    pool.in_place_scope_fifo(move |s| {
        // With regular scopes this closure would never run because this scope op
        // itself would block the only worker thread.
        s.spawn_fifo(move |_| {
            tx.send(()).unwrap();
        });
        rx_ref.recv().unwrap();
    });
}