1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5
6 #[test]
smoke()7 fn smoke() {
8 let q = ConcurrentQueue::bounded(2);
9
10 q.push(7).unwrap();
11 assert_eq!(q.pop(), Ok(7));
12
13 q.push(8).unwrap();
14 assert_eq!(q.pop(), Ok(8));
15 assert!(q.pop().is_err());
16 }
17
18 #[test]
capacity()19 fn capacity() {
20 for i in 1..10 {
21 let q = ConcurrentQueue::<i32>::bounded(i);
22 assert_eq!(q.capacity(), Some(i));
23 }
24 }
25
26 #[test]
27 #[should_panic(expected = "capacity must be positive")]
zero_capacity()28 fn zero_capacity() {
29 let _ = ConcurrentQueue::<i32>::bounded(0);
30 }
31
32 #[test]
len_empty_full()33 fn len_empty_full() {
34 let q = ConcurrentQueue::bounded(2);
35
36 assert_eq!(q.len(), 0);
37 assert_eq!(q.is_empty(), true);
38 assert_eq!(q.is_full(), false);
39
40 q.push(()).unwrap();
41
42 assert_eq!(q.len(), 1);
43 assert_eq!(q.is_empty(), false);
44 assert_eq!(q.is_full(), false);
45
46 q.push(()).unwrap();
47
48 assert_eq!(q.len(), 2);
49 assert_eq!(q.is_empty(), false);
50 assert_eq!(q.is_full(), true);
51
52 q.pop().unwrap();
53
54 assert_eq!(q.len(), 1);
55 assert_eq!(q.is_empty(), false);
56 assert_eq!(q.is_full(), false);
57 }
58
59 #[test]
len()60 fn len() {
61 const COUNT: usize = 25_000;
62 const CAP: usize = 1000;
63
64 let q = ConcurrentQueue::bounded(CAP);
65 assert_eq!(q.len(), 0);
66
67 for _ in 0..CAP / 10 {
68 for i in 0..50 {
69 q.push(i).unwrap();
70 assert_eq!(q.len(), i + 1);
71 }
72
73 for i in 0..50 {
74 q.pop().unwrap();
75 assert_eq!(q.len(), 50 - i - 1);
76 }
77 }
78 assert_eq!(q.len(), 0);
79
80 for i in 0..CAP {
81 q.push(i).unwrap();
82 assert_eq!(q.len(), i + 1);
83 }
84
85 for _ in 0..CAP {
86 q.pop().unwrap();
87 }
88 assert_eq!(q.len(), 0);
89
90 Parallel::new()
91 .add(|| {
92 for i in 0..COUNT {
93 loop {
94 if let Ok(x) = q.pop() {
95 assert_eq!(x, i);
96 break;
97 }
98 }
99 let len = q.len();
100 assert!(len <= CAP);
101 }
102 })
103 .add(|| {
104 for i in 0..COUNT {
105 while q.push(i).is_err() {}
106 let len = q.len();
107 assert!(len <= CAP);
108 }
109 })
110 .run();
111
112 assert_eq!(q.len(), 0);
113 }
114
115 #[test]
close()116 fn close() {
117 let q = ConcurrentQueue::bounded(2);
118 assert_eq!(q.push(10), Ok(()));
119
120 assert!(!q.is_closed());
121 assert!(q.close());
122
123 assert!(q.is_closed());
124 assert!(!q.close());
125
126 assert_eq!(q.push(20), Err(PushError::Closed(20)));
127 assert_eq!(q.pop(), Ok(10));
128 assert_eq!(q.pop(), Err(PopError::Closed));
129 }
130
131 #[test]
spsc()132 fn spsc() {
133 const COUNT: usize = 100_000;
134
135 let q = ConcurrentQueue::bounded(3);
136
137 Parallel::new()
138 .add(|| {
139 for i in 0..COUNT {
140 loop {
141 if let Ok(x) = q.pop() {
142 assert_eq!(x, i);
143 break;
144 }
145 }
146 }
147 assert!(q.pop().is_err());
148 })
149 .add(|| {
150 for i in 0..COUNT {
151 while q.push(i).is_err() {}
152 }
153 })
154 .run();
155 }
156
157 #[test]
mpmc()158 fn mpmc() {
159 const COUNT: usize = 25_000;
160 const THREADS: usize = 4;
161
162 let q = ConcurrentQueue::<usize>::bounded(3);
163 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
164
165 Parallel::new()
166 .each(0..THREADS, |_| {
167 for _ in 0..COUNT {
168 let n = loop {
169 if let Ok(x) = q.pop() {
170 break x;
171 }
172 };
173 v[n].fetch_add(1, Ordering::SeqCst);
174 }
175 })
176 .each(0..THREADS, |_| {
177 for i in 0..COUNT {
178 while q.push(i).is_err() {}
179 }
180 })
181 .run();
182
183 for c in v {
184 assert_eq!(c.load(Ordering::SeqCst), THREADS);
185 }
186 }
187
188 #[test]
drops()189 fn drops() {
190 const RUNS: usize = 100;
191
192 static DROPS: AtomicUsize = AtomicUsize::new(0);
193
194 #[derive(Debug, PartialEq)]
195 struct DropCounter;
196
197 impl Drop for DropCounter {
198 fn drop(&mut self) {
199 DROPS.fetch_add(1, Ordering::SeqCst);
200 }
201 }
202
203 for _ in 0..RUNS {
204 let steps = fastrand::usize(..10_000);
205 let additional = fastrand::usize(..50);
206
207 DROPS.store(0, Ordering::SeqCst);
208 let q = ConcurrentQueue::bounded(50);
209
210 Parallel::new()
211 .add(|| {
212 for _ in 0..steps {
213 while q.pop().is_err() {}
214 }
215 })
216 .add(|| {
217 for _ in 0..steps {
218 while q.push(DropCounter).is_err() {
219 DROPS.fetch_sub(1, Ordering::SeqCst);
220 }
221 }
222 })
223 .run();
224
225 for _ in 0..additional {
226 q.push(DropCounter).unwrap();
227 }
228
229 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
230 drop(q);
231 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
232 }
233 }
234
235 #[test]
linearizable()236 fn linearizable() {
237 const COUNT: usize = 25_000;
238 const THREADS: usize = 4;
239
240 let q = ConcurrentQueue::bounded(THREADS);
241
242 Parallel::new()
243 .each(0..THREADS, |_| {
244 for _ in 0..COUNT {
245 while q.push(0).is_err() {}
246 q.pop().unwrap();
247 }
248 })
249 .run();
250 }
251