1 //! Tests for the zero channel flavor.
2 
3 use std::any::Any;
4 use std::sync::atomic::AtomicUsize;
5 use std::sync::atomic::Ordering;
6 use std::thread;
7 use std::time::Duration;
8 
9 use crossbeam_channel::{bounded, select, Receiver};
10 use crossbeam_channel::{RecvError, RecvTimeoutError, TryRecvError};
11 use crossbeam_channel::{SendError, SendTimeoutError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 use rand::{thread_rng, Rng};
14 
ms(ms: u64) -> Duration15 fn ms(ms: u64) -> Duration {
16     Duration::from_millis(ms)
17 }
18 
19 #[test]
smoke()20 fn smoke() {
21     let (s, r) = bounded(0);
22     assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
23     assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
24 }
25 
26 #[test]
capacity()27 fn capacity() {
28     let (s, r) = bounded::<()>(0);
29     assert_eq!(s.capacity(), Some(0));
30     assert_eq!(r.capacity(), Some(0));
31 }
32 
33 #[test]
len_empty_full()34 fn len_empty_full() {
35     let (s, r) = bounded(0);
36 
37     assert_eq!(s.len(), 0);
38     assert_eq!(s.is_empty(), true);
39     assert_eq!(s.is_full(), true);
40     assert_eq!(r.len(), 0);
41     assert_eq!(r.is_empty(), true);
42     assert_eq!(r.is_full(), true);
43 
44     scope(|scope| {
45         scope.spawn(|_| s.send(0).unwrap());
46         scope.spawn(|_| r.recv().unwrap());
47     })
48     .unwrap();
49 
50     assert_eq!(s.len(), 0);
51     assert_eq!(s.is_empty(), true);
52     assert_eq!(s.is_full(), true);
53     assert_eq!(r.len(), 0);
54     assert_eq!(r.is_empty(), true);
55     assert_eq!(r.is_full(), true);
56 }
57 
58 #[test]
try_recv()59 fn try_recv() {
60     let (s, r) = bounded(0);
61 
62     scope(|scope| {
63         scope.spawn(move |_| {
64             assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
65             thread::sleep(ms(1500));
66             assert_eq!(r.try_recv(), Ok(7));
67             thread::sleep(ms(500));
68             assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
69         });
70         scope.spawn(move |_| {
71             thread::sleep(ms(1000));
72             s.send(7).unwrap();
73         });
74     })
75     .unwrap();
76 }
77 
78 #[test]
recv()79 fn recv() {
80     let (s, r) = bounded(0);
81 
82     scope(|scope| {
83         scope.spawn(move |_| {
84             assert_eq!(r.recv(), Ok(7));
85             thread::sleep(ms(1000));
86             assert_eq!(r.recv(), Ok(8));
87             thread::sleep(ms(1000));
88             assert_eq!(r.recv(), Ok(9));
89             assert_eq!(r.recv(), Err(RecvError));
90         });
91         scope.spawn(move |_| {
92             thread::sleep(ms(1500));
93             s.send(7).unwrap();
94             s.send(8).unwrap();
95             s.send(9).unwrap();
96         });
97     })
98     .unwrap();
99 }
100 
101 #[test]
recv_timeout()102 fn recv_timeout() {
103     let (s, r) = bounded::<i32>(0);
104 
105     scope(|scope| {
106         scope.spawn(move |_| {
107             assert_eq!(r.recv_timeout(ms(1000)), Err(RecvTimeoutError::Timeout));
108             assert_eq!(r.recv_timeout(ms(1000)), Ok(7));
109             assert_eq!(
110                 r.recv_timeout(ms(1000)),
111                 Err(RecvTimeoutError::Disconnected)
112             );
113         });
114         scope.spawn(move |_| {
115             thread::sleep(ms(1500));
116             s.send(7).unwrap();
117         });
118     })
119     .unwrap();
120 }
121 
122 #[test]
try_send()123 fn try_send() {
124     let (s, r) = bounded(0);
125 
126     scope(|scope| {
127         scope.spawn(move |_| {
128             assert_eq!(s.try_send(7), Err(TrySendError::Full(7)));
129             thread::sleep(ms(1500));
130             assert_eq!(s.try_send(8), Ok(()));
131             thread::sleep(ms(500));
132             assert_eq!(s.try_send(9), Err(TrySendError::Disconnected(9)));
133         });
134         scope.spawn(move |_| {
135             thread::sleep(ms(1000));
136             assert_eq!(r.recv(), Ok(8));
137         });
138     })
139     .unwrap();
140 }
141 
142 #[test]
send()143 fn send() {
144     let (s, r) = bounded(0);
145 
146     scope(|scope| {
147         scope.spawn(move |_| {
148             s.send(7).unwrap();
149             thread::sleep(ms(1000));
150             s.send(8).unwrap();
151             thread::sleep(ms(1000));
152             s.send(9).unwrap();
153         });
154         scope.spawn(move |_| {
155             thread::sleep(ms(1500));
156             assert_eq!(r.recv(), Ok(7));
157             assert_eq!(r.recv(), Ok(8));
158             assert_eq!(r.recv(), Ok(9));
159         });
160     })
161     .unwrap();
162 }
163 
164 #[test]
send_timeout()165 fn send_timeout() {
166     let (s, r) = bounded(0);
167 
168     scope(|scope| {
169         scope.spawn(move |_| {
170             assert_eq!(
171                 s.send_timeout(7, ms(1000)),
172                 Err(SendTimeoutError::Timeout(7))
173             );
174             assert_eq!(s.send_timeout(8, ms(1000)), Ok(()));
175             assert_eq!(
176                 s.send_timeout(9, ms(1000)),
177                 Err(SendTimeoutError::Disconnected(9))
178             );
179         });
180         scope.spawn(move |_| {
181             thread::sleep(ms(1500));
182             assert_eq!(r.recv(), Ok(8));
183         });
184     })
185     .unwrap();
186 }
187 
188 #[test]
len()189 fn len() {
190     const COUNT: usize = 25_000;
191 
192     let (s, r) = bounded(0);
193 
194     assert_eq!(s.len(), 0);
195     assert_eq!(r.len(), 0);
196 
197     scope(|scope| {
198         scope.spawn(|_| {
199             for i in 0..COUNT {
200                 assert_eq!(r.recv(), Ok(i));
201                 assert_eq!(r.len(), 0);
202             }
203         });
204 
205         scope.spawn(|_| {
206             for i in 0..COUNT {
207                 s.send(i).unwrap();
208                 assert_eq!(s.len(), 0);
209             }
210         });
211     })
212     .unwrap();
213 
214     assert_eq!(s.len(), 0);
215     assert_eq!(r.len(), 0);
216 }
217 
218 #[test]
disconnect_wakes_sender()219 fn disconnect_wakes_sender() {
220     let (s, r) = bounded(0);
221 
222     scope(|scope| {
223         scope.spawn(move |_| {
224             assert_eq!(s.send(()), Err(SendError(())));
225         });
226         scope.spawn(move |_| {
227             thread::sleep(ms(1000));
228             drop(r);
229         });
230     })
231     .unwrap();
232 }
233 
234 #[test]
disconnect_wakes_receiver()235 fn disconnect_wakes_receiver() {
236     let (s, r) = bounded::<()>(0);
237 
238     scope(|scope| {
239         scope.spawn(move |_| {
240             assert_eq!(r.recv(), Err(RecvError));
241         });
242         scope.spawn(move |_| {
243             thread::sleep(ms(1000));
244             drop(s);
245         });
246     })
247     .unwrap();
248 }
249 
250 #[test]
spsc()251 fn spsc() {
252     const COUNT: usize = 100_000;
253 
254     let (s, r) = bounded(0);
255 
256     scope(|scope| {
257         scope.spawn(move |_| {
258             for i in 0..COUNT {
259                 assert_eq!(r.recv(), Ok(i));
260             }
261             assert_eq!(r.recv(), Err(RecvError));
262         });
263         scope.spawn(move |_| {
264             for i in 0..COUNT {
265                 s.send(i).unwrap();
266             }
267         });
268     })
269     .unwrap();
270 }
271 
272 #[test]
mpmc()273 fn mpmc() {
274     const COUNT: usize = 25_000;
275     const THREADS: usize = 4;
276 
277     let (s, r) = bounded::<usize>(0);
278     let v = (0..COUNT).map(|_| AtomicUsize::new(0)).collect::<Vec<_>>();
279 
280     scope(|scope| {
281         for _ in 0..THREADS {
282             scope.spawn(|_| {
283                 for _ in 0..COUNT {
284                     let n = r.recv().unwrap();
285                     v[n].fetch_add(1, Ordering::SeqCst);
286                 }
287             });
288         }
289         for _ in 0..THREADS {
290             scope.spawn(|_| {
291                 for i in 0..COUNT {
292                     s.send(i).unwrap();
293                 }
294             });
295         }
296     })
297     .unwrap();
298 
299     for c in v {
300         assert_eq!(c.load(Ordering::SeqCst), THREADS);
301     }
302 }
303 
304 #[test]
stress_oneshot()305 fn stress_oneshot() {
306     const COUNT: usize = 10_000;
307 
308     for _ in 0..COUNT {
309         let (s, r) = bounded(1);
310 
311         scope(|scope| {
312             scope.spawn(|_| r.recv().unwrap());
313             scope.spawn(|_| s.send(0).unwrap());
314         })
315         .unwrap();
316     }
317 }
318 
319 #[test]
stress_iter()320 fn stress_iter() {
321     const COUNT: usize = 1000;
322 
323     let (request_s, request_r) = bounded(0);
324     let (response_s, response_r) = bounded(0);
325 
326     scope(|scope| {
327         scope.spawn(move |_| {
328             let mut count = 0;
329             loop {
330                 for x in response_r.try_iter() {
331                     count += x;
332                     if count == COUNT {
333                         return;
334                     }
335                 }
336                 let _ = request_s.try_send(());
337             }
338         });
339 
340         for _ in request_r.iter() {
341             if response_s.send(1).is_err() {
342                 break;
343             }
344         }
345     })
346     .unwrap();
347 }
348 
349 #[test]
stress_timeout_two_threads()350 fn stress_timeout_two_threads() {
351     const COUNT: usize = 100;
352 
353     let (s, r) = bounded(0);
354 
355     scope(|scope| {
356         scope.spawn(|_| {
357             for i in 0..COUNT {
358                 if i % 2 == 0 {
359                     thread::sleep(ms(50));
360                 }
361                 loop {
362                     if let Ok(()) = s.send_timeout(i, ms(10)) {
363                         break;
364                     }
365                 }
366             }
367         });
368 
369         scope.spawn(|_| {
370             for i in 0..COUNT {
371                 if i % 2 == 0 {
372                     thread::sleep(ms(50));
373                 }
374                 loop {
375                     if let Ok(x) = r.recv_timeout(ms(10)) {
376                         assert_eq!(x, i);
377                         break;
378                     }
379                 }
380             }
381         });
382     })
383     .unwrap();
384 }
385 
386 #[test]
drops()387 fn drops() {
388     static DROPS: AtomicUsize = AtomicUsize::new(0);
389 
390     #[derive(Debug, PartialEq)]
391     struct DropCounter;
392 
393     impl Drop for DropCounter {
394         fn drop(&mut self) {
395             DROPS.fetch_add(1, Ordering::SeqCst);
396         }
397     }
398 
399     let mut rng = thread_rng();
400 
401     for _ in 0..100 {
402         let steps = rng.gen_range(0..3_000);
403 
404         DROPS.store(0, Ordering::SeqCst);
405         let (s, r) = bounded::<DropCounter>(0);
406 
407         scope(|scope| {
408             scope.spawn(|_| {
409                 for _ in 0..steps {
410                     r.recv().unwrap();
411                 }
412             });
413 
414             scope.spawn(|_| {
415                 for _ in 0..steps {
416                     s.send(DropCounter).unwrap();
417                 }
418             });
419         })
420         .unwrap();
421 
422         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
423         drop(s);
424         drop(r);
425         assert_eq!(DROPS.load(Ordering::SeqCst), steps);
426     }
427 }
428 
429 #[test]
fairness()430 fn fairness() {
431     const COUNT: usize = 10_000;
432 
433     let (s1, r1) = bounded::<()>(0);
434     let (s2, r2) = bounded::<()>(0);
435 
436     scope(|scope| {
437         scope.spawn(|_| {
438             let mut hits = [0usize; 2];
439             for _ in 0..COUNT {
440                 select! {
441                     recv(r1) -> _ => hits[0] += 1,
442                     recv(r2) -> _ => hits[1] += 1,
443                 }
444             }
445             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
446         });
447 
448         let mut hits = [0usize; 2];
449         for _ in 0..COUNT {
450             select! {
451                 send(s1, ()) -> _ => hits[0] += 1,
452                 send(s2, ()) -> _ => hits[1] += 1,
453             }
454         }
455         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
456     })
457     .unwrap();
458 }
459 
460 #[test]
fairness_duplicates()461 fn fairness_duplicates() {
462     const COUNT: usize = 10_000;
463 
464     let (s, r) = bounded::<()>(0);
465 
466     scope(|scope| {
467         scope.spawn(|_| {
468             let mut hits = [0usize; 5];
469             for _ in 0..COUNT {
470                 select! {
471                     recv(r) -> _ => hits[0] += 1,
472                     recv(r) -> _ => hits[1] += 1,
473                     recv(r) -> _ => hits[2] += 1,
474                     recv(r) -> _ => hits[3] += 1,
475                     recv(r) -> _ => hits[4] += 1,
476                 }
477             }
478             assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
479         });
480 
481         let mut hits = [0usize; 5];
482         for _ in 0..COUNT {
483             select! {
484                 send(s, ()) -> _ => hits[0] += 1,
485                 send(s, ()) -> _ => hits[1] += 1,
486                 send(s, ()) -> _ => hits[2] += 1,
487                 send(s, ()) -> _ => hits[3] += 1,
488                 send(s, ()) -> _ => hits[4] += 1,
489             }
490         }
491         assert!(hits.iter().all(|x| *x >= COUNT / hits.len() / 2));
492     })
493     .unwrap();
494 }
495 
496 #[test]
recv_in_send()497 fn recv_in_send() {
498     let (s, r) = bounded(0);
499 
500     scope(|scope| {
501         scope.spawn(|_| {
502             thread::sleep(ms(100));
503             r.recv()
504         });
505 
506         scope.spawn(|_| {
507             thread::sleep(ms(500));
508             s.send(()).unwrap();
509         });
510 
511         select! {
512             send(s, r.recv().unwrap()) -> _ => {}
513         }
514     })
515     .unwrap();
516 }
517 
518 #[test]
channel_through_channel()519 fn channel_through_channel() {
520     const COUNT: usize = 1000;
521 
522     type T = Box<dyn Any + Send>;
523 
524     let (s, r) = bounded::<T>(0);
525 
526     scope(|scope| {
527         scope.spawn(move |_| {
528             let mut s = s;
529 
530             for _ in 0..COUNT {
531                 let (new_s, new_r) = bounded(0);
532                 let new_r: T = Box::new(Some(new_r));
533 
534                 s.send(new_r).unwrap();
535                 s = new_s;
536             }
537         });
538 
539         scope.spawn(move |_| {
540             let mut r = r;
541 
542             for _ in 0..COUNT {
543                 r = r
544                     .recv()
545                     .unwrap()
546                     .downcast_mut::<Option<Receiver<T>>>()
547                     .unwrap()
548                     .take()
549                     .unwrap()
550             }
551         });
552     })
553     .unwrap();
554 }
555