1 extern crate crossbeam_deque as deque;
2 extern crate crossbeam_utils as utils;
3 extern crate rand;
4
5 use std::sync::atomic::Ordering::SeqCst;
6 use std::sync::atomic::{AtomicBool, AtomicUsize};
7 use std::sync::{Arc, Mutex};
8
9 use deque::Steal::{Empty, Success};
10 use deque::Worker;
11 use rand::Rng;
12 use utils::thread::scope;
13
/// Single-threaded sanity check of a LIFO worker queue and its stealer.
#[test]
fn smoke() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // A fresh queue yields nothing from either handle.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item pushed and popped back by the owner.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item pushed by the owner and taken through the stealer.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // Several items: the stealer is expected to see them in push order.
    for item in 3..6 {
        worker.push(item);
    }
    for expected in 3..6 {
        assert_eq!(stealer.steal(), Success(expected));
    }
    assert_eq!(stealer.steal(), Empty);

    // Mixed access: `pop` is expected to return the newest item while
    // `steal` returns the oldest one.
    for item in 6..10 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(stealer.steal(), Success(6));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(7));
    assert_eq!(worker.pop(), None);
}
49
/// Checks that `is_empty` on both the worker and the stealer tracks the
/// number of queued items as they are pushed and removed.
#[test]
fn is_empty() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // Observed through the worker handle, removing items with `pop`.
    assert!(worker.is_empty());
    for item in 1..3 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    let _ = worker.pop();
    assert!(!worker.is_empty());
    let _ = worker.pop();
    assert!(worker.is_empty());

    // Observed through the stealer handle, removing items with `steal`.
    assert!(stealer.is_empty());
    for item in 1..3 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    let _ = stealer.steal();
    assert!(!stealer.is_empty());
    let _ = stealer.steal();
    assert!(stealer.is_empty());
}
75
/// One producer pushes `0..STEPS` while a single spawned thread steals;
/// the stolen values must arrive in exactly the order they were pushed.
#[test]
fn spsc() {
    const STEPS: usize = 50_000;

    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    scope(|threads| {
        threads.spawn(|_| {
            for expected in 0..STEPS {
                // Spin until a steal attempt actually yields an item.
                let stolen = loop {
                    match stealer.steal() {
                        Success(value) => break value,
                        _ => {}
                    }
                };
                assert_eq!(expected, stolen);
            }

            // Every pushed item has been accounted for by now.
            assert_eq!(stealer.steal(), Empty);
        });

        // The producer runs on the thread that owns the worker.
        (0..STEPS).for_each(|item| worker.push(item));
    })
    .unwrap();
}
103
/// Pre-fills the queue with boxed values `1..=COUNT`, then lets `THREADS`
/// stealers and the owner race to drain it.
///
/// Each stealer asserts that the values it receives are strictly
/// increasing; the owner asserts that the values it pops are strictly
/// decreasing.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    (1..COUNT + 1).for_each(|value| worker.push(Box::new(value)));

    // Shared count of items nobody has taken yet.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let remaining = Arc::clone(&remaining);

            threads.spawn(move |_| {
                let mut previous = 0;
                loop {
                    if remaining.load(SeqCst) == 0 {
                        break;
                    }
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(previous < value);
                        previous = value;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner drains from its own end concurrently with the stealers.
        let mut previous = COUNT + 1;
        loop {
            if remaining.load(SeqCst) == 0 {
                break;
            }
            if let Some(boxed) = worker.pop() {
                let value = *boxed;
                assert!(previous > value);
                previous = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
144
/// The owner pushes `COUNT` items, randomly interleaved with draining its
/// own queue, while `THREADS` stealers take items via `steal`,
/// `steal_batch` and `steal_batch_and_pop`.
///
/// The test finishes once the shared counter shows that `COUNT` items have
/// been taken in total.
#[test]
fn stress() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let finished = Arc::new(AtomicBool::new(false));
    let consumed = Arc::new(AtomicUsize::new(0));

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let finished = Arc::clone(&finished);
            let consumed = Arc::clone(&consumed);

            threads.spawn(move |_| {
                // Private queue that receives the batched steals.
                let local = Worker::new_lifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if let Success(_) = stealer.steal() {
                        consumed.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        consumed.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        consumed.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut next = 0;
        while next < COUNT {
            // Roughly one iteration in three drains instead of pushing.
            let drain_now = rng.gen_range(0, 3) == 0;
            if !drain_now {
                worker.push(next);
                next += 1;
                continue;
            }
            while worker.pop().is_some() {
                consumed.fetch_add(1, SeqCst);
            }
        }

        // Everything has been pushed; help drain until all items are counted.
        loop {
            if consumed.load(SeqCst) >= COUNT {
                break;
            }
            while worker.pop().is_some() {
                consumed.fetch_add(1, SeqCst);
            }
        }
        finished.store(true, SeqCst);
    })
    .unwrap();
}
203
/// Keeps producing bursts of items until the owner and every one of the
/// `THREADS` stealer threads has obtained at least one item.
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let finished = Arc::new(AtomicBool::new(false));
    // One hit counter per stealer thread.
    let mut counters = Vec::new();

    scope(|threads| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let finished = Arc::clone(&finished);
            let counter = Arc::new(AtomicUsize::new(0));
            counters.push(Arc::clone(&counter));

            threads.spawn(move |_| {
                // Private queue that receives the batched steals.
                let local = Worker::new_lifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if let Success(_) = stealer.steal() {
                        counter.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        counter.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        counter.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut own_hits = 0;
        loop {
            let burst = rng.gen_range(0, COUNT);
            for item in 0..burst {
                // The owner only drains while it has not scored a hit yet;
                // otherwise it keeps feeding the stealers.
                if rng.gen_range(0, 3) == 0 && own_hits == 0 {
                    while worker.pop().is_some() {
                        own_hits += 1;
                    }
                } else {
                    worker.push(item);
                }
            }

            let everyone_hit =
                own_hits > 0 && counters.iter().all(|counter| counter.load(SeqCst) > 0);
            if everyone_hit {
                break;
            }
        }
        finished.store(true, SeqCst);
    })
    .unwrap();
}
262
/// Verifies that every element is dropped exactly once, whether it was
/// taken out of the queue or was still inside when the worker was dropped.
///
/// The queue is filled with `COUNT` elements; the stealers and the owner
/// then remove only part of them. The test asserts that the drop log holds
/// exactly the removed elements after the threads finish, and that dropping
/// the worker afterwards logs exactly the rest, whose ids form a
/// consecutive run.
#[test]
fn destructors() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;
    const STEPS: usize = 1000;

    // An element that appends its id to a shared log when it is dropped.
    struct Elem(usize, Arc<Mutex<Vec<usize>>>);

    impl Drop for Elem {
        fn drop(&mut self) {
            let mut log = self.1.lock().unwrap();
            log.push(self.0);
        }
    }

    let worker = Worker::new_lifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for id in 0..COUNT {
        worker.push(Elem(id, Arc::clone(&dropped)));
    }

    scope(|threads| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let stealer = worker.stealer();

            threads.spawn(move |_| {
                // Private queue that receives the batched steals.
                let local = Worker::new_lifo();
                let mut taken = 0;

                loop {
                    if taken >= STEPS {
                        break;
                    }

                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained.
                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while let Some(_) = local.pop() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes a bounded number of pop attempts of its own.
        for _ in 0..STEPS {
            if let Some(_) = worker.pop() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    let leftover = remaining.load(SeqCst);
    assert!(leftover > 0);

    {
        // Everything that left the queue must already have been dropped.
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), COUNT - leftover);
        log.clear();
    }

    drop(worker);

    {
        // Dropping the worker must drop exactly the elements still queued.
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), leftover);
        log.sort();
        for (earlier, later) in log.iter().zip(log.iter().skip(1)) {
            assert_eq!(*earlier + 1, *later);
        }
    }
}
343