1 //! Tests for the list channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = unbounded();
22     s.try_send(7).unwrap();
23     assert_eq!(r.try_recv(), Ok(7));
24 
25     s.send(8).unwrap();
26     assert_eq!(r.recv(), Ok(8));
27 
28     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29     assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31 
32 #[test]
capacity()33 fn capacity() {
34     let (s, r) = unbounded::<()>();
35     assert_eq!(s.capacity(), None);
36     assert_eq!(r.capacity(), None);
37 }
38 
39 #[test]
len_empty_full()40 fn len_empty_full() {
41     let (s, r) = unbounded();
42 
43     assert_eq!(s.len(), 0);
44     assert_eq!(s.is_empty(), true);
45     assert_eq!(s.is_full(), false);
46     assert_eq!(r.len(), 0);
47     assert_eq!(r.is_empty(), true);
48     assert_eq!(r.is_full(), false);
49 
50     s.send(()).unwrap();
51 
52     assert_eq!(s.len(), 1);
53     assert_eq!(s.is_empty(), false);
54     assert_eq!(s.is_full(), false);
55     assert_eq!(r.len(), 1);
56     assert_eq!(r.is_empty(), false);
57     assert_eq!(r.is_full(), false);
58 
59     r.recv().unwrap();
60 
61     assert_eq!(s.len(), 0);
62     assert_eq!(s.is_empty(), true);
63     assert_eq!(s.is_full(), false);
64     assert_eq!(r.len(), 0);
65     assert_eq!(r.is_empty(), true);
66     assert_eq!(r.is_full(), false);
67 }
68 
69 #[test]
try_recv()70 fn try_recv() {
71     let (s, r) = unbounded();
72 
73     scope(|scope| {
74         scope.spawn(move |_| {
75             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
76             thread::sleep(ms(1500));
77             assert_eq!(r.try_recv(), Ok(7));
78             thread::sleep(ms(500));
79             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
80         });
81         scope.spawn(move |_| {
82             thread::sleep(ms(1000));
83             s.send(7).unwrap();
84         });
85     })
86     .unwrap();
87 }
88 
89 #[test]
recv()90 fn recv() {
91     let (s, r) = unbounded();
92 
93     scope(|scope| {
94         scope.spawn(move |_| {
95             assert_eq!(r.recv(), Ok(7));
96             thread::sleep(ms(1000));
97             assert_eq!(r.recv(), Ok(8));
98             thread::sleep(ms(1000));
99             assert_eq!(r.recv(), Ok(9));
100             assert_eq!(r.recv(), Err(RecvError));
101         });
102         scope.spawn(move |_| {
103             thread::sleep(ms(1500));
104             s.send(7).unwrap();
105             s.send(8).unwrap();
106             s.send(9).unwrap();
107         });
108     })
109     .unwrap();
110 }
111 
112 #[test]
recv_timeout()113 fn recv_timeout() {
114     let (s, r) = unbounded::<i32>();
115 
116     scope(|scope| {
117         scope.spawn(move |_| {
118             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
119             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
120             assert_eq!(
121                 r.recv_timeout(ms(1000)),
122                 Err(RecvTimeoutError::Disconnected)
123             );
124         });
125         scope.spawn(move |_| {
126             thread::sleep(ms(1500));
127             s.send(7).unwrap();
128         });
129     })
130     .unwrap();
131 }
132 
133 #[test]
try_send()134 fn try_send() {
135     let (s, r) = unbounded();
136     for i in 0..1000 {
137         assert_eq!(s.try_send(i), Ok(()));
138     }
139 
140     drop(r);
141     assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
142 }
143 
144 #[test]
send()145 fn send() {
146     let (s, r) = unbounded();
147     for i in 0..1000 {
148         assert_eq!(s.send(i), Ok(()));
149     }
150 
151     drop(r);
152     assert_eq!(s.send(777), Err(SendError(777)));
153 }
154 
155 #[test]
send_timeout()156 fn send_timeout() {
157     let (s, r) = unbounded();
158     for i in 0..1000 {
159         assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
160     }
161 
162     drop(r);
163     assert_eq!(
164         s.send_timeout(777, ms(0)),
165         Err(SendTimeoutError::Disconnected(777))
166     );
167 }
168 
169 #[test]
send_after_disconnect()170 fn send_after_disconnect() {
171     let (s, r) = unbounded();
172 
173     s.send(1).unwrap();
174     s.send(2).unwrap();
175     s.send(3).unwrap();
176 
177     drop(r);
178 
179     assert_eq!(s.send(4), Err(SendError(4)));
180     assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
181     assert_eq!(
182         s.send_timeout(6, ms(0)),
183         Err(SendTimeoutError::Disconnected(6))
184     );
185 }
186 
187 #[test]
recv_after_disconnect()188 fn recv_after_disconnect() {
189     let (s, r) = unbounded();
190 
191     s.send(1).unwrap();
192     s.send(2).unwrap();
193     s.send(3).unwrap();
194 
195     drop(s);
196 
197     assert_eq!(r.recv(), Ok(1));
198     assert_eq!(r.recv(), Ok(2));
199     assert_eq!(r.recv(), Ok(3));
200     assert_eq!(r.recv(), Err(RecvError));
201 }
202 
203 #[test]
len()204 fn len() {
205     let (s, r) = unbounded();
206 
207     assert_eq!(s.len(), 0);
208     assert_eq!(r.len(), 0);
209 
210     for i in 0..50 {
211         s.send(i).unwrap();
212         assert_eq!(s.len(), i + 1);
213     }
214 
215     for i in 0..50 {
216         r.recv().unwrap();
217         assert_eq!(r.len(), 50 - i - 1);
218     }
219 
220     assert_eq!(s.len(), 0);
221     assert_eq!(r.len(), 0);
222 }
223 
224 #[test]
disconnect_wakes_receiver()225 fn disconnect_wakes_receiver() {
226     let (s, r) = unbounded::<()>();
227 
228     scope(|scope| {
229         scope.spawn(move |_| {
230             assert_eq!(r.recv(), Err(RecvError));
231         });
232         scope.spawn(move |_| {
233             thread::sleep(ms(1000));
234             drop(s);
235         });
236     })
237     .unwrap();
238 }
239 
240 #[test]
spsc()241 fn spsc() {
242     const COUNT: usize = 100_000;
243 
244     let (s, r) = unbounded();
245 
246     scope(|scope| {
247         scope.spawn(move |_| {
248             for i in 0..COUNT {
249                 assert_eq!(r.recv(), Ok(i));
250             }
251             assert_eq!(r.recv(), Err(RecvError));
252         });
253         scope.spawn(move |_| {
254             for i in 0..COUNT {
255                 s.send(i).unwrap();
256             }
257         });
258     })
259     .unwrap();
260 }
261 
262 #[test]
mpmc()263 fn mpmc() {
264     const COUNT: usize = 25_000;
265     const THREADS: usize = 4;
266 
267     let (s, r) = unbounded::<usize>();
268     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
269 
270     scope(|scope| {
271         for _ in 0..THREADS {
272             scope.spawn(|_| {
273                 for _ in 0..COUNT {
274                     let n = r.recv().unwrap();
275                     v[n].fetch_add(1, Ordering::SeqCst);
276                 }
277             });
278         }
279         for _ in 0..THREADS {
280             scope.spawn(|_| {
281                 for i in 0..COUNT {
282                     s.send(i).unwrap();
283                 }
284             });
285         }
286     })
287     .unwrap();
288 
289     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
290 
291     for c in v {
292         assert_eq!(c.load(Ordering::SeqCst), THREADS);
293     }
294 }
295 
296 #[test]
stress_oneshot()297 fn stress_oneshot() {
298     const COUNT: usize = 10_000;
299 
300     for _ in 0..COUNT {
301         let (s, r) = unbounded();
302 
303         scope(|scope| {
304             scope.spawn(|_| r.recv().unwrap());
305             scope.spawn(|_| s.send(0).unwrap());
306         })
307         .unwrap();
308     }
309 }
310 
311 #[test]
stress_iter()312 fn stress_iter() {
313     const COUNT: usize = 100_000;
314 
315     let (request_s, request_r) = unbounded();
316     let (response_s, response_r) = unbounded();
317 
318     scope(|scope| {
319         scope.spawn(move |_| {
320             let mut count = 0;
321             loop {
322                 for x in response_r.try_iter() {
323                     count += x;
324                     if count == COUNT {
325                         return;
326                     }
327                 }
328                 request_s.send(()).unwrap();
329             }
330         });
331 
332         for _ in request_r.iter() {
333             if response_s.send(1).is_err() {
334                 break;
335             }
336         }
337     })
338     .unwrap();
339 }
340 
341 #[test]
stress_timeout_two_threads()342 fn stress_timeout_two_threads() {
343     const COUNT: usize = 100;
344 
345     let (s, r) = unbounded();
346 
347     scope(|scope| {
348         scope.spawn(|_| {
349             for i in 0..COUNT {
350                 if i % 2 == 0 {
351                     thread::sleep(ms(50));
352                 }
353                 s.send(i).unwrap();
354             }
355         });
356 
357         scope.spawn(|_| {
358             for i in 0..COUNT {
359                 if i % 2 == 0 {
360                     thread::sleep(ms(50));
361                 }
362                 loop {
363                     if let Ok(x) = r.recv_timeout(ms(10)) {
364                         assert_eq!(x, i);
365                         break;
366                     }
367                 }
368             }
369         });
370     })
371     .unwrap();
372 }
373 
374 #[test]
drops()375 fn drops() {
376     static DROPS: AtomicUsize = AtomicUsize::new(0);
377 
378     #[derive(Debug, PartialEq)]
379     struct DropCounter;
380 
381     impl Drop for DropCounter {
382         fn drop(&mut self) {
383             DROPS.fetch_add(1, Ordering::SeqCst);
384         }
385     }
386 
387     let mut rng = thread_rng();
388 
389     for _ in 0..100 {
390         let steps = rng.gen_range(0..10_000);
391         let additional = rng.gen_range(0..1000);
392 
393         DROPS.store(0, Ordering::SeqCst);
394         let (s, r) = unbounded::<DropCounter>();
395 
396         scope(|scope| {
397             scope.spawn(|_| {
398                 for _ in 0..steps {
399                     r.recv().unwrap();
400                 }
401             });
402 
403             scope.spawn(|_| {
404                 for _ in 0..steps {
405                     s.send(DropCounter).unwrap();
406                 }
407             });
408         })
409         .unwrap();
410 
411         for _ in 0..additional {
412             s.try_send(DropCounter).unwrap();
413         }
414 
415         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
416         drop(s);
417         drop(r);
418         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
419     }
420 }
421 
422 #[test]
linearizable()423 fn linearizable() {
424     const COUNT: usize = 25_000;
425     const THREADS: usize = 4;
426 
427     let (s, r) = unbounded();
428 
429     scope(|scope| {
430         for _ in 0..THREADS {
431             scope.spawn(|_| {
432                 for _ in 0..COUNT {
433                     s.send(0).unwrap();
434                     r.try_recv().unwrap();
435                 }
436             });
437         }
438     })
439     .unwrap();
440 }
441 
442 #[test]
fairness()443 fn fairness() {
444     const COUNT: usize = 10_000;
445 
446     let (s1, r1) = unbounded::<()>();
447     let (s2, r2) = unbounded::<()>();
448 
449     for _ in 0..COUNT {
450         s1.send(()).unwrap();
451         s2.send(()).unwrap();
452     }
453 
454     let mut hits = [0usize; 2];
455     for _ in 0..COUNT {
456         select! {
457             recv(r1) -> _ => hits[0] += 1,
458             recv(r2) -> _ => hits[1] += 1,
459         }
460     }
461     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
462 }
463 
464 #[test]
fairness_duplicates()465 fn fairness_duplicates() {
466     const COUNT: usize = 10_000;
467 
468     let (s, r) = unbounded();
469 
470     for _ in 0..COUNT {
471         s.send(()).unwrap();
472     }
473 
474     let mut hits = [0usize; 5];
475     for _ in 0..COUNT {
476         select! {
477             recv(r) -> _ => hits[0] += 1,
478             recv(r) -> _ => hits[1] += 1,
479             recv(r) -> _ => hits[2] += 1,
480             recv(r) -> _ => hits[3] += 1,
481             recv(r) -> _ => hits[4] += 1,
482         }
483     }
484     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
485 }
486 
487 #[test]
recv_in_send()488 fn recv_in_send() {
489     let (s, r) = unbounded();
490     s.send(()).unwrap();
491 
492     select! {
493         send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
494     }
495 }
496 
497 #[test]
channel_through_channel()498 fn channel_through_channel() {
499     const COUNT: usize = 1000;
500 
501     type T = Box<dyn Any + Send>;
502 
503     let (s, r) = unbounded::<T>();
504 
505     scope(|scope| {
506         scope.spawn(move |_| {
507             let mut s = s;
508 
509             for _ in 0..COUNT {
510                 let (new_s, new_r) = unbounded();
511                 let new_r: T = Box::new(Some(new_r));
512 
513                 s.send(new_r).unwrap();
514                 s = new_s;
515             }
516         });
517 
518         scope.spawn(move |_| {
519             let mut r = r;
520 
521             for _ in 0..COUNT {
522                 r = r
523                     .recv()
524                     .unwrap()
525                     .downcast_mut::<Option<Receiver<T>>>()
526                     .unwrap()
527                     .take()
528                     .unwrap()
529             }
530         });
531     })
532     .unwrap();
533 }
534