1 //! Tests for the after channel flavor.
2 
3 use std::sync::atomic::AtomicUsize;
4 use std::sync::atomic::Ordering;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, select, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10 
ms(ms: u64) -> Duration11 fn ms(ms: u64) -> Duration {
12     Duration::from_millis(ms)
13 }
14 
15 #[test]
fire()16 fn fire() {
17     let start = Instant::now();
18     let r = after(ms(50));
19 
20     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
21     thread::sleep(ms(100));
22 
23     let fired = r.try_recv().unwrap();
24     assert!(start < fired);
25     assert!(fired - start >= ms(50));
26 
27     let now = Instant::now();
28     assert!(fired < now);
29     assert!(now - fired >= ms(50));
30 
31     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
32 
33     select! {
34         recv(r) -> _ => panic!(),
35         default => {}
36     }
37 
38     select! {
39         recv(r) -> _ => panic!(),
40         recv(after(ms(200))) -> _ => {}
41     }
42 }
43 
44 #[test]
capacity()45 fn capacity() {
46     const COUNT: usize = 10;
47 
48     for i in 0..COUNT {
49         let r = after(ms(i as u64));
50         assert_eq!(r.capacity(), Some(1));
51     }
52 }
53 
54 #[test]
len_empty_full()55 fn len_empty_full() {
56     let r = after(ms(50));
57 
58     assert_eq!(r.len(), 0);
59     assert_eq!(r.is_empty(), true);
60     assert_eq!(r.is_full(), false);
61 
62     thread::sleep(ms(100));
63 
64     assert_eq!(r.len(), 1);
65     assert_eq!(r.is_empty(), false);
66     assert_eq!(r.is_full(), true);
67 
68     r.try_recv().unwrap();
69 
70     assert_eq!(r.len(), 0);
71     assert_eq!(r.is_empty(), true);
72     assert_eq!(r.is_full(), false);
73 }
74 
75 #[test]
try_recv()76 fn try_recv() {
77     let r = after(ms(200));
78     assert!(r.try_recv().is_err());
79 
80     thread::sleep(ms(100));
81     assert!(r.try_recv().is_err());
82 
83     thread::sleep(ms(200));
84     assert!(r.try_recv().is_ok());
85     assert!(r.try_recv().is_err());
86 
87     thread::sleep(ms(200));
88     assert!(r.try_recv().is_err());
89 }
90 
91 #[test]
recv()92 fn recv() {
93     let start = Instant::now();
94     let r = after(ms(50));
95 
96     let fired = r.recv().unwrap();
97     assert!(start < fired);
98     assert!(fired - start >= ms(50));
99 
100     let now = Instant::now();
101     assert!(fired < now);
102     assert!(now - fired < fired - start);
103 
104     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
105 }
106 
107 #[test]
recv_timeout()108 fn recv_timeout() {
109     let start = Instant::now();
110     let r = after(ms(200));
111 
112     assert!(r.recv_timeout(ms(100)).is_err());
113     let now = Instant::now();
114     assert!(now - start >= ms(100));
115     assert!(now - start <= ms(150));
116 
117     let fired = r.recv_timeout(ms(200)).unwrap();
118     assert!(fired - start >= ms(200));
119     assert!(fired - start <= ms(250));
120 
121     assert!(r.recv_timeout(ms(200)).is_err());
122     let now = Instant::now();
123     assert!(now - start >= ms(400));
124     assert!(now - start <= ms(450));
125 
126     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
127 }
128 
129 #[test]
recv_two()130 fn recv_two() {
131     let r1 = after(ms(50));
132     let r2 = after(ms(50));
133 
134     scope(|scope| {
135         scope.spawn(|_| {
136             select! {
137                 recv(r1) -> _ => {}
138                 recv(r2) -> _ => {}
139             }
140         });
141         scope.spawn(|_| {
142             select! {
143                 recv(r1) -> _ => {}
144                 recv(r2) -> _ => {}
145             }
146         });
147     })
148     .unwrap();
149 }
150 
151 #[test]
recv_race()152 fn recv_race() {
153     select! {
154         recv(after(ms(50))) -> _ => {}
155         recv(after(ms(100))) -> _ => panic!(),
156     }
157 
158     select! {
159         recv(after(ms(100))) -> _ => panic!(),
160         recv(after(ms(50))) -> _ => {}
161     }
162 }
163 
164 #[test]
stress_default()165 fn stress_default() {
166     const COUNT: usize = 10;
167 
168     for _ in 0..COUNT {
169         select! {
170             recv(after(ms(0))) -> _ => {}
171             default => panic!(),
172         }
173     }
174 
175     for _ in 0..COUNT {
176         select! {
177             recv(after(ms(100))) -> _ => panic!(),
178             default => {}
179         }
180     }
181 }
182 
183 #[test]
select()184 fn select() {
185     const THREADS: usize = 4;
186     const COUNT: usize = 1000;
187     const TIMEOUT_MS: u64 = 100;
188 
189     let v = (0..COUNT)
190         .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
191         .collect::<Vec<_>>();
192     let hits = AtomicUsize::new(0);
193 
194     scope(|scope| {
195         for _ in 0..THREADS {
196             scope.spawn(|_| {
197                 let v: Vec<&_> = v.iter().collect();
198 
199                 loop {
200                     let timeout = after(ms(TIMEOUT_MS));
201                     let mut sel = Select::new();
202                     for r in &v {
203                         sel.recv(r);
204                     }
205                     let oper_timeout = sel.recv(&timeout);
206 
207                     let oper = sel.select();
208                     match oper.index() {
209                         i if i == oper_timeout => {
210                             oper.recv(&timeout).unwrap();
211                             break;
212                         }
213                         i => {
214                             oper.recv(&v[i]).unwrap();
215                             hits.fetch_add(1, Ordering::SeqCst);
216                         }
217                     }
218                 }
219             });
220         }
221     })
222     .unwrap();
223 
224     assert_eq!(hits.load(Ordering::SeqCst), COUNT);
225 }
226 
227 #[test]
ready()228 fn ready() {
229     const THREADS: usize = 4;
230     const COUNT: usize = 1000;
231     const TIMEOUT_MS: u64 = 100;
232 
233     let v = (0..COUNT)
234         .map(|i| after(ms(i as u64 / TIMEOUT_MS / 2)))
235         .collect::<Vec<_>>();
236     let hits = AtomicUsize::new(0);
237 
238     scope(|scope| {
239         for _ in 0..THREADS {
240             scope.spawn(|_| {
241                 let v: Vec<&_> = v.iter().collect();
242 
243                 loop {
244                     let timeout = after(ms(TIMEOUT_MS));
245                     let mut sel = Select::new();
246                     for r in &v {
247                         sel.recv(r);
248                     }
249                     let oper_timeout = sel.recv(&timeout);
250 
251                     loop {
252                         let i = sel.ready();
253                         if i == oper_timeout {
254                             timeout.try_recv().unwrap();
255                             return;
256                         } else if v[i].try_recv().is_ok() {
257                             hits.fetch_add(1, Ordering::SeqCst);
258                             break;
259                         }
260                     }
261                 }
262             });
263         }
264     })
265     .unwrap();
266 
267     assert_eq!(hits.load(Ordering::SeqCst), COUNT);
268 }
269 
270 #[test]
stress_clone()271 fn stress_clone() {
272     const RUNS: usize = 1000;
273     const THREADS: usize = 10;
274     const COUNT: usize = 50;
275 
276     for i in 0..RUNS {
277         let r = after(ms(i as u64));
278 
279         scope(|scope| {
280             for _ in 0..THREADS {
281                 scope.spawn(|_| {
282                     let r = r.clone();
283                     let _ = r.try_recv();
284 
285                     for _ in 0..COUNT {
286                         drop(r.clone());
287                         thread::yield_now();
288                     }
289                 });
290             }
291         })
292         .unwrap();
293     }
294 }
295 
296 #[test]
fairness()297 fn fairness() {
298     const COUNT: usize = 1000;
299 
300     for &dur in &[0, 1] {
301         let mut hits = [0usize; 2];
302 
303         for _ in 0..COUNT {
304             select! {
305                 recv(after(ms(dur))) -> _ => hits[0] += 1,
306                 recv(after(ms(dur))) -> _ => hits[1] += 1,
307             }
308         }
309 
310         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
311     }
312 }
313 
314 #[test]
fairness_duplicates()315 fn fairness_duplicates() {
316     const COUNT: usize = 1000;
317 
318     for &dur in &[0, 1] {
319         let mut hits = [0usize; 5];
320 
321         for _ in 0..COUNT {
322             let r = after(ms(dur));
323             select! {
324                 recv(r) -> _ => hits[0] += 1,
325                 recv(r) -> _ => hits[1] += 1,
326                 recv(r) -> _ => hits[2] += 1,
327                 recv(r) -> _ => hits[3] += 1,
328                 recv(r) -> _ => hits[4] += 1,
329             }
330         }
331 
332         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
333     }
334 }
335