1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4 
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::{Injector, Worker};
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9 
10 #[test]
smoke()11 fn smoke() {
12     let q = Injector::new();
13     assert_eq!(q.steal(), Empty);
14 
15     q.push(1);
16     q.push(2);
17     assert_eq!(q.steal(), Success(1));
18     assert_eq!(q.steal(), Success(2));
19     assert_eq!(q.steal(), Empty);
20 
21     q.push(3);
22     assert_eq!(q.steal(), Success(3));
23     assert_eq!(q.steal(), Empty);
24 }
25 
26 #[test]
is_empty()27 fn is_empty() {
28     let q = Injector::new();
29     assert!(q.is_empty());
30 
31     q.push(1);
32     assert!(!q.is_empty());
33     q.push(2);
34     assert!(!q.is_empty());
35 
36     let _ = q.steal();
37     assert!(!q.is_empty());
38     let _ = q.steal();
39     assert!(q.is_empty());
40 
41     q.push(3);
42     assert!(!q.is_empty());
43     let _ = q.steal();
44     assert!(q.is_empty());
45 }
46 
47 #[test]
spsc()48 fn spsc() {
49     const COUNT: usize = 100_000;
50 
51     let q = Injector::new();
52 
53     scope(|scope| {
54         scope.spawn(|_| {
55             for i in 0..COUNT {
56                 loop {
57                     if let Success(v) = q.steal() {
58                         assert_eq!(i, v);
59                         break;
60                     }
61                 }
62             }
63 
64             assert_eq!(q.steal(), Empty);
65         });
66 
67         for i in 0..COUNT {
68             q.push(i);
69         }
70     })
71     .unwrap();
72 }
73 
74 #[test]
mpmc()75 fn mpmc() {
76     const COUNT: usize = 25_000;
77     const THREADS: usize = 4;
78 
79     let q = Injector::new();
80     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
81 
82     scope(|scope| {
83         for _ in 0..THREADS {
84             scope.spawn(|_| {
85                 for i in 0..COUNT {
86                     q.push(i);
87                 }
88             });
89         }
90 
91         for _ in 0..THREADS {
92             scope.spawn(|_| {
93                 for _ in 0..COUNT {
94                     loop {
95                         if let Success(n) = q.steal() {
96                             v[n].fetch_add(1, SeqCst);
97                             break;
98                         }
99                     }
100                 }
101             });
102         }
103     })
104     .unwrap();
105 
106     for c in v {
107         assert_eq!(c.load(SeqCst), THREADS);
108     }
109 }
110 
111 #[test]
stampede()112 fn stampede() {
113     const THREADS: usize = 8;
114     const COUNT: usize = 50_000;
115 
116     let q = Injector::new();
117 
118     for i in 0..COUNT {
119         q.push(Box::new(i + 1));
120     }
121     let remaining = Arc::new(AtomicUsize::new(COUNT));
122 
123     scope(|scope| {
124         for _ in 0..THREADS {
125             let remaining = remaining.clone();
126             let q = &q;
127 
128             scope.spawn(move |_| {
129                 let mut last = 0;
130                 while remaining.load(SeqCst) > 0 {
131                     if let Success(x) = q.steal() {
132                         assert!(last < *x);
133                         last = *x;
134                         remaining.fetch_sub(1, SeqCst);
135                     }
136                 }
137             });
138         }
139 
140         let mut last = 0;
141         while remaining.load(SeqCst) > 0 {
142             if let Success(x) = q.steal() {
143                 assert!(last < *x);
144                 last = *x;
145                 remaining.fetch_sub(1, SeqCst);
146             }
147         }
148     })
149     .unwrap();
150 }
151 
152 #[test]
stress()153 fn stress() {
154     const THREADS: usize = 8;
155     const COUNT: usize = 50_000;
156 
157     let q = Injector::new();
158     let done = Arc::new(AtomicBool::new(false));
159     let hits = Arc::new(AtomicUsize::new(0));
160 
161     scope(|scope| {
162         for _ in 0..THREADS {
163             let done = done.clone();
164             let hits = hits.clone();
165             let q = &q;
166 
167             scope.spawn(move |_| {
168                 let w2 = Worker::new_fifo();
169 
170                 while !done.load(SeqCst) {
171                     if let Success(_) = q.steal() {
172                         hits.fetch_add(1, SeqCst);
173                     }
174 
175                     let _ = q.steal_batch(&w2);
176 
177                     if let Success(_) = q.steal_batch_and_pop(&w2) {
178                         hits.fetch_add(1, SeqCst);
179                     }
180 
181                     while w2.pop().is_some() {
182                         hits.fetch_add(1, SeqCst);
183                     }
184                 }
185             });
186         }
187 
188         let mut rng = rand::thread_rng();
189         let mut expected = 0;
190         while expected < COUNT {
191             if rng.gen_range(0..3) == 0 {
192                 while let Success(_) = q.steal() {
193                     hits.fetch_add(1, SeqCst);
194                 }
195             } else {
196                 q.push(expected);
197                 expected += 1;
198             }
199         }
200 
201         while hits.load(SeqCst) < COUNT {
202             while let Success(_) = q.steal() {
203                 hits.fetch_add(1, SeqCst);
204             }
205         }
206         done.store(true, SeqCst);
207     })
208     .unwrap();
209 }
210 
211 #[test]
no_starvation()212 fn no_starvation() {
213     const THREADS: usize = 8;
214     const COUNT: usize = 50_000;
215 
216     let q = Injector::new();
217     let done = Arc::new(AtomicBool::new(false));
218     let mut all_hits = Vec::new();
219 
220     scope(|scope| {
221         for _ in 0..THREADS {
222             let done = done.clone();
223             let hits = Arc::new(AtomicUsize::new(0));
224             all_hits.push(hits.clone());
225             let q = &q;
226 
227             scope.spawn(move |_| {
228                 let w2 = Worker::new_fifo();
229 
230                 while !done.load(SeqCst) {
231                     if let Success(_) = q.steal() {
232                         hits.fetch_add(1, SeqCst);
233                     }
234 
235                     let _ = q.steal_batch(&w2);
236 
237                     if let Success(_) = q.steal_batch_and_pop(&w2) {
238                         hits.fetch_add(1, SeqCst);
239                     }
240 
241                     while w2.pop().is_some() {
242                         hits.fetch_add(1, SeqCst);
243                     }
244                 }
245             });
246         }
247 
248         let mut rng = rand::thread_rng();
249         let mut my_hits = 0;
250         loop {
251             for i in 0..rng.gen_range(0..COUNT) {
252                 if rng.gen_range(0..3) == 0 && my_hits == 0 {
253                     while let Success(_) = q.steal() {
254                         my_hits += 1;
255                     }
256                 } else {
257                     q.push(i);
258                 }
259             }
260 
261             if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
262                 break;
263             }
264         }
265         done.store(true, SeqCst);
266     })
267     .unwrap();
268 }
269 
270 #[test]
destructors()271 fn destructors() {
272     const THREADS: usize = 8;
273     const COUNT: usize = 50_000;
274     const STEPS: usize = 1000;
275 
276     struct Elem(usize, Arc<Mutex<Vec<usize>>>);
277 
278     impl Drop for Elem {
279         fn drop(&mut self) {
280             self.1.lock().unwrap().push(self.0);
281         }
282     }
283 
284     let q = Injector::new();
285     let dropped = Arc::new(Mutex::new(Vec::new()));
286     let remaining = Arc::new(AtomicUsize::new(COUNT));
287 
288     for i in 0..COUNT {
289         q.push(Elem(i, dropped.clone()));
290     }
291 
292     scope(|scope| {
293         for _ in 0..THREADS {
294             let remaining = remaining.clone();
295             let q = &q;
296 
297             scope.spawn(move |_| {
298                 let w2 = Worker::new_fifo();
299                 let mut cnt = 0;
300 
301                 while cnt < STEPS {
302                     if let Success(_) = q.steal() {
303                         cnt += 1;
304                         remaining.fetch_sub(1, SeqCst);
305                     }
306 
307                     let _ = q.steal_batch(&w2);
308 
309                     if let Success(_) = q.steal_batch_and_pop(&w2) {
310                         cnt += 1;
311                         remaining.fetch_sub(1, SeqCst);
312                     }
313 
314                     while w2.pop().is_some() {
315                         cnt += 1;
316                         remaining.fetch_sub(1, SeqCst);
317                     }
318                 }
319             });
320         }
321 
322         for _ in 0..STEPS {
323             if let Success(_) = q.steal() {
324                 remaining.fetch_sub(1, SeqCst);
325             }
326         }
327     })
328     .unwrap();
329 
330     let rem = remaining.load(SeqCst);
331     assert!(rem > 0);
332 
333     {
334         let mut v = dropped.lock().unwrap();
335         assert_eq!(v.len(), COUNT - rem);
336         v.clear();
337     }
338 
339     drop(q);
340 
341     {
342         let mut v = dropped.lock().unwrap();
343         assert_eq!(v.len(), rem);
344         v.sort();
345         for pair in v.windows(2) {
346             assert_eq!(pair[0] + 1, pair[1]);
347         }
348     }
349 }
350