1 //! Tests for channel readiness using the `Select` struct.
2 
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, bounded, tick, unbounded};
9 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
10 use crossbeam_utils::thread::scope;
11 
ms(ms: u64) -> Duration12 fn ms(ms: u64) -> Duration {
13     Duration::from_millis(ms)
14 }
15 
16 #[test]
smoke1()17 fn smoke1() {
18     let (s1, r1) = unbounded::<usize>();
19     let (s2, r2) = unbounded::<usize>();
20 
21     s1.send(1).unwrap();
22 
23     let mut sel = Select::new();
24     sel.recv(&r1);
25     sel.recv(&r2);
26     assert_eq!(sel.ready(), 0);
27     assert_eq!(r1.try_recv(), Ok(1));
28 
29     s2.send(2).unwrap();
30 
31     let mut sel = Select::new();
32     sel.recv(&r1);
33     sel.recv(&r2);
34     assert_eq!(sel.ready(), 1);
35     assert_eq!(r2.try_recv(), Ok(2));
36 }
37 
38 #[test]
smoke2()39 fn smoke2() {
40     let (_s1, r1) = unbounded::<i32>();
41     let (_s2, r2) = unbounded::<i32>();
42     let (_s3, r3) = unbounded::<i32>();
43     let (_s4, r4) = unbounded::<i32>();
44     let (s5, r5) = unbounded::<i32>();
45 
46     s5.send(5).unwrap();
47 
48     let mut sel = Select::new();
49     sel.recv(&r1);
50     sel.recv(&r2);
51     sel.recv(&r3);
52     sel.recv(&r4);
53     sel.recv(&r5);
54     assert_eq!(sel.ready(), 4);
55     assert_eq!(r5.try_recv(), Ok(5));
56 }
57 
58 #[test]
disconnected()59 fn disconnected() {
60     let (s1, r1) = unbounded::<i32>();
61     let (s2, r2) = unbounded::<i32>();
62 
63     scope(|scope| {
64         scope.spawn(|_| {
65             drop(s1);
66             thread::sleep(ms(500));
67             s2.send(5).unwrap();
68         });
69 
70         let mut sel = Select::new();
71         sel.recv(&r1);
72         sel.recv(&r2);
73         match sel.ready_timeout(ms(1000)) {
74             Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
75             _ => panic!(),
76         }
77 
78         r2.recv().unwrap();
79     })
80     .unwrap();
81 
82     let mut sel = Select::new();
83     sel.recv(&r1);
84     sel.recv(&r2);
85     match sel.ready_timeout(ms(1000)) {
86         Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
87         _ => panic!(),
88     }
89 
90     scope(|scope| {
91         scope.spawn(|_| {
92             thread::sleep(ms(500));
93             drop(s2);
94         });
95 
96         let mut sel = Select::new();
97         sel.recv(&r2);
98         match sel.ready_timeout(ms(1000)) {
99             Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
100             _ => panic!(),
101         }
102     })
103     .unwrap();
104 }
105 
106 #[test]
default()107 fn default() {
108     let (s1, r1) = unbounded::<i32>();
109     let (s2, r2) = unbounded::<i32>();
110 
111     let mut sel = Select::new();
112     sel.recv(&r1);
113     sel.recv(&r2);
114     assert!(sel.try_ready().is_err());
115 
116     drop(s1);
117 
118     let mut sel = Select::new();
119     sel.recv(&r1);
120     sel.recv(&r2);
121     match sel.try_ready() {
122         Ok(0) => assert!(r1.try_recv().is_err()),
123         _ => panic!(),
124     }
125 
126     s2.send(2).unwrap();
127 
128     let mut sel = Select::new();
129     sel.recv(&r2);
130     match sel.try_ready() {
131         Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
132         _ => panic!(),
133     }
134 
135     let mut sel = Select::new();
136     sel.recv(&r2);
137     assert!(sel.try_ready().is_err());
138 
139     let mut sel = Select::new();
140     assert!(sel.try_ready().is_err());
141 }
142 
143 #[test]
timeout()144 fn timeout() {
145     let (_s1, r1) = unbounded::<i32>();
146     let (s2, r2) = unbounded::<i32>();
147 
148     scope(|scope| {
149         scope.spawn(|_| {
150             thread::sleep(ms(1500));
151             s2.send(2).unwrap();
152         });
153 
154         let mut sel = Select::new();
155         sel.recv(&r1);
156         sel.recv(&r2);
157         assert!(sel.ready_timeout(ms(1000)).is_err());
158 
159         let mut sel = Select::new();
160         sel.recv(&r1);
161         sel.recv(&r2);
162         match sel.ready_timeout(ms(1000)) {
163             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
164             _ => panic!(),
165         }
166     })
167     .unwrap();
168 
169     scope(|scope| {
170         let (s, r) = unbounded::<i32>();
171 
172         scope.spawn(move |_| {
173             thread::sleep(ms(500));
174             drop(s);
175         });
176 
177         let mut sel = Select::new();
178         assert!(sel.ready_timeout(ms(1000)).is_err());
179 
180         let mut sel = Select::new();
181         sel.recv(&r);
182         match sel.try_ready() {
183             Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
184             _ => panic!(),
185         }
186     })
187     .unwrap();
188 }
189 
190 #[test]
default_when_disconnected()191 fn default_when_disconnected() {
192     let (_, r) = unbounded::<i32>();
193 
194     let mut sel = Select::new();
195     sel.recv(&r);
196     match sel.try_ready() {
197         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
198         _ => panic!(),
199     }
200 
201     let (_, r) = unbounded::<i32>();
202 
203     let mut sel = Select::new();
204     sel.recv(&r);
205     match sel.ready_timeout(ms(1000)) {
206         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
207         _ => panic!(),
208     }
209 
210     let (s, _) = bounded::<i32>(0);
211 
212     let mut sel = Select::new();
213     sel.send(&s);
214     match sel.try_ready() {
215         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
216         _ => panic!(),
217     }
218 
219     let (s, _) = bounded::<i32>(0);
220 
221     let mut sel = Select::new();
222     sel.send(&s);
223     match sel.ready_timeout(ms(1000)) {
224         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
225         _ => panic!(),
226     }
227 }
228 
229 #[test]
default_only()230 fn default_only() {
231     let start = Instant::now();
232 
233     let mut sel = Select::new();
234     assert!(sel.try_ready().is_err());
235     let now = Instant::now();
236     assert!(now - start <= ms(50));
237 
238     let start = Instant::now();
239     let mut sel = Select::new();
240     assert!(sel.ready_timeout(ms(500)).is_err());
241     let now = Instant::now();
242     assert!(now - start >= ms(450));
243     assert!(now - start <= ms(550));
244 }
245 
246 #[test]
unblocks()247 fn unblocks() {
248     let (s1, r1) = bounded::<i32>(0);
249     let (s2, r2) = bounded::<i32>(0);
250 
251     scope(|scope| {
252         scope.spawn(|_| {
253             thread::sleep(ms(500));
254             s2.send(2).unwrap();
255         });
256 
257         let mut sel = Select::new();
258         sel.recv(&r1);
259         sel.recv(&r2);
260         match sel.ready_timeout(ms(1000)) {
261             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
262             _ => panic!(),
263         }
264     })
265     .unwrap();
266 
267     scope(|scope| {
268         scope.spawn(|_| {
269             thread::sleep(ms(500));
270             assert_eq!(r1.recv().unwrap(), 1);
271         });
272 
273         let mut sel = Select::new();
274         let oper1 = sel.send(&s1);
275         let oper2 = sel.send(&s2);
276         let oper = sel.select_timeout(ms(1000));
277         match oper {
278             Err(_) => panic!(),
279             Ok(oper) => match oper.index() {
280                 i if i == oper1 => oper.send(&s1, 1).unwrap(),
281                 i if i == oper2 => panic!(),
282                 _ => unreachable!(),
283             },
284         }
285     })
286     .unwrap();
287 }
288 
289 #[test]
both_ready()290 fn both_ready() {
291     let (s1, r1) = bounded(0);
292     let (s2, r2) = bounded(0);
293 
294     scope(|scope| {
295         scope.spawn(|_| {
296             thread::sleep(ms(500));
297             s1.send(1).unwrap();
298             assert_eq!(r2.recv().unwrap(), 2);
299         });
300 
301         for _ in 0..2 {
302             let mut sel = Select::new();
303             sel.recv(&r1);
304             sel.send(&s2);
305             match sel.ready() {
306                 0 => assert_eq!(r1.try_recv(), Ok(1)),
307                 1 => s2.try_send(2).unwrap(),
308                 _ => panic!(),
309             }
310         }
311     })
312     .unwrap();
313 }
314 
315 #[test]
cloning1()316 fn cloning1() {
317     scope(|scope| {
318         let (s1, r1) = unbounded::<i32>();
319         let (_s2, r2) = unbounded::<i32>();
320         let (s3, r3) = unbounded::<()>();
321 
322         scope.spawn(move |_| {
323             r3.recv().unwrap();
324             drop(s1.clone());
325             assert!(r3.try_recv().is_err());
326             s1.send(1).unwrap();
327             r3.recv().unwrap();
328         });
329 
330         s3.send(()).unwrap();
331 
332         let mut sel = Select::new();
333         sel.recv(&r1);
334         sel.recv(&r2);
335         match sel.ready() {
336             0 => drop(r1.try_recv()),
337             1 => drop(r2.try_recv()),
338             _ => panic!(),
339         }
340 
341         s3.send(()).unwrap();
342     })
343     .unwrap();
344 }
345 
346 #[test]
cloning2()347 fn cloning2() {
348     let (s1, r1) = unbounded::<()>();
349     let (s2, r2) = unbounded::<()>();
350     let (_s3, _r3) = unbounded::<()>();
351 
352     scope(|scope| {
353         scope.spawn(move |_| {
354             let mut sel = Select::new();
355             sel.recv(&r1);
356             sel.recv(&r2);
357             match sel.ready() {
358                 0 => panic!(),
359                 1 => drop(r2.try_recv()),
360                 _ => panic!(),
361             }
362         });
363 
364         thread::sleep(ms(500));
365         drop(s1.clone());
366         s2.send(()).unwrap();
367     })
368     .unwrap();
369 }
370 
371 #[test]
preflight1()372 fn preflight1() {
373     let (s, r) = unbounded();
374     s.send(()).unwrap();
375 
376     let mut sel = Select::new();
377     sel.recv(&r);
378     match sel.ready() {
379         0 => drop(r.try_recv()),
380         _ => panic!(),
381     }
382 }
383 
384 #[test]
preflight2()385 fn preflight2() {
386     let (s, r) = unbounded();
387     drop(s.clone());
388     s.send(()).unwrap();
389     drop(s);
390 
391     let mut sel = Select::new();
392     sel.recv(&r);
393     match sel.ready() {
394         0 => assert_eq!(r.try_recv(), Ok(())),
395         _ => panic!(),
396     }
397 
398     assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
399 }
400 
401 #[test]
preflight3()402 fn preflight3() {
403     let (s, r) = unbounded();
404     drop(s.clone());
405     s.send(()).unwrap();
406     drop(s);
407     r.recv().unwrap();
408 
409     let mut sel = Select::new();
410     sel.recv(&r);
411     match sel.ready() {
412         0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
413         _ => panic!(),
414     }
415 }
416 
417 #[test]
duplicate_operations()418 fn duplicate_operations() {
419     let (s, r) = unbounded::<i32>();
420     let hit = vec![Cell::new(false); 4];
421 
422     while hit.iter().map(|h| h.get()).any(|hit| !hit) {
423         let mut sel = Select::new();
424         sel.recv(&r);
425         sel.recv(&r);
426         sel.send(&s);
427         sel.send(&s);
428         match sel.ready() {
429             0 => {
430                 assert!(r.try_recv().is_ok());
431                 hit[0].set(true);
432             }
433             1 => {
434                 assert!(r.try_recv().is_ok());
435                 hit[1].set(true);
436             }
437             2 => {
438                 assert!(s.try_send(0).is_ok());
439                 hit[2].set(true);
440             }
441             3 => {
442                 assert!(s.try_send(0).is_ok());
443                 hit[3].set(true);
444             }
445             _ => panic!(),
446         }
447     }
448 }
449 
450 #[test]
nesting()451 fn nesting() {
452     let (s, r) = unbounded::<i32>();
453 
454     let mut sel = Select::new();
455     sel.send(&s);
456     match sel.ready() {
457         0 => {
458             assert!(s.try_send(0).is_ok());
459 
460             let mut sel = Select::new();
461             sel.recv(&r);
462             match sel.ready() {
463                 0 => {
464                     assert_eq!(r.try_recv(), Ok(0));
465 
466                     let mut sel = Select::new();
467                     sel.send(&s);
468                     match sel.ready() {
469                         0 => {
470                             assert!(s.try_send(1).is_ok());
471 
472                             let mut sel = Select::new();
473                             sel.recv(&r);
474                             match sel.ready() {
475                                 0 => {
476                                     assert_eq!(r.try_recv(), Ok(1));
477                                 }
478                                 _ => panic!(),
479                             }
480                         }
481                         _ => panic!(),
482                     }
483                 }
484                 _ => panic!(),
485             }
486         }
487         _ => panic!(),
488     }
489 }
490 
491 #[test]
stress_recv()492 fn stress_recv() {
493     const COUNT: usize = 10_000;
494 
495     let (s1, r1) = unbounded();
496     let (s2, r2) = bounded(5);
497     let (s3, r3) = bounded(0);
498 
499     scope(|scope| {
500         scope.spawn(|_| {
501             for i in 0..COUNT {
502                 s1.send(i).unwrap();
503                 r3.recv().unwrap();
504 
505                 s2.send(i).unwrap();
506                 r3.recv().unwrap();
507             }
508         });
509 
510         for i in 0..COUNT {
511             for _ in 0..2 {
512                 let mut sel = Select::new();
513                 sel.recv(&r1);
514                 sel.recv(&r2);
515                 match sel.ready() {
516                     0 => assert_eq!(r1.try_recv(), Ok(i)),
517                     1 => assert_eq!(r2.try_recv(), Ok(i)),
518                     _ => panic!(),
519                 }
520 
521                 s3.send(()).unwrap();
522             }
523         }
524     })
525     .unwrap();
526 }
527 
528 #[test]
stress_send()529 fn stress_send() {
530     const COUNT: usize = 10_000;
531 
532     let (s1, r1) = bounded(0);
533     let (s2, r2) = bounded(0);
534     let (s3, r3) = bounded(100);
535 
536     scope(|scope| {
537         scope.spawn(|_| {
538             for i in 0..COUNT {
539                 assert_eq!(r1.recv().unwrap(), i);
540                 assert_eq!(r2.recv().unwrap(), i);
541                 r3.recv().unwrap();
542             }
543         });
544 
545         for i in 0..COUNT {
546             for _ in 0..2 {
547                 let mut sel = Select::new();
548                 sel.send(&s1);
549                 sel.send(&s2);
550                 match sel.ready() {
551                     0 => assert!(s1.try_send(i).is_ok()),
552                     1 => assert!(s2.try_send(i).is_ok()),
553                     _ => panic!(),
554                 }
555             }
556             s3.send(()).unwrap();
557         }
558     })
559     .unwrap();
560 }
561 
562 #[test]
stress_mixed()563 fn stress_mixed() {
564     const COUNT: usize = 10_000;
565 
566     let (s1, r1) = bounded(0);
567     let (s2, r2) = bounded(0);
568     let (s3, r3) = bounded(100);
569 
570     scope(|scope| {
571         scope.spawn(|_| {
572             for i in 0..COUNT {
573                 s1.send(i).unwrap();
574                 assert_eq!(r2.recv().unwrap(), i);
575                 r3.recv().unwrap();
576             }
577         });
578 
579         for i in 0..COUNT {
580             for _ in 0..2 {
581                 let mut sel = Select::new();
582                 sel.recv(&r1);
583                 sel.send(&s2);
584                 match sel.ready() {
585                     0 => assert_eq!(r1.try_recv(), Ok(i)),
586                     1 => assert!(s2.try_send(i).is_ok()),
587                     _ => panic!(),
588                 }
589             }
590             s3.send(()).unwrap();
591         }
592     })
593     .unwrap();
594 }
595 
596 #[test]
stress_timeout_two_threads()597 fn stress_timeout_two_threads() {
598     const COUNT: usize = 20;
599 
600     let (s, r) = bounded(2);
601 
602     scope(|scope| {
603         scope.spawn(|_| {
604             for i in 0..COUNT {
605                 if i % 2 == 0 {
606                     thread::sleep(ms(500));
607                 }
608 
609                 loop {
610                     let mut sel = Select::new();
611                     sel.send(&s);
612                     match sel.ready_timeout(ms(100)) {
613                         Err(_) => {}
614                         Ok(0) => {
615                             assert!(s.try_send(i).is_ok());
616                             break;
617                         }
618                         Ok(_) => panic!(),
619                     }
620                 }
621             }
622         });
623 
624         scope.spawn(|_| {
625             for i in 0..COUNT {
626                 if i % 2 == 0 {
627                     thread::sleep(ms(500));
628                 }
629 
630                 loop {
631                     let mut sel = Select::new();
632                     sel.recv(&r);
633                     match sel.ready_timeout(ms(100)) {
634                         Err(_) => {}
635                         Ok(0) => {
636                             assert_eq!(r.try_recv(), Ok(i));
637                             break;
638                         }
639                         Ok(_) => panic!(),
640                     }
641                 }
642             }
643         });
644     })
645     .unwrap();
646 }
647 
648 #[test]
send_recv_same_channel()649 fn send_recv_same_channel() {
650     let (s, r) = bounded::<i32>(0);
651     let mut sel = Select::new();
652     sel.send(&s);
653     sel.recv(&r);
654     assert!(sel.ready_timeout(ms(100)).is_err());
655 
656     let (s, r) = unbounded::<i32>();
657     let mut sel = Select::new();
658     sel.send(&s);
659     sel.recv(&r);
660     match sel.ready_timeout(ms(100)) {
661         Err(_) => panic!(),
662         Ok(0) => assert!(s.try_send(0).is_ok()),
663         Ok(_) => panic!(),
664     }
665 }
666 
667 #[test]
channel_through_channel()668 fn channel_through_channel() {
669     const COUNT: usize = 1000;
670 
671     type T = Box<dyn Any + Send>;
672 
673     for cap in 1..4 {
674         let (s, r) = bounded::<T>(cap);
675 
676         scope(|scope| {
677             scope.spawn(move |_| {
678                 let mut s = s;
679 
680                 for _ in 0..COUNT {
681                     let (new_s, new_r) = bounded(cap);
682                     let new_r: T = Box::new(Some(new_r));
683 
684                     {
685                         let mut sel = Select::new();
686                         sel.send(&s);
687                         match sel.ready() {
688                             0 => assert!(s.try_send(new_r).is_ok()),
689                             _ => panic!(),
690                         }
691                     }
692 
693                     s = new_s;
694                 }
695             });
696 
697             scope.spawn(move |_| {
698                 let mut r = r;
699 
700                 for _ in 0..COUNT {
701                     let new = {
702                         let mut sel = Select::new();
703                         sel.recv(&r);
704                         match sel.ready() {
705                             0 => r
706                                 .try_recv()
707                                 .unwrap()
708                                 .downcast_mut::<Option<Receiver<T>>>()
709                                 .unwrap()
710                                 .take()
711                                 .unwrap(),
712                             _ => panic!(),
713                         }
714                     };
715                     r = new;
716                 }
717             });
718         })
719         .unwrap();
720     }
721 }
722 
723 #[test]
fairness1()724 fn fairness1() {
725     const COUNT: usize = 10_000;
726 
727     let (s1, r1) = bounded::<()>(COUNT);
728     let (s2, r2) = unbounded::<()>();
729 
730     for _ in 0..COUNT {
731         s1.send(()).unwrap();
732         s2.send(()).unwrap();
733     }
734 
735     let hits = vec![Cell::new(0usize); 4];
736     for _ in 0..COUNT {
737         let after = after(ms(0));
738         let tick = tick(ms(0));
739 
740         let mut sel = Select::new();
741         sel.recv(&r1);
742         sel.recv(&r2);
743         sel.recv(&after);
744         sel.recv(&tick);
745         match sel.ready() {
746             0 => {
747                 r1.try_recv().unwrap();
748                 hits[0].set(hits[0].get() + 1);
749             }
750             1 => {
751                 r2.try_recv().unwrap();
752                 hits[1].set(hits[1].get() + 1);
753             }
754             2 => {
755                 after.try_recv().unwrap();
756                 hits[2].set(hits[2].get() + 1);
757             }
758             3 => {
759                 tick.try_recv().unwrap();
760                 hits[3].set(hits[3].get() + 1);
761             }
762             _ => panic!(),
763         }
764     }
765     assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
766 }
767 
768 #[test]
fairness2()769 fn fairness2() {
770     const COUNT: usize = 100_000;
771 
772     let (s1, r1) = unbounded::<()>();
773     let (s2, r2) = bounded::<()>(1);
774     let (s3, r3) = bounded::<()>(0);
775 
776     scope(|scope| {
777         scope.spawn(|_| {
778             for _ in 0..COUNT {
779                 let mut sel = Select::new();
780                 let mut oper1 = None;
781                 let mut oper2 = None;
782                 if s1.is_empty() {
783                     oper1 = Some(sel.send(&s1));
784                 }
785                 if s2.is_empty() {
786                     oper2 = Some(sel.send(&s2));
787                 }
788                 let oper3 = sel.send(&s3);
789                 let oper = sel.select();
790                 match oper.index() {
791                     i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
792                     i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
793                     i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
794                     _ => unreachable!(),
795                 }
796             }
797         });
798 
799         let hits = vec![Cell::new(0usize); 3];
800         for _ in 0..COUNT {
801             let mut sel = Select::new();
802             sel.recv(&r1);
803             sel.recv(&r2);
804             sel.recv(&r3);
805             loop {
806                 match sel.ready() {
807                     0 => {
808                         if r1.try_recv().is_ok() {
809                             hits[0].set(hits[0].get() + 1);
810                             break;
811                         }
812                     }
813                     1 => {
814                         if r2.try_recv().is_ok() {
815                             hits[1].set(hits[1].get() + 1);
816                             break;
817                         }
818                     }
819                     2 => {
820                         if r3.try_recv().is_ok() {
821                             hits[2].set(hits[2].get() + 1);
822                             break;
823                         }
824                     }
825                     _ => unreachable!(),
826                 }
827             }
828         }
829         assert!(hits.iter().all(|x| x.get() > 0));
830     })
831     .unwrap();
832 }
833