1 //! Tests for channel selection using the `Select` struct.
2 
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7 
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10 
/// Shorthand used throughout these tests: turns a millisecond count into a
/// [`Duration`].
fn ms(ms: u64) -> Duration {
    Duration::from_millis(ms)
}
14 
/// With two receivers registered, `select` must report the one whose channel
/// already holds a message.
#[test]
fn smoke1() {
    let (tx_a, rx_a) = unbounded::<usize>();
    let (tx_b, rx_b) = unbounded::<usize>();

    // Round one: only the first channel has something queued.
    tx_a.send(1).unwrap();

    let mut select = Select::new();
    let first = select.recv(&rx_a);
    let second = select.recv(&rx_b);
    let chosen = select.select();
    let idx = chosen.index();
    if idx == first {
        assert_eq!(chosen.recv(&rx_a), Ok(1));
    } else if idx == second {
        panic!();
    } else {
        unreachable!();
    }

    // Round two: the first channel was drained above, so only the second
    // channel is ready now.
    tx_b.send(2).unwrap();

    let mut select = Select::new();
    let first = select.recv(&rx_a);
    let second = select.recv(&rx_b);
    let chosen = select.select();
    let idx = chosen.index();
    if idx == first {
        panic!();
    } else if idx == second {
        assert_eq!(chosen.recv(&rx_b), Ok(2));
    } else {
        unreachable!();
    }
}
44 
/// Five receivers are registered but only the last channel holds a message;
/// `select` must report that last operation.
#[test]
fn smoke2() {
    // The first four senders are kept alive (so their channels stay
    // connected) but are never used.
    let (_tx1, rx1) = unbounded::<i32>();
    let (_tx2, rx2) = unbounded::<i32>();
    let (_tx3, rx3) = unbounded::<i32>();
    let (_tx4, rx4) = unbounded::<i32>();
    let (tx5, rx5) = unbounded::<i32>();

    tx5.send(5).unwrap();

    let mut select = Select::new();
    // Array elements are evaluated left to right, so the registration order
    // is rx1, rx2, rx3, rx4 and finally rx5.
    let idle = [
        select.recv(&rx1),
        select.recv(&rx2),
        select.recv(&rx3),
        select.recv(&rx4),
    ];
    let loaded = select.recv(&rx5);

    let chosen = select.select();
    let idx = chosen.index();
    if idx == loaded {
        assert_eq!(chosen.recv(&rx5), Ok(5));
    } else if idle.contains(&idx) {
        panic!();
    } else {
        unreachable!();
    }
}
71 
/// A receive operation on a disconnected channel counts as ready: `select`
/// reports it and the subsequent `recv` returns an error.
#[test]
fn disconnected() {
    let (tx_a, rx_a) = unbounded::<i32>();
    let (tx_b, rx_b) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            // The first sender goes away immediately; the message on the
            // second channel only shows up half a second later.
            drop(tx_a);
            thread::sleep(ms(500));
            tx_b.send(5).unwrap();
        });

        let mut select = Select::new();
        let first = select.recv(&rx_a);
        let second = select.recv(&rx_b);
        let chosen = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = chosen.index();
        if idx == first {
            assert!(chosen.recv(&rx_a).is_err());
        } else if idx == second {
            panic!();
        } else {
            unreachable!();
        }

        // Consume the delayed message so the second channel is empty again.
        rx_b.recv().unwrap();
    })
    .unwrap();

    // The first channel is still disconnected and the second one is empty
    // but connected, so the first operation must win again.
    let mut select = Select::new();
    let first = select.recv(&rx_a);
    let second = select.recv(&rx_b);
    let chosen = select
        .select_timeout(ms(1000))
        .unwrap_or_else(|_| panic!());
    let idx = chosen.index();
    if idx == first {
        assert!(chosen.recv(&rx_a).is_err());
    } else if idx == second {
        panic!();
    } else {
        unreachable!();
    }

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            drop(tx_b);
        });

        // Only the second channel is registered; the select has to wake up
        // once its sender is dropped by the helper thread.
        let mut select = Select::new();
        let only = select.recv(&rx_b);
        let chosen = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        if chosen.index() == only {
            assert!(chosen.recv(&rx_b).is_err());
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
133 
/// Exercises `try_select`: it must fail when no operation is ready and
/// succeed when a channel is disconnected or holds a message.
#[test]
fn default() {
    let (tx_a, rx_a) = unbounded::<i32>();
    let (tx_b, rx_b) = unbounded::<i32>();

    // Both channels are empty and still connected: nothing is ready.
    let mut select = Select::new();
    let _first = select.recv(&rx_a);
    let _second = select.recv(&rx_b);
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    drop(tx_a);

    // The first channel is now disconnected, which makes its receive ready.
    let mut select = Select::new();
    let first = select.recv(&rx_a);
    let second = select.recv(&rx_b);
    let chosen = select.try_select().unwrap_or_else(|_| panic!());
    let idx = chosen.index();
    if idx == first {
        assert!(chosen.recv(&rx_a).is_err());
    } else if idx == second {
        panic!();
    } else {
        unreachable!();
    }

    tx_b.send(2).unwrap();

    // Only the second channel is registered and it holds one message.
    let mut select = Select::new();
    let only = select.recv(&rx_b);
    let chosen = select.try_select().unwrap_or_else(|_| panic!());
    if chosen.index() == only {
        assert_eq!(chosen.recv(&rx_b), Ok(2));
    } else {
        unreachable!();
    }

    // The message was consumed above, so the same select is not ready now.
    let mut select = Select::new();
    let _only = select.recv(&rx_b);
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }

    // A select with no operations at all can never be ready.
    let mut select = Select::new();
    match select.try_select() {
        Ok(_) => panic!(),
        Err(_) => {}
    }
}
191 
/// Exercises `select_timeout`: it must give up when nothing becomes ready in
/// time and must succeed when a message arrives before the deadline.
#[test]
fn timeout() {
    let (_tx_a, rx_a) = unbounded::<i32>();
    let (tx_b, rx_b) = unbounded::<i32>();

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(1500));
            tx_b.send(2).unwrap();
        });

        // The message arrives after 1500 ms, so a 1000 ms wait must time out.
        let mut select = Select::new();
        let first = select.recv(&rx_a);
        let second = select.recv(&rx_b);
        if let Ok(chosen) = select.select_timeout(ms(1000)) {
            let idx = chosen.index();
            if idx == first || idx == second {
                panic!();
            } else {
                unreachable!();
            }
        }

        // A second 1000 ms wait covers the moment the message is sent.
        let mut select = Select::new();
        let first = select.recv(&rx_a);
        let second = select.recv(&rx_b);
        let chosen = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = chosen.index();
        if idx == first {
            panic!();
        } else if idx == second {
            assert_eq!(chosen.recv(&rx_b), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|sc| {
        let (tx, rx) = unbounded::<i32>();

        sc.spawn(move |_| {
            thread::sleep(ms(500));
            drop(tx);
        });

        // A select without any operations can only time out.
        let mut empty = Select::new();
        match empty.select_timeout(ms(1000)) {
            Ok(_) => unreachable!(),
            Err(_) => {
                // By now the helper thread has dropped the sender, so the
                // receive is ready and reports the disconnection.
                let mut select = Select::new();
                let only = select.recv(&rx);
                let chosen = select.try_select().unwrap_or_else(|_| panic!());
                if chosen.index() == only {
                    assert!(chosen.recv(&rx).is_err());
                } else {
                    unreachable!();
                }
            }
        }
    })
    .unwrap();
}
259 
/// Operations on a channel whose other half is already gone are ready at
/// once, for both `try_select` and `select_timeout`, and completing them
/// yields an error.
#[test]
fn default_when_disconnected() {
    // Receiver whose sender is dropped immediately, polled with `try_select`.
    let (_, rx) = unbounded::<i32>();

    let mut select = Select::new();
    let only = select.recv(&rx);
    let chosen = select.try_select().unwrap_or_else(|_| panic!());
    if chosen.index() == only {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }

    // Same situation, this time through `select_timeout`.
    let (_, rx) = unbounded::<i32>();

    let mut select = Select::new();
    let only = select.recv(&rx);
    let chosen = select
        .select_timeout(ms(1000))
        .unwrap_or_else(|_| panic!());
    if chosen.index() == only {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }

    // Sender whose receiver is dropped immediately, polled with `try_select`.
    let (tx, _) = bounded::<i32>(0);

    let mut select = Select::new();
    let only = select.send(&tx);
    let chosen = select.try_select().unwrap_or_else(|_| panic!());
    if chosen.index() == only {
        assert!(chosen.send(&tx, 0).is_err());
    } else {
        unreachable!();
    }

    // Same situation, this time through `select_timeout`.
    let (tx, _) = bounded::<i32>(0);

    let mut select = Select::new();
    let only = select.send(&tx);
    let chosen = select
        .select_timeout(ms(1000))
        .unwrap_or_else(|_| panic!());
    if chosen.index() == only {
        assert!(chosen.send(&tx, 0).is_err());
    } else {
        unreachable!();
    }
}
314 
/// Timing of a select with no operations: `try_select` must fail almost
/// immediately, and `select_timeout` must fail after roughly the requested
/// wait.
#[test]
fn default_only() {
    let started = Instant::now();

    let mut select = Select::new();
    let outcome = select.try_select();
    assert!(outcome.is_err());
    let elapsed = started.elapsed();
    assert!(elapsed <= ms(50));

    let started = Instant::now();
    let mut select = Select::new();
    let outcome = select.select_timeout(ms(500));
    assert!(outcome.is_err());
    // Measure once so both bounds are checked against the same reading.
    let elapsed = started.elapsed();
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
333 
/// A blocked select on zero-capacity channels must wake up once a partner
/// thread shows up on the other side of one of them.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    // Receiving side: a sender appears on the second channel after 500 ms.
    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut select = Select::new();
        let recv_first = select.recv(&r1);
        let recv_second = select.recv(&r2);
        let chosen = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = chosen.index();
        if idx == recv_first {
            panic!();
        } else if idx == recv_second {
            assert_eq!(chosen.recv(&r2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    // Sending side: a receiver appears on the first channel after 500 ms.
    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut select = Select::new();
        let send_first = select.send(&s1);
        let send_second = select.send(&s2);
        let chosen = select
            .select_timeout(ms(1000))
            .unwrap_or_else(|_| panic!());
        let idx = chosen.index();
        if idx == send_first {
            chosen.send(&s1, 1).unwrap();
        } else if idx == send_second {
            panic!();
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
381 
/// Runs two selects over a receive and a send on zero-capacity channels
/// while a helper thread performs the matching send and receive.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|sc| {
        sc.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        // Two rounds, one for each operation the helper thread performs.
        for _ in 0..2 {
            let mut select = Select::new();
            let recv_op = select.recv(&r1);
            let send_op = select.send(&s2);
            let chosen = select.select();
            let idx = chosen.index();
            if idx == recv_op {
                assert_eq!(chosen.recv(&r1), Ok(1));
            } else if idx == send_op {
                chosen.send(&s2, 2).unwrap();
            } else {
                unreachable!();
            }
        }
    })
    .unwrap();
}
408 
/// Two threads spin on `try_select` (one trying to send on `s1`, one trying
/// to receive from `r2`) while a third thread performs a single timed select
/// over the matching operations and then drops `s_end` to stop the spinners.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|sc| {
            // Spinner 1: keep trying to send on `s1` until it works or the
            // end channel becomes ready.
            sc.spawn(|_| loop {
                let mut select = Select::new();
                let send_op = select.send(&s1);
                if let Ok(chosen) = select.try_select() {
                    if chosen.index() == send_op {
                        let _ = chosen.send(&s1, 1);
                        break;
                    } else {
                        unreachable!();
                    }
                }

                let mut select = Select::new();
                let end_op = select.recv(&r_end);
                if let Ok(chosen) = select.try_select() {
                    if chosen.index() == end_op {
                        let _ = chosen.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Spinner 2: keep trying to receive from `r2` until a value
            // arrives or the end channel becomes ready.
            sc.spawn(|_| loop {
                if let Ok(value) = r2.try_recv() {
                    assert_eq!(value, 2);
                    break;
                }

                let mut select = Select::new();
                let end_op = select.recv(&r_end);
                if let Ok(chosen) = select.try_select() {
                    if chosen.index() == end_op {
                        let _ = chosen.recv(&r_end);
                        break;
                    } else {
                        unreachable!();
                    }
                }
            });

            // Driver: one timed select over the counterpart operations, then
            // disconnect the end channel by dropping its only sender.
            sc.spawn(|_| {
                thread::sleep(ms(500));

                let mut select = Select::new();
                let recv_op = select.recv(&r1);
                let send_op = select.send(&s2);
                if let Ok(chosen) = select.select_timeout(ms(1000)) {
                    let idx = chosen.index();
                    if idx == recv_op {
                        assert_eq!(chosen.recv(&r1), Ok(1));
                    } else if idx == send_op {
                        assert!(chosen.send(&s2, 2).is_ok());
                    } else {
                        unreachable!();
                    }
                }

                drop(s_end);
            });
        })
        .unwrap();
    }
}
504 
/// Creating and dropping a clone of a sender in another thread while the
/// main thread is selecting must not disturb the select; it completes once
/// the real message is sent.
#[test]
fn cloning1() {
    scope(|sc| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        sc.spawn(move |_| {
            // Wait for the go signal, then create and drop a sender clone.
            r3.recv().unwrap();
            drop(s1.clone());
            // The second signal is only sent after the select completes,
            // which cannot happen before the message below is sent.
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut select = Select::new();
        let first = select.recv(&r1);
        let second = select.recv(&r2);
        let chosen = select.select();
        let idx = chosen.index();
        if idx == first {
            drop(chosen.recv(&r1));
        } else if idx == second {
            drop(chosen.recv(&r2));
        } else {
            unreachable!();
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
536 
/// While another thread is blocked in a select, creating and dropping a
/// clone of the first sender must not make the first receive ready; only the
/// message on the second channel may complete the select.
#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|sc| {
        sc.spawn(move |_| {
            let mut select = Select::new();
            let first = select.recv(&r1);
            let second = select.recv(&r2);
            let chosen = select.select();
            let idx = chosen.index();
            if idx == first {
                panic!();
            } else if idx == second {
                drop(chosen.recv(&r2));
            } else {
                unreachable!();
            }
        });

        // Give the spawned thread time to block inside `select`.
        thread::sleep(ms(500));
        drop(s1.clone());
        s2.send(()).unwrap();
    })
    .unwrap();
}
562 
/// A select over a single receiver whose channel already holds a message
/// completes with that receive operation.
#[test]
fn preflight1() {
    let (tx, rx) = unbounded();
    tx.send(()).unwrap();

    let mut select = Select::new();
    let only = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == only {
        drop(chosen.recv(&rx));
    } else {
        unreachable!();
    }
}
576 
/// A message queued before all senders were dropped is still delivered
/// through select; only afterwards does the channel report disconnection.
#[test]
fn preflight2() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);

    let mut select = Select::new();
    let only = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == only {
        assert_eq!(chosen.recv(&rx), Ok(()));
    } else {
        unreachable!();
    }

    assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
}
594 
/// After the only queued message has been consumed and every sender is gone,
/// a select over the receiver completes and `recv` returns an error.
#[test]
fn preflight3() {
    let (tx, rx) = unbounded();
    drop(tx.clone());
    tx.send(()).unwrap();
    drop(tx);
    rx.recv().unwrap();

    let mut select = Select::new();
    let only = select.recv(&rx);
    let chosen = select.select();
    if chosen.index() == only {
        assert!(chosen.recv(&rx).is_err());
    } else {
        unreachable!();
    }
}
611 
/// Registers the same receiver twice and the same sender twice, and keeps
/// selecting until each of the four operations has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (tx, rx) = unbounded::<i32>();
    // One flag per registered operation, in registration order.
    let hit = vec![Cell::new(false); 4];

    while hit.iter().any(|seen| !seen.get()) {
        let mut select = Select::new();
        let ops = [
            select.recv(&rx),
            select.recv(&rx),
            select.send(&tx),
            select.send(&tx),
        ];
        let chosen = select.select();
        let idx = chosen.index();

        let slot = ops
            .iter()
            .position(|&op| op == idx)
            .unwrap_or_else(|| unreachable!());
        if slot < 2 {
            // Slots 0 and 1 are the two receive registrations.
            assert!(chosen.recv(&rx).is_ok());
        } else {
            // Slots 2 and 3 are the two send registrations.
            assert!(chosen.send(&tx, 0).is_ok());
        }
        hit[slot].set(true);
    }
}
645 
/// Starts new selects from inside the handling branch of an outer select,
/// four levels deep, alternating send and receive on one unbounded channel.
#[test]
fn nesting() {
    let (tx, rx) = unbounded::<i32>();

    let mut level1 = Select::new();
    let send_zero = level1.send(&tx);
    let chosen1 = level1.select();
    if chosen1.index() == send_zero {
        assert!(chosen1.send(&tx, 0).is_ok());

        let mut level2 = Select::new();
        let recv_zero = level2.recv(&rx);
        let chosen2 = level2.select();
        if chosen2.index() == recv_zero {
            assert_eq!(chosen2.recv(&rx), Ok(0));

            let mut level3 = Select::new();
            let send_one = level3.send(&tx);
            let chosen3 = level3.select();
            if chosen3.index() == send_one {
                assert!(chosen3.send(&tx, 1).is_ok());

                let mut level4 = Select::new();
                let recv_one = level4.recv(&rx);
                let chosen4 = level4.select();
                if chosen4.index() == recv_one {
                    assert_eq!(chosen4.recv(&rx), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
690 
/// Stress test for receiving through select: a producer alternates between
/// an unbounded and a bounded channel, waiting for an acknowledgement on a
/// third channel after every message.
#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for value in 0..COUNT {
                s1.send(value).unwrap();
                r3.recv().unwrap();

                s2.send(value).unwrap();
                r3.recv().unwrap();
            }
        });

        for expected in 0..COUNT {
            // Each value is delivered twice: once per data channel.
            for _ in 0..2 {
                let mut select = Select::new();
                let first = select.recv(&r1);
                let second = select.recv(&r2);
                let chosen = select.select();
                let idx = chosen.index();
                if idx == first {
                    assert_eq!(chosen.recv(&r1), Ok(expected));
                } else if idx == second {
                    assert_eq!(chosen.recv(&r2), Ok(expected));
                } else {
                    unreachable!();
                }

                // Acknowledge so the producer sends the next message.
                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
728 
/// Stress test for sending through select: each value is sent on both
/// zero-capacity channels while a consumer receives from them in a fixed
/// order.
#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for expected in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), expected);
                assert_eq!(r2.recv().unwrap(), expected);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            // Two rounds per value: one send for each data channel.
            for _ in 0..2 {
                let mut select = Select::new();
                let first = select.send(&s1);
                let second = select.send(&s2);
                let chosen = select.select();
                let idx = chosen.index();
                if idx == first {
                    assert!(chosen.send(&s1, value).is_ok());
                } else if idx == second {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
763 
/// Stress test mixing a receive and a send in one select: the partner thread
/// sends each value on the first channel and expects it back on the second.
#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|sc| {
        sc.spawn(|_| {
            for value in 0..COUNT {
                s1.send(value).unwrap();
                assert_eq!(r2.recv().unwrap(), value);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            // Two rounds per value: one for the receive, one for the send.
            for _ in 0..2 {
                let mut select = Select::new();
                let recv_op = select.recv(&r1);
                let send_op = select.send(&s2);
                let chosen = select.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_eq!(chosen.recv(&r1), Ok(value));
                } else if idx == send_op {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
798 
/// A sender and a receiver each retry `select_timeout` with a short deadline
/// until their operation succeeds, pausing before every even-numbered item;
/// the values must arrive in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (tx, rx) = bounded(2);

    scope(|sc| {
        sc.spawn(|_| {
            for value in 0..COUNT {
                if value % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry after every timeout until the send goes through.
                loop {
                    let mut select = Select::new();
                    let send_op = select.send(&tx);
                    if let Ok(chosen) = select.select_timeout(ms(100)) {
                        if chosen.index() == send_op {
                            assert!(chosen.send(&tx, value).is_ok());
                            break;
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
        });

        sc.spawn(|_| {
            for expected in 0..COUNT {
                if expected % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry after every timeout until a value is received.
                loop {
                    let mut select = Select::new();
                    let recv_op = select.recv(&rx);
                    if let Ok(chosen) = select.select_timeout(ms(100)) {
                        if chosen.index() == recv_op {
                            assert_eq!(chosen.recv(&rx), Ok(expected));
                            break;
                        } else {
                            unreachable!();
                        }
                    }
                }
            }
        });
    })
    .unwrap();
}
856 
/// Registers both ends of the same channel in one select. On a zero-capacity
/// channel neither operation may complete; on an unbounded channel only the
/// send may.
#[test]
fn send_recv_same_channel() {
    // Zero capacity: the select has to time out.
    let (tx, rx) = bounded::<i32>(0);
    let mut select = Select::new();
    let send_op = select.send(&tx);
    let recv_op = select.recv(&rx);
    if let Ok(chosen) = select.select_timeout(ms(100)) {
        let idx = chosen.index();
        if idx == send_op || idx == recv_op {
            panic!();
        } else {
            unreachable!();
        }
    }

    // Unbounded: the send is ready, while the empty receiver is not.
    let (tx, rx) = unbounded::<i32>();
    let mut select = Select::new();
    let send_op = select.send(&tx);
    let recv_op = select.recv(&rx);
    let chosen = select
        .select_timeout(ms(100))
        .unwrap_or_else(|_| panic!());
    let idx = chosen.index();
    if idx == send_op {
        assert!(chosen.send(&tx, 0).is_ok());
    } else if idx == recv_op {
        panic!();
    } else {
        unreachable!();
    }
}
887 
/// Spawns an even number of threads that each select over a receive and a
/// send on the same zero-capacity channel. A thread must never receive its
/// own index, and once every thread has finished the channel must be empty.
#[test]
fn matching() {
    const THREADS: usize = 44;

    // `s` and `r` are bound as references into the temporary tuple, so they
    // can be copied into every `move` closure and passed along as-is.
    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let oper1 = sel.recv(r);
                let oper2 = sel.send(s);
                let oper = sel.select();
                match oper.index() {
                    // A thread's own value is only offered by its own send,
                    // which cannot complete together with its own receive.
                    ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
913 
/// Like `matching`, but with an odd number of threads: the main thread
/// performs one extra send, which gives the leftover thread a partner.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    // `s` and `r` are bound as references into the temporary tuple, so they
    // can be copied into every `move` closure and passed along as-is.
    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let oper1 = sel.recv(r);
                let oper2 = sel.send(s);
                let oper = sel.select();
                match oper.index() {
                    // A thread's own value is only offered by its own send,
                    // which cannot complete together with its own receive.
                    ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
        // `!0` is `usize::MAX`, which differs from every thread index.
        s.send(!0).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
940 
/// Repeatedly sends the receiving half of a fresh channel through the
/// current channel (boxed as `dyn Any`), with both sides switching over to
/// the new channel each round. Runs for capacities 0, 1 and 2.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|sc| {
            sc.spawn(move |_| {
                let mut current = s;

                for _ in 0..COUNT {
                    let (next_s, next_r) = bounded(cap);
                    // Wrapped in `Option` so the receiving side can `take`
                    // the receiver out of the box after downcasting.
                    let payload: T = Box::new(Some(next_r));

                    {
                        let mut select = Select::new();
                        let send_op = select.send(&current);
                        let chosen = select.select();
                        if chosen.index() == send_op {
                            assert!(chosen.send(&current, payload).is_ok());
                        } else {
                            unreachable!();
                        }
                    }

                    current = next_s;
                }
            });

            sc.spawn(move |_| {
                let mut current = r;

                for _ in 0..COUNT {
                    let next = {
                        let mut select = Select::new();
                        let recv_op = select.recv(&current);
                        let chosen = select.select();
                        if chosen.index() == recv_op {
                            let mut payload = chosen.recv(&current).unwrap();
                            let slot = payload
                                .downcast_mut::<Option<Receiver<T>>>()
                                .unwrap();
                            slot.take().unwrap()
                        } else {
                            unreachable!()
                        }
                    };
                    current = next;
                }
            });
        })
        .unwrap();
    }
}
998 
/// In every round the worker sends on its own channel and then calls
/// `try_select` over both receivers while the main thread sends on the other
/// channel and may steal the worker's message. The select must always find
/// one of the two receives ready. Runs once with bounded(1) channels and once
/// with unbounded ones.
#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Zero-capacity channels used purely to line up the two threads at
        // the start and end of every round.
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|sc| {
            sc.spawn(|_| {
                for _ in 0..COUNT {
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut select = Select::new();
                    let first = select.recv(&r1);
                    let second = select.recv(&r2);
                    let chosen = select
                        .try_select()
                        .unwrap_or_else(|_| unreachable!());
                    let idx = chosen.index();
                    if idx == first {
                        assert!(chosen.recv(&r1).is_ok());
                    } else if idx == second {
                        assert!(chosen.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    end_s.send(()).unwrap();
                    // Clear a possibly remaining message before the next round.
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                // Try to take the worker's message away from it.
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1050 
/// Exercises `Select::select_timeout` with a zero timeout while another thread
/// concurrently sends on one channel and drains the other.
///
/// The scenario runs twice: first with two `bounded(1)` data channels, then with two
/// unbounded ones. In every round the spawned thread sends on channel 1 and then selects
/// over both receivers with `ms(0)` as the timeout; the `Err` outcome is treated as
/// impossible. The zero-capacity `start`/`end` channels make both threads enter and
/// leave each round together.
#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread at the start of the round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let from_r1 = sel.recv(&r1);
                    let from_r2 = sel.recv(&r2);
                    match sel.select_timeout(ms(0)) {
                        Ok(ready) => {
                            let which = ready.index();
                            if which == from_r1 {
                                assert!(ready.recv(&r1).is_ok());
                            } else if which == from_r2 {
                                assert!(ready.recv(&r2).is_ok());
                            } else {
                                unreachable!();
                            }
                        }
                        Err(_) => unreachable!(),
                    }

                    // Rendezvous at the end of the round, then discard whatever may
                    // still be sitting in channel 2.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                // Send on channel 2 first, and only then try to take the message from
                // channel 1.
                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1102 
/// Selects `COUNT` times over four receivers (a preloaded bounded channel, a preloaded
/// unbounded channel, an `after(ms(0))` channel and a `tick(ms(0))` channel) and checks
/// that each of them was picked at least `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Preload both data channels with one message per selection round.
    (0..COUNT).for_each(|_| {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    });

    // One hit counter per registered operation, in registration order.
    let hits: Vec<Cell<usize>> = (0..4).map(|_| Cell::new(0usize)).collect();
    let bump = |slot: usize| hits[slot].set(hits[slot].get() + 1);

    for _ in 0..COUNT {
        // Fresh timer channels are created for every round.
        let timer = after(ms(0));
        let ticker = tick(ms(0));

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let on_timer = sel.recv(&timer);
        let on_ticker = sel.recv(&ticker);

        let ready = sel.select();
        let which = ready.index();
        if which == on_r1 {
            ready.recv(&r1).unwrap();
            bump(0);
        } else if which == on_r2 {
            ready.recv(&r2).unwrap();
            bump(1);
        } else if which == on_timer {
            ready.recv(&timer).unwrap();
            bump(2);
        } else if which == on_ticker {
            ready.recv(&ticker).unwrap();
            bump(3);
        } else {
            unreachable!();
        }
    }
    assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
}
1148 
/// Runs a selecting sender against a selecting receiver over three channels (unbounded,
/// `bounded(1)` and `bounded(0)`) and checks that the receiver picked each channel at
/// least `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        // Sending side: delivers exactly one message per iteration.
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();

                // Channels 1 and 2 only take part in this round when they currently
                // hold no message; channel 3 always takes part.
                let to_s1 = if s1.is_empty() {
                    Some(sel.send(&s1))
                } else {
                    None
                };
                let to_s2 = if s2.is_empty() {
                    Some(sel.send(&s2))
                } else {
                    None
                };
                let to_s3 = sel.send(&s3);

                let ready = sel.select();
                let which = ready.index();
                if Some(which) == to_s1 {
                    assert!(ready.send(&s1, ()).is_ok());
                } else if Some(which) == to_s2 {
                    assert!(ready.send(&s2, ()).is_ok());
                } else if which == to_s3 {
                    assert!(ready.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        // Receiving side: one hit counter per channel, in registration order.
        let hits: Vec<Cell<usize>> = (0..3).map(|_| Cell::new(0usize)).collect();
        let bump = |slot: usize| hits[slot].set(hits[slot].get() + 1);

        for _ in 0..COUNT {
            let mut sel = Select::new();
            let on_r1 = sel.recv(&r1);
            let on_r2 = sel.recv(&r2);
            let on_r3 = sel.recv(&r3);

            let ready = sel.select();
            let which = ready.index();
            if which == on_r1 {
                ready.recv(&r1).unwrap();
                bump(0);
            } else if which == on_r2 {
                ready.recv(&r2).unwrap();
                bump(1);
            } else if which == on_r3 {
                ready.recv(&r3).unwrap();
                bump(2);
            } else {
                unreachable!();
            }
        }
        assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
    })
    .unwrap();
}
1207 
/// Shares one `&Select` with `THREADS` scoped threads; each thread clones it locally
/// and runs a single selection on a zero-capacity channel.
///
/// Every thread either receives a value or sends its own index `i`. A thread must never
/// receive the value it would have sent itself, and the channel must be empty once all
/// threads have finished.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    // Borrowing the tuple binds `s` and `r` as references, which the `move` closures
    // below can copy freely.
    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    // `r` and `s` are references already, so they are passed through as-is.
    let oper1 = sel.recv(r);
    let oper2 = sel.send(s);
    let sel = &sel;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                // The clone happens inside the thread, from the shared reference.
                let mut sel = sel.clone();
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1236 
/// Clones a `Select` once per thread on the spawning side and moves each clone into its
/// own scoped thread, which then runs a single selection on a zero-capacity channel.
///
/// Every thread either receives a value or sends its own index `i`. A thread must never
/// receive the value it would have sent itself, and the channel must be empty once all
/// threads have finished.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    // Borrowing the tuple binds `s` and `r` as references, which the `move` closures
    // below can copy freely.
    let (s, r) = &bounded::<usize>(0);

    let mut sel = Select::new();
    // `r` and `s` are references already, so they are passed through as-is.
    let oper1 = sel.recv(r);
    let oper2 = sel.send(s);

    scope(|scope| {
        for i in 0..THREADS {
            // The clone is made here and then moved into the new thread.
            let mut sel = sel.clone();
            scope.spawn(move |_| {
                let oper = sel.select();
                match oper.index() {
                    ix if ix == oper1 => assert_ne!(oper.recv(r), Ok(i)),
                    ix if ix == oper2 => assert!(oper.send(s, i).is_ok()),
                    _ => unreachable!(),
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1264 
/// Builds a single `Select` (one receive, one send) and calls `select` on it repeatedly
/// instead of constructing a new one for every round.
///
/// In each round the helper thread sends `i` on channel 1, expects `i` back on channel 2
/// and then waits for a token on channel 3. The main thread serves the round with two
/// selections on the reused `Select` before sending that token.
#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<usize>(0);
    let (s2, r2) = bounded::<usize>(0);
    let (s3, r3) = bounded::<()>(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                let echoed = r2.recv().unwrap();
                assert_eq!(echoed, i);
                r3.recv().unwrap();
            }
        });

        // Registered once, selected on `2 * COUNT` times below.
        let mut sel = Select::new();
        let on_recv = sel.recv(&r1);
        let on_send = sel.send(&s2);

        for i in 0..COUNT {
            for _ in 0..2 {
                let ready = sel.select();
                let which = ready.index();
                if which == on_recv {
                    assert_eq!(ready.recv(&r1), Ok(i));
                } else if which == on_send {
                    assert!(ready.send(&s2, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
1300