1 use std::sync::atomic::Ordering::SeqCst;
2 use std::sync::atomic::{AtomicBool, AtomicUsize};
3 use std::sync::{Arc, Mutex};
4
5 use crossbeam_deque::Steal::{Empty, Success};
6 use crossbeam_deque::Worker;
7 use crossbeam_utils::thread::scope;
8 use rand::Rng;
9
/// Single-threaded sanity check of a LIFO worker and its stealer.
///
/// The assertions below pin down that the owner pops the most recently pushed
/// item, the stealer takes the oldest one, and both ends report emptiness once
/// the queue has been drained.
#[test]
fn smoke() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // A freshly created queue yields nothing from either end.
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item in, taken back out by the owner.
    worker.push(1);
    assert_eq!(worker.pop(), Some(1));
    assert_eq!(worker.pop(), None);
    assert_eq!(stealer.steal(), Empty);

    // One item in, taken out by the stealer instead.
    worker.push(2);
    assert_eq!(stealer.steal(), Success(2));
    assert_eq!(stealer.steal(), Empty);
    assert_eq!(worker.pop(), None);

    // The stealer receives items in the order they were pushed.
    for item in 3..=5 {
        worker.push(item);
    }
    for item in 3..=5 {
        assert_eq!(stealer.steal(), Success(item));
    }
    assert_eq!(stealer.steal(), Empty);

    // Interleaved access: pops come from the newest end, steals from the oldest.
    for item in 6..=9 {
        worker.push(item);
    }
    assert_eq!(worker.pop(), Some(9));
    assert_eq!(stealer.steal(), Success(6));
    assert_eq!(worker.pop(), Some(8));
    assert_eq!(worker.pop(), Some(7));
    assert_eq!(worker.pop(), None);
}
45
/// Checks that `is_empty` tracks the queue contents when queried through both
/// the worker handle and the stealer handle.
#[test]
fn is_empty() {
    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    // Worker view: fill with two items, then drain with `pop`.
    assert!(worker.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!worker.is_empty());
    }
    let _ = worker.pop();
    assert!(!worker.is_empty());
    let _ = worker.pop();
    assert!(worker.is_empty());

    // Stealer view: fill with two items, then drain with `steal`.
    assert!(stealer.is_empty());
    for item in 1..=2 {
        worker.push(item);
        assert!(!stealer.is_empty());
    }
    let _ = stealer.steal();
    assert!(!stealer.is_empty());
    let _ = stealer.steal();
    assert!(stealer.is_empty());
}
71
/// One producer (the owner, pushing `0..STEPS`) against one consumer thread
/// that steals: the consumer must see every value exactly in push order and
/// find the queue empty afterwards.
#[test]
fn spsc() {
    const STEPS: usize = 50_000;

    let worker = Worker::new_lifo();
    let stealer = worker.stealer();

    scope(|sc| {
        sc.spawn(|_| {
            for expected in 0..STEPS {
                // Spin until a steal succeeds; any other outcome is retried.
                let got = loop {
                    match stealer.steal() {
                        Success(value) => break value,
                        _ => continue,
                    }
                };
                assert_eq!(expected, got);
            }

            // Everything that was pushed has been consumed by now.
            assert_eq!(stealer.steal(), Empty);
        });

        // The owner thread acts as the producer.
        (0..STEPS).for_each(|item| worker.push(item));
    })
    .unwrap();
}
99
/// Pre-fills the queue with boxed values `1..=COUNT`, then lets `THREADS`
/// stealers and the owner drain it concurrently.
///
/// Each stealer asserts that the values it obtains are strictly increasing,
/// and the owner asserts that the values it pops are strictly decreasing.
#[test]
fn stampede() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    (1..=COUNT).for_each(|value| worker.push(Box::new(value)));

    // Shared countdown of items not yet taken by anyone.
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let remaining = Arc::clone(&remaining);

            sc.spawn(move |_| {
                // Smaller than every stored value, so the first steal passes.
                let mut previous = 0;
                while remaining.load(SeqCst) > 0 {
                    if let Success(boxed) = stealer.steal() {
                        let value = *boxed;
                        assert!(previous < value);
                        previous = value;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // Larger than every stored value, so the first pop passes.
        let mut previous = COUNT + 1;
        while remaining.load(SeqCst) > 0 {
            if let Some(boxed) = worker.pop() {
                let value = *boxed;
                assert!(previous > value);
                previous = value;
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();
}
140
/// Stress test: `THREADS` stealer threads mix `steal`, `steal_batch` and
/// `steal_batch_and_pop` while the owner randomly pushes and pops.
///
/// The owner pushes exactly `COUNT` items; the test ends once the shared
/// counter of consumed items reaches `COUNT`, after which the stealers are
/// told to stop.
#[test]
fn stress() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let finished = Arc::new(AtomicBool::new(false));
    let consumed = Arc::new(AtomicUsize::new(0));

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let finished = Arc::clone(&finished);
            let consumed = Arc::clone(&consumed);

            sc.spawn(move |_| {
                // Private queue that receives stolen batches.
                let local = Worker::new_lifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if stealer.steal().is_success() {
                        consumed.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if stealer.steal_batch_and_pop(&local).is_success() {
                        consumed.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        consumed.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut pushed = 0;
        while pushed < COUNT {
            let drain_now = rng.gen_range(0..3) == 0;
            if drain_now {
                // Pop everything currently visible to the owner.
                while worker.pop().is_some() {
                    consumed.fetch_add(1, SeqCst);
                }
            } else {
                worker.push(pushed);
                pushed += 1;
            }
        }

        // Keep draining until every pushed item has been accounted for.
        while consumed.load(SeqCst) < COUNT {
            while worker.pop().is_some() {
                consumed.fetch_add(1, SeqCst);
            }
        }
        finished.store(true, SeqCst);
    })
    .unwrap();
}
199
/// Verifies that no participant is starved: the test only terminates once the
/// owner and every one of the `THREADS` stealer threads have each obtained at
/// least one item.
#[test]
fn no_starvation() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;

    let worker = Worker::new_lifo();
    let finished = Arc::new(AtomicBool::new(false));
    // One hit counter per stealer thread, inspected by the owner.
    let mut per_thread_hits = Vec::new();

    scope(|sc| {
        for _ in 0..THREADS {
            let stealer = worker.stealer();
            let finished = Arc::clone(&finished);
            let hits = Arc::new(AtomicUsize::new(0));
            per_thread_hits.push(Arc::clone(&hits));

            sc.spawn(move |_| {
                // Private queue that receives stolen batches.
                let local = Worker::new_lifo();

                loop {
                    if finished.load(SeqCst) {
                        break;
                    }

                    if stealer.steal().is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    // Items moved here are counted when `local` is drained below.
                    let _ = stealer.steal_batch(&local);

                    if stealer.steal_batch_and_pop(&local).is_success() {
                        hits.fetch_add(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        hits.fetch_add(1, SeqCst);
                    }
                }
            });
        }

        let mut rng = rand::thread_rng();
        let mut my_hits = 0;
        loop {
            // Run a burst of random length, then re-check the exit condition.
            let burst = rng.gen_range(0..COUNT);
            for item in 0..burst {
                // The owner only pops until it has scored its first hit.
                let pop_now = rng.gen_range(0..3) == 0 && my_hits == 0;
                if pop_now {
                    while worker.pop().is_some() {
                        my_hits += 1;
                    }
                } else {
                    worker.push(item);
                }
            }

            let owner_fed = my_hits > 0;
            if owner_fed && !per_thread_hits.iter().any(|h| h.load(SeqCst) == 0) {
                break;
            }
        }
        finished.store(true, SeqCst);
    })
    .unwrap();
}
258
/// Checks that every element is dropped exactly when expected.
///
/// Elements record their id in a shared log when dropped. After the owner and
/// `THREADS` stealers have consumed part of the queue, the log must hold one
/// entry per consumed element; dropping the worker must then log exactly the
/// elements that were still queued.
#[test]
fn destructors() {
    const THREADS: usize = 8;
    const COUNT: usize = 50_000;
    const STEPS: usize = 1000;

    /// Queue element that appends its id to the shared log on drop.
    struct Elem {
        id: usize,
        log: Arc<Mutex<Vec<usize>>>,
    }

    impl Drop for Elem {
        fn drop(&mut self) {
            self.log.lock().unwrap().push(self.id);
        }
    }

    let worker = Worker::new_lifo();
    let dropped = Arc::new(Mutex::new(Vec::new()));
    let remaining = Arc::new(AtomicUsize::new(COUNT));

    for id in 0..COUNT {
        worker.push(Elem {
            id,
            log: Arc::clone(&dropped),
        });
    }

    scope(|sc| {
        for _ in 0..THREADS {
            let remaining = Arc::clone(&remaining);
            let stealer = worker.stealer();

            sc.spawn(move |_| {
                // Private queue that receives stolen batches.
                let local = Worker::new_lifo();
                let mut taken = 0;

                // Each stealer stops after consuming at least STEPS elements.
                while taken < STEPS {
                    if let Success(_) = stealer.steal() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    let _ = stealer.steal_batch(&local);

                    if let Success(_) = stealer.steal_batch_and_pop(&local) {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }

                    while local.pop().is_some() {
                        taken += 1;
                        remaining.fetch_sub(1, SeqCst);
                    }
                }
            });
        }

        // The owner makes STEPS pop attempts of its own.
        for _ in 0..STEPS {
            if worker.pop().is_some() {
                remaining.fetch_sub(1, SeqCst);
            }
        }
    })
    .unwrap();

    let left = remaining.load(SeqCst);
    assert!(left > 0);

    {
        // Everything consumed so far must already have been dropped. The guard
        // is released at the end of this block, before the worker is dropped,
        // because each element's destructor locks the same mutex.
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), COUNT - left);
        log.clear();
    }

    drop(worker);

    {
        // Dropping the worker must have dropped exactly the leftover elements,
        // and their ids must be consecutive once sorted.
        let mut log = dropped.lock().unwrap();
        assert_eq!(log.len(), left);
        log.sort();
        log.windows(2)
            .for_each(|pair| assert_eq!(pair[0] + 1, pair[1]));
    }
}
339