1 use std::sync::atomic::{AtomicUsize, Ordering};
2 
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5 
6 #[test]
smoke()7 fn smoke() {
8     let q = ConcurrentQueue::unbounded();
9     q.push(7).unwrap();
10     assert_eq!(q.pop(), Ok(7));
11 
12     q.push(8).unwrap();
13     assert_eq!(q.pop(), Ok(8));
14     assert!(q.pop().is_err());
15 }
16 
17 #[test]
len_empty_full()18 fn len_empty_full() {
19     let q = ConcurrentQueue::unbounded();
20 
21     assert_eq!(q.len(), 0);
22     assert_eq!(q.is_empty(), true);
23 
24     q.push(()).unwrap();
25 
26     assert_eq!(q.len(), 1);
27     assert_eq!(q.is_empty(), false);
28 
29     q.pop().unwrap();
30 
31     assert_eq!(q.len(), 0);
32     assert_eq!(q.is_empty(), true);
33 }
34 
35 #[test]
len()36 fn len() {
37     let q = ConcurrentQueue::unbounded();
38 
39     assert_eq!(q.len(), 0);
40 
41     for i in 0..50 {
42         q.push(i).unwrap();
43         assert_eq!(q.len(), i + 1);
44     }
45 
46     for i in 0..50 {
47         q.pop().unwrap();
48         assert_eq!(q.len(), 50 - i - 1);
49     }
50 
51     assert_eq!(q.len(), 0);
52 }
53 
54 #[test]
close()55 fn close() {
56     let q = ConcurrentQueue::unbounded();
57     assert_eq!(q.push(10), Ok(()));
58 
59     assert!(!q.is_closed());
60     assert!(q.close());
61 
62     assert!(q.is_closed());
63     assert!(!q.close());
64 
65     assert_eq!(q.push(20), Err(PushError::Closed(20)));
66     assert_eq!(q.pop(), Ok(10));
67     assert_eq!(q.pop(), Err(PopError::Closed));
68 }
69 
70 #[test]
spsc()71 fn spsc() {
72     const COUNT: usize = 100_000;
73 
74     let q = ConcurrentQueue::unbounded();
75 
76     Parallel::new()
77         .add(|| {
78             for i in 0..COUNT {
79                 loop {
80                     if let Ok(x) = q.pop() {
81                         assert_eq!(x, i);
82                         break;
83                     }
84                 }
85             }
86             assert!(q.pop().is_err());
87         })
88         .add(|| {
89             for i in 0..COUNT {
90                 q.push(i).unwrap();
91             }
92         })
93         .run();
94 }
95 
96 #[test]
mpmc()97 fn mpmc() {
98     const COUNT: usize = 25_000;
99     const THREADS: usize = 4;
100 
101     let q = ConcurrentQueue::<usize>::unbounded();
102     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
103 
104     Parallel::new()
105         .each(0..THREADS, |_| {
106             for _ in 0..COUNT {
107                 let n = loop {
108                     if let Ok(x) = q.pop() {
109                         break x;
110                     }
111                 };
112                 v[n].fetch_add(1, Ordering::SeqCst);
113             }
114         })
115         .each(0..THREADS, |_| {
116             for i in 0..COUNT {
117                 q.push(i).unwrap();
118             }
119         })
120         .run();
121 
122     for c in v {
123         assert_eq!(c.load(Ordering::SeqCst), THREADS);
124     }
125 }
126 
127 #[test]
drops()128 fn drops() {
129     static DROPS: AtomicUsize = AtomicUsize::new(0);
130 
131     #[derive(Debug, PartialEq)]
132     struct DropCounter;
133 
134     impl Drop for DropCounter {
135         fn drop(&mut self) {
136             DROPS.fetch_add(1, Ordering::SeqCst);
137         }
138     }
139 
140     for _ in 0..100 {
141         let steps = fastrand::usize(0..10_000);
142         let additional = fastrand::usize(0..1000);
143 
144         DROPS.store(0, Ordering::SeqCst);
145         let q = ConcurrentQueue::unbounded();
146 
147         Parallel::new()
148             .add(|| {
149                 for _ in 0..steps {
150                     while q.pop().is_err() {}
151                 }
152             })
153             .add(|| {
154                 for _ in 0..steps {
155                     q.push(DropCounter).unwrap();
156                 }
157             })
158             .run();
159 
160         for _ in 0..additional {
161             q.push(DropCounter).unwrap();
162         }
163 
164         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
165         drop(q);
166         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
167     }
168 }
169