1 extern crate crossbeam_deque as deque;
2 extern crate crossbeam_utils as utils;
3 extern crate rand;
4 
5 use std::sync::atomic::Ordering::SeqCst;
6 use std::sync::atomic::{AtomicBool, AtomicUsize};
7 use std::sync::{Arc, Mutex};
8 
9 use deque::Steal::{Empty, Success};
10 use deque::Worker;
11 use rand::Rng;
12 use utils::thread::scope;
13 
/// Single-threaded sanity check of a LIFO worker and its stealer.
///
/// The worker pops the most recently pushed item, while the stealer takes the
/// oldest one; both report emptiness once the queue has been drained.
#[test]
fn smoke() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // A freshly created queue yields nothing from either handle.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken back by the worker itself.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // A single item taken by the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Several items: the stealer receives them in push order.
    for item in 3..6 {
        worker.push(item);
    }
    for item in 3..6 {
        assert_eq!(stealer.steal(), Success(item));
    }
    assert_eq!(stealer.steal(), Empty);

    // Interleaved access: pops come from the newest end, steals from the oldest.
    for item in 6..10 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(stealer.steal(), Success(6));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(7));
    assert_eq!(worker.pop(), None);
}
49 
/// Checks that `is_empty` on both the worker and the stealer tracks pushes and
/// removals: it is true only when every pushed item has been taken out again.
#[test]
fn is_empty() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // Observed through the worker handle, draining with `pop`.
    assert!(worker.is_empty());
    for item in 1..3 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    drop(worker.pop());
    assert!(!worker.is_empty());
    drop(worker.pop());
    assert!(worker.is_empty());

    // Observed through the stealer handle, draining with `steal`.
    assert!(stealer.is_empty());
    for item in 1..3 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    drop(stealer.steal());
    assert!(!stealer.is_empty());
    drop(stealer.steal());
    assert!(stealer.is_empty());
}
75 
/// One producer, one consumer: the owning thread pushes `0..STEPS` while a
/// single spawned thread steals them and checks they arrive in push order.
#[test]
fn spsc() {
    const STEPS: usize = 50_000;

    let producer = Worker::new_lifo();
    let consumer = producer.stealer();

    scope(|scope| {
        scope.spawn(|_| {
            for expected in 0..STEPS {
                // Spin until a steal actually yields a value; any other
                // outcome is simply retried.
                let stolen = loop {
                    if let Success(value) = consumer.steal() {
                        break value;
                    }
                };
                assert_eq!(expected, stolen);
            }

            // Every pushed item has been consumed by now.
            assert_eq!(consumer.steal(), Empty);
        });

        for item in 0..STEPS {
            producer.push(item);
        }
    })
    .unwrap();
}
103 
/// Several stealers race the owning worker to empty a pre-filled LIFO queue.
///
/// Each stealer must observe strictly increasing values and the worker strictly
/// decreasing ones; the shared `remaining` counter tells every thread when the
/// queue has been fully consumed.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();

    // Fill the queue with the boxed values 1..=COUNT, in increasing order.
    for value in 1..COUNT + 1 {
        worker.push(Box::new(value));
    }
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|scope| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let left = Arc::clone(&remaining);

            scope.spawn(move |_| {
                // Values start at 1, so 0 is below anything that can be stolen.
                let mut prev = 0;
                while left.load(SeqCst) > 0 {
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(prev < value);
                        prev = value;
                        left.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The worker pops from the opposite end, so it starts above the maximum.
        let mut prev = COUNT + 1;
        while remaining.load(SeqCst) > 0 {
            if let Some(boxed) = worker.pop() {
                let value = *boxed;
                assert!(prev > value);
                prev = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
144 
/// Randomised stress test: the owner pushes `COUNT` items while randomly
/// draining its own queue, and `THREADS` stealers hammer it with `steal`,
/// `steal_batch` and `steal_batch_and_pop`.
///
/// Every item taken out by any thread bumps the shared `hits` counter; the test
/// finishes once `hits` reaches `COUNT`, at which point the stealers are told
/// to stop through the `done` flag.
#[test]
fn stress() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let w = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));
    let hits = Arc::new(AtomicUsize::new(0));

    scope(|scope| {
        for _ in 0..THREADS {
            let s = w.stealer();
            let done = done.clone();
            let hits = hits.clone();

            scope.spawn(move |_| {
                // Private queue that receives the batches stolen from `w`.
                let w2 = Worker::new_lifo();

                while !done.load(SeqCst) {
                    if s.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // The outcome is irrelevant here: anything moved into `w2`
                    // is counted when `w2` is drained below.
                    let _ = s.steal_batch(&w2);

                    // A successful call hands back one item directly; the rest
                    // of the batch lands in `w2`.
                    if s.steal_batch_and_pop(&w2).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while w2.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut expected = 0;
        while expected < COUNT {
            // Roughly one time in three, drain the local queue instead of pushing.
            if rng.gen_range(0, 3) == 0 {
                while w.pop().is_some() {
                    hits.fetch_add(1, SeqCst);
                }
            } else {
                w.push(expected);
                expected += 1;
            }
        }

        // Everything has been pushed; keep popping until all items are accounted for.
        while hits.load(SeqCst) < COUNT {
            while w.pop().is_some() {
                hits.fetch_add(1, SeqCst);
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
203 
/// Checks that nobody is starved: the test only ends once the owning thread
/// and every one of the `THREADS` stealers have each obtained at least one item.
///
/// Each stealer has its own hit counter, collected in `all_hits`, so the owner
/// can tell which threads have made progress.
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let w = Worker::new_lifo();
    let done = Arc::new(AtomicBool::new(false));
    let mut all_hits = Vec::new();

    scope(|scope| {
        for _ in 0..THREADS {
            let s = w.stealer();
            let done = done.clone();
            let hits = Arc::new(AtomicUsize::new(0));
            all_hits.push(hits.clone());

            scope.spawn(move |_| {
                // Private queue that receives the batches stolen from `w`.
                let w2 = Worker::new_lifo();

                while !done.load(SeqCst) {
                    if s.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // The outcome is irrelevant here: anything moved into `w2`
                    // is counted when `w2` is drained below.
                    let _ = s.steal_batch(&w2);

                    if s.steal_batch_and_pop(&w2).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while w2.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut my_hits = 0;
        loop {
            // One round of random length, mixing pushes with occasional drains.
            for i in 0..rng.gen_range(0, COUNT) {
                // The owner only pops until it has scored its first hit; from
                // then on it just keeps feeding the stealers.
                if rng.gen_range(0, 3) == 0 && my_hits == 0 {
                    while w.pop().is_some() {
                        my_hits += 1;
                    }
                } else {
                    w.push(i);
                }
            }

            if my_hits > 0 && all_hits.iter().all(|h| h.load(SeqCst) > 0) {
                break;
            }
        }
        done.store(true, SeqCst);
    })
    .unwrap();
}
262 
/// Verifies that every element is dropped exactly when expected.
///
/// Each `Elem` records its index in a shared vector when dropped. After the
/// owner and the stealers have taken some elements out, the number of recorded
/// drops must equal the number of elements removed; dropping the worker must
/// then drop exactly the elements still queued, and their indices must form
/// one consecutive run.
#[test]
fn destructors() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;
    const STEPS: usize = 1000;

    /// Queue element that logs its index into the shared vector on drop.
    struct Elem(usize, Arc<Mutex<Vec<usize>>>);

    impl Drop for Elem {
        fn drop(&mut self) {
            self.1.lock().unwrap().push(self.0);
        }
    }

    let w = Worker::new_lifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for i in 0..COUNT {
        w.push(Elem(i, dropped.clone()));
    }

    scope(|scope| {
        for _ in 0..THREADS {
            let remaining = remaining.clone();
            let s = w.stealer();

            scope.spawn(move |_| {
                // Private queue that receives the batches stolen from `w`.
                let w2 = Worker::new_lifo();
                let mut cnt = 0;

                // Each taken element is dropped as soon as the condition has
                // been evaluated, which is what records it in `dropped`.
                while cnt < STEPS {
                    if s.steal().is_success() {
                        cnt += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // The outcome is irrelevant here: anything moved into `w2`
                    // is counted when `w2` is drained below.
                    let _ = s.steal_batch(&w2);

                    if s.steal_batch_and_pop(&w2).is_success() {
                        cnt += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // `w2` is fully drained before the loop condition is
                    // re-checked, so it is empty when the thread exits.
                    while w2.pop().is_some() {
                        cnt += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        for _ in 0..STEPS {
            if w.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    // The second half of the test needs some elements to still be queued.
    let rem = remaining.load(SeqCst);
    assert!(rem > 0);

    {
        // Everything removed so far has been dropped, and nothing else.
        let mut v = dropped.lock().unwrap();
        assert_eq!(v.len(), COUNT - rem);
        v.clear();
    }

    // Dropping the worker must drop the elements that are still queued.
    drop(w);

    {
        let mut v = dropped.lock().unwrap();
        assert_eq!(v.len(), rem);
        // Plain `usize` keys: an unstable sort gives the same order.
        v.sort_unstable();
        // The leftover indices must be consecutive.
        for pair in v.windows(2) {
            assert_eq!(pair[0] + 1, pair[1]);
        }
    }
}
343