1 //! Tests for channel readiness using the `Select` struct.
2 
3 #![allow(clippy::drop_copy)]
4 
5 use std::any::Any;
6 use std::cell::Cell;
7 use std::thread;
8 use std::time::{Duration, Instant};
9 
10 use crossbeam_channel::{after, bounded, tick, unbounded};
11 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
12 use crossbeam_utils::thread::scope;
13 
ms(ms: u64) -> Duration14 fn ms(ms: u64) -> Duration {
15     Duration::from_millis(ms)
16 }
17 
18 #[test]
smoke1()19 fn smoke1() {
20     let (s1, r1) = unbounded::<usize>();
21     let (s2, r2) = unbounded::<usize>();
22 
23     s1.send(1).unwrap();
24 
25     let mut sel = Select::new();
26     sel.recv(&r1);
27     sel.recv(&r2);
28     assert_eq!(sel.ready(), 0);
29     assert_eq!(r1.try_recv(), Ok(1));
30 
31     s2.send(2).unwrap();
32 
33     let mut sel = Select::new();
34     sel.recv(&r1);
35     sel.recv(&r2);
36     assert_eq!(sel.ready(), 1);
37     assert_eq!(r2.try_recv(), Ok(2));
38 }
39 
40 #[test]
smoke2()41 fn smoke2() {
42     let (_s1, r1) = unbounded::<i32>();
43     let (_s2, r2) = unbounded::<i32>();
44     let (_s3, r3) = unbounded::<i32>();
45     let (_s4, r4) = unbounded::<i32>();
46     let (s5, r5) = unbounded::<i32>();
47 
48     s5.send(5).unwrap();
49 
50     let mut sel = Select::new();
51     sel.recv(&r1);
52     sel.recv(&r2);
53     sel.recv(&r3);
54     sel.recv(&r4);
55     sel.recv(&r5);
56     assert_eq!(sel.ready(), 4);
57     assert_eq!(r5.try_recv(), Ok(5));
58 }
59 
60 #[test]
disconnected()61 fn disconnected() {
62     let (s1, r1) = unbounded::<i32>();
63     let (s2, r2) = unbounded::<i32>();
64 
65     scope(|scope| {
66         scope.spawn(|_| {
67             drop(s1);
68             thread::sleep(ms(500));
69             s2.send(5).unwrap();
70         });
71 
72         let mut sel = Select::new();
73         sel.recv(&r1);
74         sel.recv(&r2);
75         match sel.ready_timeout(ms(1000)) {
76             Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
77             _ => panic!(),
78         }
79 
80         r2.recv().unwrap();
81     })
82     .unwrap();
83 
84     let mut sel = Select::new();
85     sel.recv(&r1);
86     sel.recv(&r2);
87     match sel.ready_timeout(ms(1000)) {
88         Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
89         _ => panic!(),
90     }
91 
92     scope(|scope| {
93         scope.spawn(|_| {
94             thread::sleep(ms(500));
95             drop(s2);
96         });
97 
98         let mut sel = Select::new();
99         sel.recv(&r2);
100         match sel.ready_timeout(ms(1000)) {
101             Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
102             _ => panic!(),
103         }
104     })
105     .unwrap();
106 }
107 
108 #[test]
default()109 fn default() {
110     let (s1, r1) = unbounded::<i32>();
111     let (s2, r2) = unbounded::<i32>();
112 
113     let mut sel = Select::new();
114     sel.recv(&r1);
115     sel.recv(&r2);
116     assert!(sel.try_ready().is_err());
117 
118     drop(s1);
119 
120     let mut sel = Select::new();
121     sel.recv(&r1);
122     sel.recv(&r2);
123     match sel.try_ready() {
124         Ok(0) => assert!(r1.try_recv().is_err()),
125         _ => panic!(),
126     }
127 
128     s2.send(2).unwrap();
129 
130     let mut sel = Select::new();
131     sel.recv(&r2);
132     match sel.try_ready() {
133         Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
134         _ => panic!(),
135     }
136 
137     let mut sel = Select::new();
138     sel.recv(&r2);
139     assert!(sel.try_ready().is_err());
140 
141     let mut sel = Select::new();
142     assert!(sel.try_ready().is_err());
143 }
144 
145 #[test]
timeout()146 fn timeout() {
147     let (_s1, r1) = unbounded::<i32>();
148     let (s2, r2) = unbounded::<i32>();
149 
150     scope(|scope| {
151         scope.spawn(|_| {
152             thread::sleep(ms(1500));
153             s2.send(2).unwrap();
154         });
155 
156         let mut sel = Select::new();
157         sel.recv(&r1);
158         sel.recv(&r2);
159         assert!(sel.ready_timeout(ms(1000)).is_err());
160 
161         let mut sel = Select::new();
162         sel.recv(&r1);
163         sel.recv(&r2);
164         match sel.ready_timeout(ms(1000)) {
165             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
166             _ => panic!(),
167         }
168     })
169     .unwrap();
170 
171     scope(|scope| {
172         let (s, r) = unbounded::<i32>();
173 
174         scope.spawn(move |_| {
175             thread::sleep(ms(500));
176             drop(s);
177         });
178 
179         let mut sel = Select::new();
180         assert!(sel.ready_timeout(ms(1000)).is_err());
181 
182         let mut sel = Select::new();
183         sel.recv(&r);
184         match sel.try_ready() {
185             Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
186             _ => panic!(),
187         }
188     })
189     .unwrap();
190 }
191 
192 #[test]
default_when_disconnected()193 fn default_when_disconnected() {
194     let (_, r) = unbounded::<i32>();
195 
196     let mut sel = Select::new();
197     sel.recv(&r);
198     match sel.try_ready() {
199         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
200         _ => panic!(),
201     }
202 
203     let (_, r) = unbounded::<i32>();
204 
205     let mut sel = Select::new();
206     sel.recv(&r);
207     match sel.ready_timeout(ms(1000)) {
208         Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
209         _ => panic!(),
210     }
211 
212     let (s, _) = bounded::<i32>(0);
213 
214     let mut sel = Select::new();
215     sel.send(&s);
216     match sel.try_ready() {
217         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
218         _ => panic!(),
219     }
220 
221     let (s, _) = bounded::<i32>(0);
222 
223     let mut sel = Select::new();
224     sel.send(&s);
225     match sel.ready_timeout(ms(1000)) {
226         Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
227         _ => panic!(),
228     }
229 }
230 
231 #[test]
default_only()232 fn default_only() {
233     let start = Instant::now();
234 
235     let mut sel = Select::new();
236     assert!(sel.try_ready().is_err());
237     let now = Instant::now();
238     assert!(now - start <= ms(50));
239 
240     let start = Instant::now();
241     let mut sel = Select::new();
242     assert!(sel.ready_timeout(ms(500)).is_err());
243     let now = Instant::now();
244     assert!(now - start >= ms(450));
245     assert!(now - start <= ms(550));
246 }
247 
248 #[test]
unblocks()249 fn unblocks() {
250     let (s1, r1) = bounded::<i32>(0);
251     let (s2, r2) = bounded::<i32>(0);
252 
253     scope(|scope| {
254         scope.spawn(|_| {
255             thread::sleep(ms(500));
256             s2.send(2).unwrap();
257         });
258 
259         let mut sel = Select::new();
260         sel.recv(&r1);
261         sel.recv(&r2);
262         match sel.ready_timeout(ms(1000)) {
263             Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
264             _ => panic!(),
265         }
266     })
267     .unwrap();
268 
269     scope(|scope| {
270         scope.spawn(|_| {
271             thread::sleep(ms(500));
272             assert_eq!(r1.recv().unwrap(), 1);
273         });
274 
275         let mut sel = Select::new();
276         let oper1 = sel.send(&s1);
277         let oper2 = sel.send(&s2);
278         let oper = sel.select_timeout(ms(1000));
279         match oper {
280             Err(_) => panic!(),
281             Ok(oper) => match oper.index() {
282                 i if i == oper1 => oper.send(&s1, 1).unwrap(),
283                 i if i == oper2 => panic!(),
284                 _ => unreachable!(),
285             },
286         }
287     })
288     .unwrap();
289 }
290 
291 #[test]
both_ready()292 fn both_ready() {
293     let (s1, r1) = bounded(0);
294     let (s2, r2) = bounded(0);
295 
296     scope(|scope| {
297         scope.spawn(|_| {
298             thread::sleep(ms(500));
299             s1.send(1).unwrap();
300             assert_eq!(r2.recv().unwrap(), 2);
301         });
302 
303         for _ in 0..2 {
304             let mut sel = Select::new();
305             sel.recv(&r1);
306             sel.send(&s2);
307             match sel.ready() {
308                 0 => assert_eq!(r1.try_recv(), Ok(1)),
309                 1 => s2.try_send(2).unwrap(),
310                 _ => panic!(),
311             }
312         }
313     })
314     .unwrap();
315 }
316 
317 #[test]
cloning1()318 fn cloning1() {
319     scope(|scope| {
320         let (s1, r1) = unbounded::<i32>();
321         let (_s2, r2) = unbounded::<i32>();
322         let (s3, r3) = unbounded::<()>();
323 
324         scope.spawn(move |_| {
325             r3.recv().unwrap();
326             drop(s1.clone());
327             assert!(r3.try_recv().is_err());
328             s1.send(1).unwrap();
329             r3.recv().unwrap();
330         });
331 
332         s3.send(()).unwrap();
333 
334         let mut sel = Select::new();
335         sel.recv(&r1);
336         sel.recv(&r2);
337         match sel.ready() {
338             0 => drop(r1.try_recv()),
339             1 => drop(r2.try_recv()),
340             _ => panic!(),
341         }
342 
343         s3.send(()).unwrap();
344     })
345     .unwrap();
346 }
347 
348 #[test]
cloning2()349 fn cloning2() {
350     let (s1, r1) = unbounded::<()>();
351     let (s2, r2) = unbounded::<()>();
352     let (_s3, _r3) = unbounded::<()>();
353 
354     scope(|scope| {
355         scope.spawn(move |_| {
356             let mut sel = Select::new();
357             sel.recv(&r1);
358             sel.recv(&r2);
359             match sel.ready() {
360                 0 => panic!(),
361                 1 => drop(r2.try_recv()),
362                 _ => panic!(),
363             }
364         });
365 
366         thread::sleep(ms(500));
367         drop(s1.clone());
368         s2.send(()).unwrap();
369     })
370     .unwrap();
371 }
372 
373 #[test]
preflight1()374 fn preflight1() {
375     let (s, r) = unbounded();
376     s.send(()).unwrap();
377 
378     let mut sel = Select::new();
379     sel.recv(&r);
380     match sel.ready() {
381         0 => drop(r.try_recv()),
382         _ => panic!(),
383     }
384 }
385 
386 #[test]
preflight2()387 fn preflight2() {
388     let (s, r) = unbounded();
389     drop(s.clone());
390     s.send(()).unwrap();
391     drop(s);
392 
393     let mut sel = Select::new();
394     sel.recv(&r);
395     match sel.ready() {
396         0 => assert_eq!(r.try_recv(), Ok(())),
397         _ => panic!(),
398     }
399 
400     assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
401 }
402 
403 #[test]
preflight3()404 fn preflight3() {
405     let (s, r) = unbounded();
406     drop(s.clone());
407     s.send(()).unwrap();
408     drop(s);
409     r.recv().unwrap();
410 
411     let mut sel = Select::new();
412     sel.recv(&r);
413     match sel.ready() {
414         0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
415         _ => panic!(),
416     }
417 }
418 
419 #[test]
duplicate_operations()420 fn duplicate_operations() {
421     let (s, r) = unbounded::<i32>();
422     let hit = vec![Cell::new(false); 4];
423 
424     while hit.iter().map(|h| h.get()).any(|hit| !hit) {
425         let mut sel = Select::new();
426         sel.recv(&r);
427         sel.recv(&r);
428         sel.send(&s);
429         sel.send(&s);
430         match sel.ready() {
431             0 => {
432                 assert!(r.try_recv().is_ok());
433                 hit[0].set(true);
434             }
435             1 => {
436                 assert!(r.try_recv().is_ok());
437                 hit[1].set(true);
438             }
439             2 => {
440                 assert!(s.try_send(0).is_ok());
441                 hit[2].set(true);
442             }
443             3 => {
444                 assert!(s.try_send(0).is_ok());
445                 hit[3].set(true);
446             }
447             _ => panic!(),
448         }
449     }
450 }
451 
452 #[test]
nesting()453 fn nesting() {
454     let (s, r) = unbounded::<i32>();
455 
456     let mut sel = Select::new();
457     sel.send(&s);
458     match sel.ready() {
459         0 => {
460             assert!(s.try_send(0).is_ok());
461 
462             let mut sel = Select::new();
463             sel.recv(&r);
464             match sel.ready() {
465                 0 => {
466                     assert_eq!(r.try_recv(), Ok(0));
467 
468                     let mut sel = Select::new();
469                     sel.send(&s);
470                     match sel.ready() {
471                         0 => {
472                             assert!(s.try_send(1).is_ok());
473 
474                             let mut sel = Select::new();
475                             sel.recv(&r);
476                             match sel.ready() {
477                                 0 => {
478                                     assert_eq!(r.try_recv(), Ok(1));
479                                 }
480                                 _ => panic!(),
481                             }
482                         }
483                         _ => panic!(),
484                     }
485                 }
486                 _ => panic!(),
487             }
488         }
489         _ => panic!(),
490     }
491 }
492 
493 #[test]
stress_recv()494 fn stress_recv() {
495     #[cfg(miri)]
496     const COUNT: usize = 100;
497     #[cfg(not(miri))]
498     const COUNT: usize = 10_000;
499 
500     let (s1, r1) = unbounded();
501     let (s2, r2) = bounded(5);
502     let (s3, r3) = bounded(0);
503 
504     scope(|scope| {
505         scope.spawn(|_| {
506             for i in 0..COUNT {
507                 s1.send(i).unwrap();
508                 r3.recv().unwrap();
509 
510                 s2.send(i).unwrap();
511                 r3.recv().unwrap();
512             }
513         });
514 
515         for i in 0..COUNT {
516             for _ in 0..2 {
517                 let mut sel = Select::new();
518                 sel.recv(&r1);
519                 sel.recv(&r2);
520                 match sel.ready() {
521                     0 => assert_eq!(r1.try_recv(), Ok(i)),
522                     1 => assert_eq!(r2.try_recv(), Ok(i)),
523                     _ => panic!(),
524                 }
525 
526                 s3.send(()).unwrap();
527             }
528         }
529     })
530     .unwrap();
531 }
532 
533 #[test]
stress_send()534 fn stress_send() {
535     #[cfg(miri)]
536     const COUNT: usize = 100;
537     #[cfg(not(miri))]
538     const COUNT: usize = 10_000;
539 
540     let (s1, r1) = bounded(0);
541     let (s2, r2) = bounded(0);
542     let (s3, r3) = bounded(100);
543 
544     scope(|scope| {
545         scope.spawn(|_| {
546             for i in 0..COUNT {
547                 assert_eq!(r1.recv().unwrap(), i);
548                 assert_eq!(r2.recv().unwrap(), i);
549                 r3.recv().unwrap();
550             }
551         });
552 
553         for i in 0..COUNT {
554             for _ in 0..2 {
555                 let mut sel = Select::new();
556                 sel.send(&s1);
557                 sel.send(&s2);
558                 match sel.ready() {
559                     0 => assert!(s1.try_send(i).is_ok()),
560                     1 => assert!(s2.try_send(i).is_ok()),
561                     _ => panic!(),
562                 }
563             }
564             s3.send(()).unwrap();
565         }
566     })
567     .unwrap();
568 }
569 
570 #[test]
stress_mixed()571 fn stress_mixed() {
572     #[cfg(miri)]
573     const COUNT: usize = 100;
574     #[cfg(not(miri))]
575     const COUNT: usize = 10_000;
576 
577     let (s1, r1) = bounded(0);
578     let (s2, r2) = bounded(0);
579     let (s3, r3) = bounded(100);
580 
581     scope(|scope| {
582         scope.spawn(|_| {
583             for i in 0..COUNT {
584                 s1.send(i).unwrap();
585                 assert_eq!(r2.recv().unwrap(), i);
586                 r3.recv().unwrap();
587             }
588         });
589 
590         for i in 0..COUNT {
591             for _ in 0..2 {
592                 let mut sel = Select::new();
593                 sel.recv(&r1);
594                 sel.send(&s2);
595                 match sel.ready() {
596                     0 => assert_eq!(r1.try_recv(), Ok(i)),
597                     1 => assert!(s2.try_send(i).is_ok()),
598                     _ => panic!(),
599                 }
600             }
601             s3.send(()).unwrap();
602         }
603     })
604     .unwrap();
605 }
606 
607 #[test]
stress_timeout_two_threads()608 fn stress_timeout_two_threads() {
609     const COUNT: usize = 20;
610 
611     let (s, r) = bounded(2);
612 
613     scope(|scope| {
614         scope.spawn(|_| {
615             for i in 0..COUNT {
616                 if i % 2 == 0 {
617                     thread::sleep(ms(500));
618                 }
619 
620                 loop {
621                     let mut sel = Select::new();
622                     sel.send(&s);
623                     match sel.ready_timeout(ms(100)) {
624                         Err(_) => {}
625                         Ok(0) => {
626                             assert!(s.try_send(i).is_ok());
627                             break;
628                         }
629                         Ok(_) => panic!(),
630                     }
631                 }
632             }
633         });
634 
635         scope.spawn(|_| {
636             for i in 0..COUNT {
637                 if i % 2 == 0 {
638                     thread::sleep(ms(500));
639                 }
640 
641                 loop {
642                     let mut sel = Select::new();
643                     sel.recv(&r);
644                     match sel.ready_timeout(ms(100)) {
645                         Err(_) => {}
646                         Ok(0) => {
647                             assert_eq!(r.try_recv(), Ok(i));
648                             break;
649                         }
650                         Ok(_) => panic!(),
651                     }
652                 }
653             }
654         });
655     })
656     .unwrap();
657 }
658 
659 #[test]
send_recv_same_channel()660 fn send_recv_same_channel() {
661     let (s, r) = bounded::<i32>(0);
662     let mut sel = Select::new();
663     sel.send(&s);
664     sel.recv(&r);
665     assert!(sel.ready_timeout(ms(100)).is_err());
666 
667     let (s, r) = unbounded::<i32>();
668     let mut sel = Select::new();
669     sel.send(&s);
670     sel.recv(&r);
671     match sel.ready_timeout(ms(100)) {
672         Err(_) => panic!(),
673         Ok(0) => assert!(s.try_send(0).is_ok()),
674         Ok(_) => panic!(),
675     }
676 }
677 
678 #[test]
channel_through_channel()679 fn channel_through_channel() {
680     #[cfg(miri)]
681     const COUNT: usize = 100;
682     #[cfg(not(miri))]
683     const COUNT: usize = 1000;
684 
685     type T = Box<dyn Any + Send>;
686 
687     for cap in 1..4 {
688         let (s, r) = bounded::<T>(cap);
689 
690         scope(|scope| {
691             scope.spawn(move |_| {
692                 let mut s = s;
693 
694                 for _ in 0..COUNT {
695                     let (new_s, new_r) = bounded(cap);
696                     let new_r: T = Box::new(Some(new_r));
697 
698                     {
699                         let mut sel = Select::new();
700                         sel.send(&s);
701                         match sel.ready() {
702                             0 => assert!(s.try_send(new_r).is_ok()),
703                             _ => panic!(),
704                         }
705                     }
706 
707                     s = new_s;
708                 }
709             });
710 
711             scope.spawn(move |_| {
712                 let mut r = r;
713 
714                 for _ in 0..COUNT {
715                     let new = {
716                         let mut sel = Select::new();
717                         sel.recv(&r);
718                         match sel.ready() {
719                             0 => r
720                                 .try_recv()
721                                 .unwrap()
722                                 .downcast_mut::<Option<Receiver<T>>>()
723                                 .unwrap()
724                                 .take()
725                                 .unwrap(),
726                             _ => panic!(),
727                         }
728                     };
729                     r = new;
730                 }
731             });
732         })
733         .unwrap();
734     }
735 }
736 
737 #[test]
fairness1()738 fn fairness1() {
739     #[cfg(miri)]
740     const COUNT: usize = 100;
741     #[cfg(not(miri))]
742     const COUNT: usize = 10_000;
743 
744     let (s1, r1) = bounded::<()>(COUNT);
745     let (s2, r2) = unbounded::<()>();
746 
747     for _ in 0..COUNT {
748         s1.send(()).unwrap();
749         s2.send(()).unwrap();
750     }
751 
752     let hits = vec![Cell::new(0usize); 4];
753     for _ in 0..COUNT {
754         let after = after(ms(0));
755         let tick = tick(ms(0));
756 
757         let mut sel = Select::new();
758         sel.recv(&r1);
759         sel.recv(&r2);
760         sel.recv(&after);
761         sel.recv(&tick);
762         match sel.ready() {
763             0 => {
764                 r1.try_recv().unwrap();
765                 hits[0].set(hits[0].get() + 1);
766             }
767             1 => {
768                 r2.try_recv().unwrap();
769                 hits[1].set(hits[1].get() + 1);
770             }
771             2 => {
772                 after.try_recv().unwrap();
773                 hits[2].set(hits[2].get() + 1);
774             }
775             3 => {
776                 tick.try_recv().unwrap();
777                 hits[3].set(hits[3].get() + 1);
778             }
779             _ => panic!(),
780         }
781     }
782     assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
783 }
784 
785 #[test]
fairness2()786 fn fairness2() {
787     #[cfg(miri)]
788     const COUNT: usize = 100;
789     #[cfg(not(miri))]
790     const COUNT: usize = 100_000;
791 
792     let (s1, r1) = unbounded::<()>();
793     let (s2, r2) = bounded::<()>(1);
794     let (s3, r3) = bounded::<()>(0);
795 
796     scope(|scope| {
797         scope.spawn(|_| {
798             for _ in 0..COUNT {
799                 let mut sel = Select::new();
800                 let mut oper1 = None;
801                 let mut oper2 = None;
802                 if s1.is_empty() {
803                     oper1 = Some(sel.send(&s1));
804                 }
805                 if s2.is_empty() {
806                     oper2 = Some(sel.send(&s2));
807                 }
808                 let oper3 = sel.send(&s3);
809                 let oper = sel.select();
810                 match oper.index() {
811                     i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
812                     i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
813                     i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
814                     _ => unreachable!(),
815                 }
816             }
817         });
818 
819         let hits = vec![Cell::new(0usize); 3];
820         for _ in 0..COUNT {
821             let mut sel = Select::new();
822             sel.recv(&r1);
823             sel.recv(&r2);
824             sel.recv(&r3);
825             loop {
826                 match sel.ready() {
827                     0 => {
828                         if r1.try_recv().is_ok() {
829                             hits[0].set(hits[0].get() + 1);
830                             break;
831                         }
832                     }
833                     1 => {
834                         if r2.try_recv().is_ok() {
835                             hits[1].set(hits[1].get() + 1);
836                             break;
837                         }
838                     }
839                     2 => {
840                         if r3.try_recv().is_ok() {
841                             hits[2].set(hits[2].get() + 1);
842                             break;
843                         }
844                     }
845                     _ => unreachable!(),
846                 }
847             }
848         }
849         assert!(hits.iter().all(|x| x.get() > 0));
850     })
851     .unwrap();
852 }
853