1 //! Tests for the list channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{select, unbounded, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = unbounded();
22     s.try_send(7).unwrap();
23     assert_eq!(r.try_recv(), Ok(7));
24 
25     s.send(8).unwrap();
26     assert_eq!(r.recv(), Ok(8));
27 
28     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
29     assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
30 }
31 
32 #[test]
capacity()33 fn capacity() {
34     let (s, r) = unbounded::<()>();
35     assert_eq!(s.capacity(), None);
36     assert_eq!(r.capacity(), None);
37 }
38 
39 #[test]
len_empty_full()40 fn len_empty_full() {
41     let (s, r) = unbounded();
42 
43     assert_eq!(s.len(), 0);
44     assert!(s.is_empty());
45     assert!(!s.is_full());
46     assert_eq!(r.len(), 0);
47     assert!(r.is_empty());
48     assert!(!r.is_full());
49 
50     s.send(()).unwrap();
51 
52     assert_eq!(s.len(), 1);
53     assert!(!s.is_empty());
54     assert!(!s.is_full());
55     assert_eq!(r.len(), 1);
56     assert!(!r.is_empty());
57     assert!(!r.is_full());
58 
59     r.recv().unwrap();
60 
61     assert_eq!(s.len(), 0);
62     assert!(s.is_empty());
63     assert!(!s.is_full());
64     assert_eq!(r.len(), 0);
65     assert!(r.is_empty());
66     assert!(!r.is_full());
67 }
68 
69 #[test]
try_recv()70 fn try_recv() {
71     let (s, r) = unbounded();
72 
73     scope(|scope| {
74         scope.spawn(move |_| {
75             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
76             thread::sleep(ms(1500));
77             assert_eq!(r.try_recv(), Ok(7));
78             thread::sleep(ms(500));
79             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
80         });
81         scope.spawn(move |_| {
82             thread::sleep(ms(1000));
83             s.send(7).unwrap();
84         });
85     })
86     .unwrap();
87 }
88 
89 #[test]
recv()90 fn recv() {
91     let (s, r) = unbounded();
92 
93     scope(|scope| {
94         scope.spawn(move |_| {
95             assert_eq!(r.recv(), Ok(7));
96             thread::sleep(ms(1000));
97             assert_eq!(r.recv(), Ok(8));
98             thread::sleep(ms(1000));
99             assert_eq!(r.recv(), Ok(9));
100             assert_eq!(r.recv(), Err(RecvError));
101         });
102         scope.spawn(move |_| {
103             thread::sleep(ms(1500));
104             s.send(7).unwrap();
105             s.send(8).unwrap();
106             s.send(9).unwrap();
107         });
108     })
109     .unwrap();
110 }
111 
112 #[test]
recv_timeout()113 fn recv_timeout() {
114     let (s, r) = unbounded::<i32>();
115 
116     scope(|scope| {
117         scope.spawn(move |_| {
118             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
119             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
120             assert_eq!(
121                 r.recv_timeout(ms(1000)),
122                 Err(RecvTimeoutError::Disconnected)
123             );
124         });
125         scope.spawn(move |_| {
126             thread::sleep(ms(1500));
127             s.send(7).unwrap();
128         });
129     })
130     .unwrap();
131 }
132 
133 #[test]
try_send()134 fn try_send() {
135     let (s, r) = unbounded();
136     for i in 0..1000 {
137         assert_eq!(s.try_send(i), Ok(()));
138     }
139 
140     drop(r);
141     assert_eq!(s.try_send(777), Err(TrySendError::Disconnected(777)));
142 }
143 
144 #[test]
send()145 fn send() {
146     let (s, r) = unbounded();
147     for i in 0..1000 {
148         assert_eq!(s.send(i), Ok(()));
149     }
150 
151     drop(r);
152     assert_eq!(s.send(777), Err(SendError(777)));
153 }
154 
155 #[test]
send_timeout()156 fn send_timeout() {
157     let (s, r) = unbounded();
158     for i in 0..1000 {
159         assert_eq!(s.send_timeout(i, ms(i as u64)), Ok(()));
160     }
161 
162     drop(r);
163     assert_eq!(
164         s.send_timeout(777, ms(0)),
165         Err(SendTimeoutError::Disconnected(777))
166     );
167 }
168 
169 #[test]
send_after_disconnect()170 fn send_after_disconnect() {
171     let (s, r) = unbounded();
172 
173     s.send(1).unwrap();
174     s.send(2).unwrap();
175     s.send(3).unwrap();
176 
177     drop(r);
178 
179     assert_eq!(s.send(4), Err(SendError(4)));
180     assert_eq!(s.try_send(5), Err(TrySendError::Disconnected(5)));
181     assert_eq!(
182         s.send_timeout(6, ms(0)),
183         Err(SendTimeoutError::Disconnected(6))
184     );
185 }
186 
187 #[test]
recv_after_disconnect()188 fn recv_after_disconnect() {
189     let (s, r) = unbounded();
190 
191     s.send(1).unwrap();
192     s.send(2).unwrap();
193     s.send(3).unwrap();
194 
195     drop(s);
196 
197     assert_eq!(r.recv(), Ok(1));
198     assert_eq!(r.recv(), Ok(2));
199     assert_eq!(r.recv(), Ok(3));
200     assert_eq!(r.recv(), Err(RecvError));
201 }
202 
203 #[test]
len()204 fn len() {
205     let (s, r) = unbounded();
206 
207     assert_eq!(s.len(), 0);
208     assert_eq!(r.len(), 0);
209 
210     for i in 0..50 {
211         s.send(i).unwrap();
212         assert_eq!(s.len(), i + 1);
213     }
214 
215     for i in 0..50 {
216         r.recv().unwrap();
217         assert_eq!(r.len(), 50 - i - 1);
218     }
219 
220     assert_eq!(s.len(), 0);
221     assert_eq!(r.len(), 0);
222 }
223 
224 #[test]
disconnect_wakes_receiver()225 fn disconnect_wakes_receiver() {
226     let (s, r) = unbounded::<()>();
227 
228     scope(|scope| {
229         scope.spawn(move |_| {
230             assert_eq!(r.recv(), Err(RecvError));
231         });
232         scope.spawn(move |_| {
233             thread::sleep(ms(1000));
234             drop(s);
235         });
236     })
237     .unwrap();
238 }
239 
240 #[test]
spsc()241 fn spsc() {
242     #[cfg(miri)]
243     const COUNT: usize = 100;
244     #[cfg(not(miri))]
245     const COUNT: usize = 100_000;
246 
247     let (s, r) = unbounded();
248 
249     scope(|scope| {
250         scope.spawn(move |_| {
251             for i in 0..COUNT {
252                 assert_eq!(r.recv(), Ok(i));
253             }
254             assert_eq!(r.recv(), Err(RecvError));
255         });
256         scope.spawn(move |_| {
257             for i in 0..COUNT {
258                 s.send(i).unwrap();
259             }
260         });
261     })
262     .unwrap();
263 }
264 
265 #[test]
mpmc()266 fn mpmc() {
267     #[cfg(miri)]
268     const COUNT: usize = 100;
269     #[cfg(not(miri))]
270     const COUNT: usize = 25_000;
271     const THREADS: usize = 4;
272 
273     let (s, r) = unbounded::<usize>();
274     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
275 
276     scope(|scope| {
277         for _ in 0..THREADS {
278             scope.spawn(|_| {
279                 for _ in 0..COUNT {
280                     let n = r.recv().unwrap();
281                     v[n].fetch_add(1, Ordering::SeqCst);
282                 }
283             });
284         }
285         for _ in 0..THREADS {
286             scope.spawn(|_| {
287                 for i in 0..COUNT {
288                     s.send(i).unwrap();
289                 }
290             });
291         }
292     })
293     .unwrap();
294 
295     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
296 
297     for c in v {
298         assert_eq!(c.load(Ordering::SeqCst), THREADS);
299     }
300 }
301 
302 #[test]
stress_oneshot()303 fn stress_oneshot() {
304     #[cfg(miri)]
305     const COUNT: usize = 100;
306     #[cfg(not(miri))]
307     const COUNT: usize = 10_000;
308 
309     for _ in 0..COUNT {
310         let (s, r) = unbounded();
311 
312         scope(|scope| {
313             scope.spawn(|_| r.recv().unwrap());
314             scope.spawn(|_| s.send(0).unwrap());
315         })
316         .unwrap();
317     }
318 }
319 
320 #[test]
stress_iter()321 fn stress_iter() {
322     #[cfg(miri)]
323     const COUNT: usize = 100;
324     #[cfg(not(miri))]
325     const COUNT: usize = 100_000;
326 
327     let (request_s, request_r) = unbounded();
328     let (response_s, response_r) = unbounded();
329 
330     scope(|scope| {
331         scope.spawn(move |_| {
332             let mut count = 0;
333             loop {
334                 for x in response_r.try_iter() {
335                     count += x;
336                     if count == COUNT {
337                         return;
338                     }
339                 }
340                 request_s.send(()).unwrap();
341             }
342         });
343 
344         for _ in request_r.iter() {
345             if response_s.send(1).is_err() {
346                 break;
347             }
348         }
349     })
350     .unwrap();
351 }
352 
353 #[test]
stress_timeout_two_threads()354 fn stress_timeout_two_threads() {
355     const COUNT: usize = 100;
356 
357     let (s, r) = unbounded();
358 
359     scope(|scope| {
360         scope.spawn(|_| {
361             for i in 0..COUNT {
362                 if i % 2 == 0 {
363                     thread::sleep(ms(50));
364                 }
365                 s.send(i).unwrap();
366             }
367         });
368 
369         scope.spawn(|_| {
370             for i in 0..COUNT {
371                 if i % 2 == 0 {
372                     thread::sleep(ms(50));
373                 }
374                 loop {
375                     if let Ok(x) = r.recv_timeout(ms(10)) {
376                         assert_eq!(x, i);
377                         break;
378                     }
379                 }
380             }
381         });
382     })
383     .unwrap();
384 }
385 
386 #[cfg_attr(miri, ignore)] // Miri is too slow
387 #[test]
drops()388 fn drops() {
389     const RUNS: usize = 100;
390 
391     static DROPS: AtomicUsize = AtomicUsize::new(0);
392 
393     #[derive(Debug, PartialEq)]
394     struct DropCounter;
395 
396     impl Drop for DropCounter {
397         fn drop(&mut self) {
398             DROPS.fetch_add(1, Ordering::SeqCst);
399         }
400     }
401 
402     let mut rng = thread_rng();
403 
404     for _ in 0..RUNS {
405         let steps = rng.gen_range(0..10_000);
406         let additional = rng.gen_range(0..1000);
407 
408         DROPS.store(0, Ordering::SeqCst);
409         let (s, r) = unbounded::<DropCounter>();
410 
411         scope(|scope| {
412             scope.spawn(|_| {
413                 for _ in 0..steps {
414                     r.recv().unwrap();
415                 }
416             });
417 
418             scope.spawn(|_| {
419                 for _ in 0..steps {
420                     s.send(DropCounter).unwrap();
421                 }
422             });
423         })
424         .unwrap();
425 
426         for _ in 0..additional {
427             s.try_send(DropCounter).unwrap();
428         }
429 
430         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
431         drop(s);
432         drop(r);
433         assert_eq!(DROPS.load(Ordering::SeqCst), steps + additional);
434     }
435 }
436 
437 #[test]
linearizable()438 fn linearizable() {
439     #[cfg(miri)]
440     const COUNT: usize = 100;
441     #[cfg(not(miri))]
442     const COUNT: usize = 25_000;
443     const THREADS: usize = 4;
444 
445     let (s, r) = unbounded();
446 
447     scope(|scope| {
448         for _ in 0..THREADS {
449             scope.spawn(|_| {
450                 for _ in 0..COUNT {
451                     s.send(0).unwrap();
452                     r.try_recv().unwrap();
453                 }
454             });
455         }
456     })
457     .unwrap();
458 }
459 
460 #[test]
fairness()461 fn fairness() {
462     #[cfg(miri)]
463     const COUNT: usize = 100;
464     #[cfg(not(miri))]
465     const COUNT: usize = 10_000;
466 
467     let (s1, r1) = unbounded::<()>();
468     let (s2, r2) = unbounded::<()>();
469 
470     for _ in 0..COUNT {
471         s1.send(()).unwrap();
472         s2.send(()).unwrap();
473     }
474 
475     let mut hits = [0usize; 2];
476     for _ in 0..COUNT {
477         select! {
478             recv(r1) -> _ => hits[0] += 1,
479             recv(r2) -> _ => hits[1] += 1,
480         }
481     }
482     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
483 }
484 
485 #[test]
fairness_duplicates()486 fn fairness_duplicates() {
487     #[cfg(miri)]
488     const COUNT: usize = 100;
489     #[cfg(not(miri))]
490     const COUNT: usize = 10_000;
491 
492     let (s, r) = unbounded();
493 
494     for _ in 0..COUNT {
495         s.send(()).unwrap();
496     }
497 
498     let mut hits = [0usize; 5];
499     for _ in 0..COUNT {
500         select! {
501             recv(r) -> _ => hits[0] += 1,
502             recv(r) -> _ => hits[1] += 1,
503             recv(r) -> _ => hits[2] += 1,
504             recv(r) -> _ => hits[3] += 1,
505             recv(r) -> _ => hits[4] += 1,
506         }
507     }
508     assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
509 }
510 
511 #[test]
recv_in_send()512 fn recv_in_send() {
513     let (s, r) = unbounded();
514     s.send(()).unwrap();
515 
516     select! {
517         send(s, assert_eq!(r.recv(), Ok(()))) -> _ => {}
518     }
519 }
520 
521 #[test]
channel_through_channel()522 fn channel_through_channel() {
523     #[cfg(miri)]
524     const COUNT: usize = 100;
525     #[cfg(not(miri))]
526     const COUNT: usize = 1000;
527 
528     type T = Box<dyn Any + Send>;
529 
530     let (s, r) = unbounded::<T>();
531 
532     scope(|scope| {
533         scope.spawn(move |_| {
534             let mut s = s;
535 
536             for _ in 0..COUNT {
537                 let (new_s, new_r) = unbounded();
538                 let new_r: T = Box::new(Some(new_r));
539 
540                 s.send(new_r).unwrap();
541                 s = new_s;
542             }
543         });
544 
545         scope.spawn(move |_| {
546             let mut r = r;
547 
548             for _ in 0..COUNT {
549                 r = r
550                     .recv()
551                     .unwrap()
552                     .downcast_mut::<Option<Receiver<T>>>()
553                     .unwrap()
554                     .take()
555                     .unwrap()
556             }
557         });
558     })
559     .unwrap();
560 }
561