1 use std::sync::atomic::{AtomicUsize, Ordering};
2 
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5 
6 #[test]
smoke()7 fn smoke() {
8     let q = ConcurrentQueue::bounded(2);
9 
10     q.push(7).unwrap();
11     assert_eq!(q.pop(), Ok(7));
12 
13     q.push(8).unwrap();
14     assert_eq!(q.pop(), Ok(8));
15     assert!(q.pop().is_err());
16 }
17 
18 #[test]
capacity()19 fn capacity() {
20     for i in 1..10 {
21         let q = ConcurrentQueue::<i32>::bounded(i);
22         assert_eq!(q.capacity(), Some(i));
23     }
24 }
25 
26 #[test]
27 #[should_panic(expected = "capacity must be positive")]
zero_capacity()28 fn zero_capacity() {
29     let _ = ConcurrentQueue::<i32>::bounded(0);
30 }
31 
32 #[test]
len_empty_full()33 fn len_empty_full() {
34     let q = ConcurrentQueue::bounded(2);
35 
36     assert_eq!(q.len(), 0);
37     assert_eq!(q.is_empty(), true);
38     assert_eq!(q.is_full(), false);
39 
40     q.push(()).unwrap();
41 
42     assert_eq!(q.len(), 1);
43     assert_eq!(q.is_empty(), false);
44     assert_eq!(q.is_full(), false);
45 
46     q.push(()).unwrap();
47 
48     assert_eq!(q.len(), 2);
49     assert_eq!(q.is_empty(), false);
50     assert_eq!(q.is_full(), true);
51 
52     q.pop().unwrap();
53 
54     assert_eq!(q.len(), 1);
55     assert_eq!(q.is_empty(), false);
56     assert_eq!(q.is_full(), false);
57 }
58 
59 #[test]
len()60 fn len() {
61     const COUNT: usize = 25_000;
62     const CAP: usize = 1000;
63 
64     let q = ConcurrentQueue::bounded(CAP);
65     assert_eq!(q.len(), 0);
66 
67     for _ in 0..CAP / 10 {
68         for i in 0..50 {
69             q.push(i).unwrap();
70             assert_eq!(q.len(), i + 1);
71         }
72 
73         for i in 0..50 {
74             q.pop().unwrap();
75             assert_eq!(q.len(), 50 - i - 1);
76         }
77     }
78     assert_eq!(q.len(), 0);
79 
80     for i in 0..CAP {
81         q.push(i).unwrap();
82         assert_eq!(q.len(), i + 1);
83     }
84 
85     for _ in 0..CAP {
86         q.pop().unwrap();
87     }
88     assert_eq!(q.len(), 0);
89 
90     Parallel::new()
91         .add(|| {
92             for i in 0..COUNT {
93                 loop {
94                     if let Ok(x) = q.pop() {
95                         assert_eq!(x, i);
96                         break;
97                     }
98                 }
99                 let len = q.len();
100                 assert!(len <= CAP);
101             }
102         })
103         .add(|| {
104             for i in 0..COUNT {
105                 while q.push(i).is_err() {}
106                 let len = q.len();
107                 assert!(len <= CAP);
108             }
109         })
110         .run();
111 
112     assert_eq!(q.len(), 0);
113 }
114 
115 #[test]
close()116 fn close() {
117     let q = ConcurrentQueue::bounded(2);
118     assert_eq!(q.push(10), Ok(()));
119 
120     assert!(!q.is_closed());
121     assert!(q.close());
122 
123     assert!(q.is_closed());
124     assert!(!q.close());
125 
126     assert_eq!(q.push(20), Err(PushError::Closed(20)));
127     assert_eq!(q.pop(), Ok(10));
128     assert_eq!(q.pop(), Err(PopError::Closed));
129 }
130 
131 #[test]
spsc()132 fn spsc() {
133     const COUNT: usize = 100_000;
134 
135     let q = ConcurrentQueue::bounded(3);
136 
137     Parallel::new()
138         .add(|| {
139             for i in 0..COUNT {
140                 loop {
141                     if let Ok(x) = q.pop() {
142                         assert_eq!(x, i);
143                         break;
144                     }
145                 }
146             }
147             assert!(q.pop().is_err());
148         })
149         .add(|| {
150             for i in 0..COUNT {
151                 while q.push(i).is_err() {}
152             }
153         })
154         .run();
155 }
156 
157 #[test]
mpmc()158 fn mpmc() {
159     const COUNT: usize = 25_000;
160     const THREADS: usize = 4;
161 
162     let q = ConcurrentQueue::<usize>::bounded(3);
163     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
164 
165     Parallel::new()
166         .each(0..THREADS, |_| {
167             for _ in 0..COUNT {
168                 let n = loop {
169                     if let Ok(x) = q.pop() {
170                         break x;
171                     }
172                 };
173                 v[n].fetch_add(1, Ordering::SeqCst);
174             }
175         })
176         .each(0..THREADS, |_| {
177             for i in 0..COUNT {
178                 while q.push(i).is_err() {}
179             }
180         })
181         .run();
182 
183     for c in v {
184         assert_eq!(c.load(Ordering::SeqCst), THREADS);
185     }
186 }
187 
188 #[test]
drops()189 fn drops() {
190     const RUNS: usize = 100;
191 
192     static DROPS: AtomicUsize = AtomicUsize::new(0);
193 
194     #[derive(Debug, PartialEq)]
195     struct DropCounter;
196 
197     impl Drop for DropCounter {
198         fn drop(&mut self) {
199             DROPS.fetch_add(1, Ordering::SeqCst);
200         }
201     }
202 
203     for _ in 0..RUNS {
204         let steps = fastrand::usize(..10_000);
205         let additional = fastrand::usize(..50);
206 
207         DROPS.store(0, Ordering::SeqCst);
208         let q = ConcurrentQueue::bounded(50);
209 
210         Parallel::new()
211             .add(|| {
212                 for _ in 0..steps {
213                     while q.pop().is_err() {}
214                 }
215             })
216             .add(|| {
217                 for _ in 0..steps {
218                     while q.push(DropCounter).is_err() {
219                         DROPS.fetch_sub(1, Ordering::SeqCst);
220                     }
221                 }
222             })
223             .run();
224 
225         for _ in 0..additional {
226             q.push(DropCounter).unwrap();
227         }
228 
229         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
230         drop(q);
231         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
232     }
233 }
234 
235 #[test]
linearizable()236 fn linearizable() {
237     const COUNT: usize = 25_000;
238     const THREADS: usize = 4;
239 
240     let q = ConcurrentQueue::bounded(THREADS);
241 
242     Parallel::new()
243         .each(0..THREADS, |_| {
244             for _ in 0..COUNT {
245                 while q.push(0).is_err() {}
246                 q.pop().unwrap();
247             }
248         })
249         .run();
250 }
251