1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4 
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::Worker;
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9 
/// Single-threaded sanity check of a LIFO worker and its stealer: the worker
/// pops the most recently pushed item, while the stealer takes the oldest one.
#[test]
fn smoke() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // A fresh queue yields nothing from either end.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken back by the owner.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken by the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Stealing alone hands items out in the order they were pushed.
    for item in 3..=5 {
        worker.push(item);
    }
    for expected in 3..=5 {
        assert_eq!(stealer.steal(), Success(expected));
    }
    assert_eq!(stealer.steal(), Empty);

    // Mixed access: pops come off the newest end, steals off the oldest end.
    for item in 6..=9 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(stealer.steal(), Success(6));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(7));
    assert_eq!(worker.pop(), None);
}
45 
/// Checks that `is_empty` tracks the queue contents, first as seen through the
/// worker (draining with `pop`) and then through the stealer (draining with
/// `steal`).
#[test]
fn is_empty() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // Worker's view: empty, non-empty after each push, empty again only once
    // both items have been popped.
    assert!(worker.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    let _ = worker.pop();
    assert!(!worker.is_empty());
    let _ = worker.pop();
    assert!(worker.is_empty());

    // Stealer's view of the same queue, drained by stealing this time.
    assert!(stealer.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    let _ = stealer.steal();
    assert!(!stealer.is_empty());
    let _ = stealer.steal();
    assert!(stealer.is_empty());
}
71 
/// One producer pushes `0..STEPS` while one spawned consumer steals; the
/// consumer must observe the values in exactly the order they were pushed.
#[test]
fn spsc() {
    const STEPS: usize = 50_000;

    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    scope(|scope| {
        // Consumer: keep retrying the steal until it succeeds, then check the
        // value against the next expected one.
        scope.spawn(|_| {
            for expected in 0..STEPS {
                let stolen = loop {
                    if let Success(value) = stealer.steal() {
                        break value;
                    }
                };
                assert_eq!(expected, stolen);
            }

            // Every pushed item has been consumed, so nothing may be left.
            assert_eq!(stealer.steal(), Empty);
        });

        // Producer: runs on the current thread, concurrently with the consumer.
        (0..STEPS).for_each(|item| worker.push(item));
    })
    .unwrap();
}
99 
/// Pre-fills the queue with boxed `1..=COUNT`, then lets `THREADS` stealers and
/// the owning worker drain it concurrently. Each stealer must see strictly
/// increasing values and the worker strictly decreasing ones.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    (1..=COUNT).for_each(|n| worker.push(Box::new(n)));

    // Shared count of items not yet taken; every participant loops until it
    // reads zero.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|scope| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let remaining = Arc::clone(&remaining);

            scope.spawn(move |_| {
                // All values are >= 1, so 0 is below anything stealable.
                let mut prev = 0;
                while remaining.load(SeqCst) > 0 {
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(prev < value);
                        prev = value;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner pops from its own end; COUNT + 1 is above every value.
        let mut prev = COUNT + 1;
        while remaining.load(SeqCst) > 0 {
            if let Some(boxed) = worker.pop() {
                let value = *boxed;
                assert!(prev > value);
                prev = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
140 
/// Runs `THREADS` thieves that exercise `steal`, `steal_batch` and
/// `steal_batch_and_pop` against one worker that randomly pushes or drains.
/// The test ends once the shared hit counter has reached `COUNT`.
#[test]
fn stress() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|scope| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::clone(&hits);

            scope.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_lifo();
                let record_hit = || {
                    hits.fetch_add(1, SeqCst);
                };

                loop {
                    if done.load(SeqCst) {
                        break;
                    }

                    if let Success(_) = stealer.steal() {
                        record_hit();
                    }

                    // Items moved by the batch are counted when `local` is
                    // drained below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        record_hit();
                    }

                    while local.pop().is_some() {
                        record_hit();
                    }
                }
            });
        }

        let record_hit = || {
            hits.fetch_add(1, SeqCst);
        };
        // Pops everything the owner can currently reach, counting each item.
        let drain_own = || {
            while worker.pop().is_some() {
                record_hit();
            }
        };

        // Push COUNT items in total, draining on a roll of 0 out of 0..3.
        let mut rng = rand::thread_rng();
        let mut pushed = 0;
        while pushed < COUNT {
            match rng.gen_range(0..3) {
                0 => drain_own(),
                _ => {
                    worker.push(pushed);
                    pushed += 1;
                }
            }
        }

        // Keep draining until every pushed item has been counted, then tell
        // the thieves to stop.
        while hits.load(SeqCst) < COUNT {
            drain_own();
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
199 
/// Keeps feeding the queue until the owning worker and every one of the
/// `THREADS` thieves has obtained at least one item.
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));

    // One hit counter per thief, shared with the main thread for inspection.
    let all_hits: Vec<Arc<AtomicUsize>> = (0..THREADS)
        .map(|_| Arc::new(AtomicUsize::new(0)))
        .collect();

    scope(|scope| {
        for counter in &all_hits {
            let stealer = worker.stealer();
            let done = Arc::clone(&done);
            let hits = Arc::clone(counter);

            scope.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_lifo();

                loop {
                    if done.load(SeqCst) {
                        break;
                    }

                    if let Success(_) = stealer.steal() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved by the batch are counted when `local` is
                    // drained below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut my_hits = 0;
        loop {
            // A burst of random length; the owner only drains while it has not
            // yet scored a hit of its own, otherwise it pushes.
            let burst = rng.gen_range(0..COUNT);
            for item in 0..burst {
                let drain_now = rng.gen_range(0..3) == 0 && my_hits == 0;
                if drain_now {
                    while worker.pop().is_some() {
                        my_hits += 1;
                    }
                } else {
                    worker.push(item);
                }
            }

            // Go around again while the owner or any thief is still at zero.
            let someone_starved =
                my_hits == 0 || all_hits.iter().any(|h| h.load(SeqCst) == 0);
            if !someone_starved {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
258 
/// Verifies element destructors: items taken by thieves or popped by the owner
/// are dropped as they are consumed, and the items still queued are dropped
/// when the worker itself is dropped.
#[test]
fn destructors() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;
    const STEPS: usize = 1000;

    /// Element that appends its id to a shared log when it is dropped.
    struct Elem {
        id: usize,
        log: Arc<Mutex<Vec<usize>>>,
    }

    impl Drop for Elem {
        fn drop(&mut self) {
            self.log.lock().unwrap().push(self.id);
        }
    }

    let worker = Worker::new_lifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    (0..COUNT).for_each(|id| {
        worker.push(Elem {
            id,
            log: Arc::clone(&dropped),
        })
    });

    scope(|scope| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let stealer = worker.stealer();

            scope.spawn(move |_| {
                // Thread-local queue that receives stolen batches.
                let local = Worker::new_lifo();
                let mut taken = 0;

                // Each thief stops once it has consumed at least STEPS items.
                while taken < STEPS {
                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // Items moved by the batch are counted when `local` is
                    // drained below.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes STEPS pop attempts of its own.
        for _attempt in 0..STEPS {
            if worker.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    // Some items must still be queued for the second half of the test.
    let left_over = remaining.load(SeqCst);
    assert!(left_over > 0);

    // Everything consumed so far has been dropped; reset the log.
    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), COUNT - left_over);
        log.clear();
    }

    drop(worker);

    // Dropping the worker dropped exactly the left-over items, and their ids
    // form one consecutive run.
    {
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), left_over);
        log.sort();
        log.windows(2)
            .for_each(|pair| assert_eq!(pair[0] + 1, pair[1]));
    }
}
339