1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5
#[test]
fn smoke() {
    // Round-trip two items through a one-slot queue and confirm that popping
    // from the emptied queue fails.
    let q = ConcurrentQueue::bounded(1);

    assert_eq!(q.push(7), Ok(()));
    assert_eq!(q.pop().unwrap(), 7);

    assert_eq!(q.push(8), Ok(()));
    assert_eq!(q.pop().unwrap(), 8);
    assert!(q.pop().is_err());
}
17
#[test]
fn capacity() {
    // A bounded queue reports its fixed capacity as `Some(n)`.
    assert_eq!(ConcurrentQueue::<i32>::bounded(1).capacity(), Some(1));
}
23
#[test]
fn len_empty_full() {
    // Track `len`/`is_empty`/`is_full` through an empty -> full -> empty
    // cycle of a one-slot queue.
    let q = ConcurrentQueue::bounded(1);

    assert_eq!(q.len(), 0);
    assert!(q.is_empty());
    assert!(!q.is_full());

    q.push(()).unwrap();

    assert_eq!(q.len(), 1);
    assert!(!q.is_empty());
    assert!(q.is_full());

    q.pop().unwrap();

    assert_eq!(q.len(), 0);
    assert!(q.is_empty());
    assert!(!q.is_full());
}
44
#[test]
fn close() {
    // Closing is idempotent: the first `close()` returns true, later ones
    // false. A closed queue rejects new pushes (handing the value back in the
    // error) but still drains items queued before the close.
    let q = ConcurrentQueue::<i32>::bounded(1);
    q.push(10).unwrap();

    assert!(!q.is_closed());
    assert!(q.close());
    assert!(q.is_closed());
    assert!(!q.close());

    assert_eq!(q.push(20), Err(PushError::Closed(20)));
    assert_eq!(q.pop(), Ok(10));
    assert_eq!(q.pop(), Err(PopError::Closed));
}
60
#[test]
fn spsc() {
    const COUNT: usize = 100_000;

    let q = ConcurrentQueue::bounded(1);

    Parallel::new()
        // Consumer: spin until each item arrives and check FIFO order.
        .add(|| {
            for expected in 0..COUNT {
                let got = loop {
                    if let Ok(x) = q.pop() {
                        break x;
                    }
                };
                assert_eq!(got, expected);
            }
            // The producer pushed exactly COUNT items, so nothing remains.
            assert!(q.pop().is_err());
        })
        // Producer: spin-push every item through the single slot.
        .add(|| {
            for i in 0..COUNT {
                while q.push(i).is_err() {}
            }
        })
        .run();
}
86
#[test]
fn mpmc() {
    const COUNT: usize = 25_000;
    // Several producers and consumers, so this actually exercises the
    // multi-producer/multi-consumer path; with THREADS == 1 this test would
    // degenerate into a second spsc test.
    const THREADS: usize = 4;

    // Capacity stays at 1: this suite stresses the single-slot bounded queue.
    let q = ConcurrentQueue::<usize>::bounded(1);
    // v[n] counts how many times the value `n` was popped.
    let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();

    Parallel::new()
        .each(0..THREADS, |_| {
            for _ in 0..COUNT {
                let n = loop {
                    if let Ok(x) = q.pop() {
                        break x;
                    }
                };
                v[n].fetch_add(1, Ordering::SeqCst);
            }
        })
        .each(0..THREADS, |_| {
            for i in 0..COUNT {
                while q.push(i).is_err() {}
            }
        })
        .run();

    // Each value 0..COUNT is pushed once per producer, so it must have been
    // popped exactly THREADS times in total.
    for c in v {
        assert_eq!(c.load(Ordering::SeqCst), THREADS);
    }
}
117
#[test]
fn drops() {
    const RUNS: usize = 100;

    static DROPS: AtomicUsize = AtomicUsize::new(0);

    // Counts every drop, so we can verify the queue neither leaks nor
    // double-drops its elements.
    #[derive(Debug, PartialEq)]
    struct DropCounter;

    impl Drop for DropCounter {
        fn drop(&mut self) {
            DROPS.fetch_add(1, Ordering::SeqCst);
        }
    }

    for _ in 0..RUNS {
        let steps = fastrand::usize(..10_000);
        let additional = fastrand::usize(0..=1);

        DROPS.store(0, Ordering::SeqCst);
        let q = ConcurrentQueue::bounded(1);

        Parallel::new()
            .add(|| {
                // Pop `steps` items; each successful pop eventually drops one
                // counter on the consumer side.
                for _ in 0..steps {
                    while q.pop().is_err() {}
                }
            })
            .add(|| {
                // Push `steps` items. A failed push hands the value back
                // inside the error, which is then dropped — undo that
                // spurious increment so DROPS only counts delivered items.
                for _ in 0..steps {
                    loop {
                        match q.push(DropCounter) {
                            Ok(()) => break,
                            Err(_) => {
                                DROPS.fetch_sub(1, Ordering::SeqCst);
                            }
                        }
                    }
                }
            })
            .run();

        // Leave up to one extra item sitting inside the queue.
        for _ in 0..additional {
            q.push(DropCounter).unwrap();
        }

        // Everything that crossed the queue has been dropped exactly once...
        assert_eq!(DROPS.load(Ordering::SeqCst), steps);
        drop(q);
        // ...and dropping the queue drops whatever was still queued.
        assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
    }
}
164
#[test]
fn linearizable() {
    const COUNT: usize = 25_000;
    const THREADS: usize = 4;

    let q = ConcurrentQueue::bounded(1);

    // Every thread strictly alternates a (spinning) push with a pop, so
    // successful pushes always outnumber completed pops when a thread calls
    // `pop()`. If the queue is linearizable, that pop can therefore never
    // fail — the `unwrap()` is the assertion.
    Parallel::new()
        .each(0..THREADS, |_| {
            for _ in 0..COUNT {
                while q.push(0).is_err() {}
                q.pop().unwrap();
            }
        })
        .run();
}
181