1 use std::sync::atomic::{AtomicUsize, Ordering};
2
3 use concurrent_queue::{ConcurrentQueue, PopError, PushError};
4 use easy_parallel::Parallel;
5
6 #[test]
smoke()7 fn smoke() {
8 let q = ConcurrentQueue::unbounded();
9 q.push(7).unwrap();
10 assert_eq!(q.pop(), Ok(7));
11
12 q.push(8).unwrap();
13 assert_eq!(q.pop(), Ok(8));
14 assert!(q.pop().is_err());
15 }
16
17 #[test]
len_empty_full()18 fn len_empty_full() {
19 let q = ConcurrentQueue::unbounded();
20
21 assert_eq!(q.len(), 0);
22 assert_eq!(q.is_empty(), true);
23
24 q.push(()).unwrap();
25
26 assert_eq!(q.len(), 1);
27 assert_eq!(q.is_empty(), false);
28
29 q.pop().unwrap();
30
31 assert_eq!(q.len(), 0);
32 assert_eq!(q.is_empty(), true);
33 }
34
35 #[test]
len()36 fn len() {
37 let q = ConcurrentQueue::unbounded();
38
39 assert_eq!(q.len(), 0);
40
41 for i in 0..50 {
42 q.push(i).unwrap();
43 assert_eq!(q.len(), i + 1);
44 }
45
46 for i in 0..50 {
47 q.pop().unwrap();
48 assert_eq!(q.len(), 50 - i - 1);
49 }
50
51 assert_eq!(q.len(), 0);
52 }
53
54 #[test]
close()55 fn close() {
56 let q = ConcurrentQueue::unbounded();
57 assert_eq!(q.push(10), Ok(()));
58
59 assert!(!q.is_closed());
60 assert!(q.close());
61
62 assert!(q.is_closed());
63 assert!(!q.close());
64
65 assert_eq!(q.push(20), Err(PushError::Closed(20)));
66 assert_eq!(q.pop(), Ok(10));
67 assert_eq!(q.pop(), Err(PopError::Closed));
68 }
69
70 #[test]
spsc()71 fn spsc() {
72 const COUNT: usize = 100_000;
73
74 let q = ConcurrentQueue::unbounded();
75
76 Parallel::new()
77 .add(|| {
78 for i in 0..COUNT {
79 loop {
80 if let Ok(x) = q.pop() {
81 assert_eq!(x, i);
82 break;
83 }
84 }
85 }
86 assert!(q.pop().is_err());
87 })
88 .add(|| {
89 for i in 0..COUNT {
90 q.push(i).unwrap();
91 }
92 })
93 .run();
94 }
95
96 #[test]
mpmc()97 fn mpmc() {
98 const COUNT: usize = 25_000;
99 const THREADS: usize = 4;
100
101 let q = ConcurrentQueue::<usize>::unbounded();
102 let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
103
104 Parallel::new()
105 .each(0..THREADS, |_| {
106 for _ in 0..COUNT {
107 let n = loop {
108 if let Ok(x) = q.pop() {
109 break x;
110 }
111 };
112 v[n].fetch_add(1, Ordering::SeqCst);
113 }
114 })
115 .each(0..THREADS, |_| {
116 for i in 0..COUNT {
117 q.push(i).unwrap();
118 }
119 })
120 .run();
121
122 for c in v {
123 assert_eq!(c.load(Ordering::SeqCst), THREADS);
124 }
125 }
126
127 #[test]
drops()128 fn drops() {
129 static DROPS: AtomicUsize = AtomicUsize::new(0);
130
131 #[derive(Debug, PartialEq)]
132 struct DropCounter;
133
134 impl Drop for DropCounter {
135 fn drop(&mut self) {
136 DROPS.fetch_add(1, Ordering::SeqCst);
137 }
138 }
139
140 for _ in 0..100 {
141 let steps = fastrand::usize(0..10_000);
142 let additional = fastrand::usize(0..1000);
143
144 DROPS.store(0, Ordering::SeqCst);
145 let q = ConcurrentQueue::unbounded();
146
147 Parallel::new()
148 .add(|| {
149 for _ in 0..steps {
150 while q.pop().is_err() {}
151 }
152 })
153 .add(|| {
154 for _ in 0..steps {
155 q.push(DropCounter).unwrap();
156 }
157 })
158 .run();
159
160 for _ in 0..additional {
161 q.push(DropCounter).unwrap();
162 }
163
164 assert_eq!(DROPS.load(Ordering::SeqCst), steps);
165 drop(q);
166 assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
167 }
168 }
169