1 use std::sync::atomic::{AtomicUsize, Ordering};
2 
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5 
6 #[test]
smoke()7 fn smoke() {
8     let q = ConcurrentQueue::bounded(1);
9 
10     q.push(7).unwrap();
11     assert_eq!(q.pop(), Ok(7));
12 
13     q.push(8).unwrap();
14     assert_eq!(q.pop(), Ok(8));
15     assert!(q.pop().is_err());
16 }
17 
18 #[test]
capacity()19 fn capacity() {
20     let q = ConcurrentQueue::<i32>::bounded(1);
21     assert_eq!(q.capacity(), Some(1));
22 }
23 
24 #[test]
len_empty_full()25 fn len_empty_full() {
26     let q = ConcurrentQueue::bounded(1);
27 
28     assert_eq!(q.len(), 0);
29     assert_eq!(q.is_empty(), true);
30     assert_eq!(q.is_full(), false);
31 
32     q.push(()).unwrap();
33 
34     assert_eq!(q.len(), 1);
35     assert_eq!(q.is_empty(), false);
36     assert_eq!(q.is_full(), true);
37 
38     q.pop().unwrap();
39 
40     assert_eq!(q.len(), 0);
41     assert_eq!(q.is_empty(), true);
42     assert_eq!(q.is_full(), false);
43 }
44 
45 #[test]
close()46 fn close() {
47     let q = ConcurrentQueue::<i32>::bounded(1);
48     assert_eq!(q.push(10), Ok(()));
49 
50     assert!(!q.is_closed());
51     assert!(q.close());
52 
53     assert!(q.is_closed());
54     assert!(!q.close());
55 
56     assert_eq!(q.push(20), Err(PushError::Closed(20)));
57     assert_eq!(q.pop(), Ok(10));
58     assert_eq!(q.pop(), Err(PopError::Closed));
59 }
60 
61 #[test]
spsc()62 fn spsc() {
63     const COUNT: usize = 100_000;
64 
65     let q = ConcurrentQueue::bounded(1);
66 
67     Parallel::new()
68         .add(|| {
69             for i in 0..COUNT {
70                 loop {
71                     if let Ok(x) = q.pop() {
72                         assert_eq!(x, i);
73                         break;
74                     }
75                 }
76             }
77             assert!(q.pop().is_err());
78         })
79         .add(|| {
80             for i in 0..COUNT {
81                 while q.push(i).is_err() {}
82             }
83         })
84         .run();
85 }
86 
87 #[test]
mpmc()88 fn mpmc() {
89     const COUNT: usize = 25_000;
90     const THREADS: usize = 1;
91 
92     let q = ConcurrentQueue::<usize>::bounded(THREADS);
93     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
94 
95     Parallel::new()
96         .each(0..THREADS, |_| {
97             for _ in 0..COUNT {
98                 let n = loop {
99                     if let Ok(x) = q.pop() {
100                         break x;
101                     }
102                 };
103                 v[n].fetch_add(1, Ordering::SeqCst);
104             }
105         })
106         .each(0..THREADS, |_| {
107             for i in 0..COUNT {
108                 while q.push(i).is_err() {}
109             }
110         })
111         .run();
112 
113     for c in v {
114         assert_eq!(c.load(Ordering::SeqCst), THREADS);
115     }
116 }
117 
118 #[test]
drops()119 fn drops() {
120     const RUNS: usize = 100;
121 
122     static DROPS: AtomicUsize = AtomicUsize::new(0);
123 
124     #[derive(Debug, PartialEq)]
125     struct DropCounter;
126 
127     impl Drop for DropCounter {
128         fn drop(&mut self) {
129             DROPS.fetch_add(1, Ordering::SeqCst);
130         }
131     }
132 
133     for _ in 0..RUNS {
134         let steps = fastrand::usize(..10_000);
135         let additional = fastrand::usize(0..=1);
136 
137         DROPS.store(0, Ordering::SeqCst);
138         let q = ConcurrentQueue::bounded(1);
139 
140         Parallel::new()
141             .add(|| {
142                 for _ in 0..steps {
143                     while q.pop().is_err() {}
144                 }
145             })
146             .add(|| {
147                 for _ in 0..steps {
148                     while q.push(DropCounter).is_err() {
149                         DROPS.fetch_sub(1, Ordering::SeqCst);
150                     }
151                 }
152             })
153             .run();
154 
155         for _ in 0..additional {
156             q.push(DropCounter).unwrap();
157         }
158 
159         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
160         drop(q);
161         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
162     }
163 }
164 
165 #[test]
linearizable()166 fn linearizable() {
167     const COUNT: usize = 25_000;
168     const THREADS: usize = 4;
169 
170     let q = ConcurrentQueue::bounded(1);
171 
172     Parallel::new()
173         .each(0..THREADS, |_| {
174             for _ in 0..COUNT {
175                 while q.push(0).is_err() {}
176                 q.pop().unwrap();
177             }
178         })
179         .run();
180 }
181