1 //! Tests for channel selection using the `Select` struct.
2 
3 extern crate crossbeam_channel;
4 extern crate crossbeam_utils;
5 
6 use std::any::Any;
7 use std::cell::Cell;
8 use std::thread;
9 use std::time::{Duration, Instant};
10 
11 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
12 use crossbeam_utils::thread::scope;
13 
/// Builds a `Duration` spanning `ms` milliseconds.
fn ms(ms: u64) -> Duration {
    // Split into whole seconds plus the sub-second remainder in nanoseconds.
    let whole_secs = ms / 1000;
    let leftover_nanos = (ms % 1000) as u32 * 1_000_000;
    Duration::new(whole_secs, leftover_nanos)
}
17 
/// With two receivers registered, `select` must pick the one whose channel
/// currently holds a message.
#[test]
fn smoke1() {
    let (s1, r1) = unbounded::<usize>();
    let (s2, r2) = unbounded::<usize>();

    // Only the first channel has a message.
    s1.send(1).unwrap();

    let mut sel = Select::new();
    let first = sel.recv(&r1);
    let second = sel.recv(&r2);
    let chosen = sel.select();
    let idx = chosen.index();
    if idx == first {
        assert_eq!(chosen.recv(&r1), Ok(1));
    } else if idx == second {
        panic!();
    } else {
        unreachable!();
    }

    // Now only the second channel has a message.
    s2.send(2).unwrap();

    let mut sel = Select::new();
    let first = sel.recv(&r1);
    let second = sel.recv(&r2);
    let chosen = sel.select();
    let idx = chosen.index();
    if idx == first {
        panic!();
    } else if idx == second {
        assert_eq!(chosen.recv(&r2), Ok(2));
    } else {
        unreachable!();
    }
}
47 
/// Of five registered receivers only the last one has a message, so it is the
/// only operation `select` may report.
#[test]
fn smoke2() {
    let (_s1, r1) = unbounded::<i32>();
    let (_s2, r2) = unbounded::<i32>();
    let (_s3, r3) = unbounded::<i32>();
    let (_s4, r4) = unbounded::<i32>();
    let (s5, r5) = unbounded::<i32>();

    s5.send(5).unwrap();

    let mut sel = Select::new();
    // Operations on the four empty channels; selecting any of them is a failure.
    let idle = [sel.recv(&r1), sel.recv(&r2), sel.recv(&r3), sel.recv(&r4)];
    let ready = sel.recv(&r5);

    let chosen = sel.select();
    let idx = chosen.index();
    if idx == ready {
        assert_eq!(chosen.recv(&r5), Ok(5));
    } else if idle.contains(&idx) {
        panic!();
    } else {
        unreachable!();
    }
}
74 
/// A receiver whose sender has been dropped must be selected, and completing
/// the receive on it must yield an error.
#[test]
fn disconnected() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        // Drops the first sender immediately and feeds the second channel later.
        scope.spawn(|_| {
            drop(s1);
            thread::sleep(ms(500));
            s2.send(5).unwrap();
        });

        let mut sel = Select::new();
        let closed = sel.recv(&r1);
        let open = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == closed {
            assert!(chosen.recv(&r1).is_err());
        } else if idx == open {
            panic!();
        } else {
            unreachable!();
        }

        // Consume the message sent by the helper thread.
        r2.recv().unwrap();
    })
    .unwrap();

    // Same selection again, after the helper thread has finished.
    let mut sel = Select::new();
    let closed = sel.recv(&r1);
    let open = sel.recv(&r2);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let idx = chosen.index();
    if idx == closed {
        assert!(chosen.recv(&r1).is_err());
    } else if idx == open {
        panic!();
    } else {
        unreachable!();
    }

    scope(|scope| {
        // Drops the second sender while the main thread is selecting.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            drop(s2);
        });

        let mut sel = Select::new();
        let only = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() != only {
            unreachable!();
        }
        assert!(chosen.recv(&r2).is_err());
    })
    .unwrap();
}
136 
/// Exercises `try_select` against empty, disconnected, and non-empty channels,
/// and against a `Select` with no operations at all.
#[test]
fn default() {
    let (s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    // Both channels are connected and empty: `try_select` must fail.
    let mut sel = Select::new();
    let _first = sel.recv(&r1);
    let _second = sel.recv(&r2);
    let outcome = sel.try_select();
    if outcome.is_ok() {
        panic!();
    }

    drop(s1);

    // The first channel is now disconnected and must be the one selected.
    let mut sel = Select::new();
    let closed = sel.recv(&r1);
    let empty = sel.recv(&r2);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let idx = chosen.index();
    if idx == closed {
        assert!(chosen.recv(&r1).is_err());
    } else if idx == empty {
        panic!();
    } else {
        unreachable!();
    }

    s2.send(2).unwrap();

    // The second channel holds a message, so receiving from it succeeds.
    let mut sel = Select::new();
    let only = sel.recv(&r2);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() != only {
        unreachable!();
    }
    assert_eq!(chosen.recv(&r2), Ok(2));

    // The message has been consumed: `try_select` must fail again.
    let mut sel = Select::new();
    let _only = sel.recv(&r2);
    let outcome = sel.try_select();
    if outcome.is_ok() {
        panic!();
    }

    // A `Select` without any registered operation must fail as well.
    let mut sel = Select::new();
    let outcome = sel.try_select();
    if outcome.is_ok() {
        panic!();
    }
}
194 
/// Checks that `select_timeout` gives up when nothing becomes ready in time
/// and succeeds once a message arrives within the allotted time.
#[test]
fn timeout() {
    let (_s1, r1) = unbounded::<i32>();
    let (s2, r2) = unbounded::<i32>();

    scope(|scope| {
        // The message is sent only after 1500 ms.
        scope.spawn(|_| {
            thread::sleep(ms(1500));
            s2.send(2).unwrap();
        });

        // First wait of 1000 ms: any selected operation is a failure.
        let mut sel = Select::new();
        let first = sel.recv(&r1);
        let second = sel.recv(&r2);
        let outcome = sel.select_timeout(ms(1000));
        if let Ok(chosen) = outcome {
            let idx = chosen.index();
            if idx == first || idx == second {
                panic!();
            } else {
                unreachable!();
            }
        }

        // Second wait of 1000 ms: the message on `r2` must be received.
        let mut sel = Select::new();
        let first = sel.recv(&r1);
        let second = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == first {
            panic!();
        } else if idx == second {
            assert_eq!(chosen.recv(&r2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|scope| {
        let (s, r) = unbounded::<i32>();

        // Drops the sender after 500 ms.
        scope.spawn(move |_| {
            thread::sleep(ms(500));
            drop(s);
        });

        // A `Select` with no operations is expected to time out.
        let mut sel = Select::new();
        let outcome = sel.select_timeout(ms(1000));
        if outcome.is_ok() {
            unreachable!();
        }

        // Afterwards the receiver must be selectable and report an error.
        let mut sel = Select::new();
        let only = sel.recv(&r);
        let chosen = match sel.try_select() {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        if chosen.index() != only {
            unreachable!();
        }
        assert!(chosen.recv(&r).is_err());
    })
    .unwrap();
}
262 
/// Operations on channels whose other half was dropped must be selected by
/// both `try_select` and `select_timeout`, and completing them must fail.
#[test]
fn default_when_disconnected() {
    // Receiver with its sender dropped, via `try_select`.
    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let only = sel.recv(&r);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() != only {
        unreachable!();
    }
    assert!(chosen.recv(&r).is_err());

    // Receiver with its sender dropped, via `select_timeout`.
    let (_, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let only = sel.recv(&r);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() != only {
        unreachable!();
    }
    assert!(chosen.recv(&r).is_err());

    // Sender with its receiver dropped, via `try_select`.
    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    let only = sel.send(&s);
    let chosen = match sel.try_select() {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() != only {
        unreachable!();
    }
    assert!(chosen.send(&s, 0).is_err());

    // Sender with its receiver dropped, via `select_timeout`.
    let (s, _) = bounded::<i32>(0);

    let mut sel = Select::new();
    let only = sel.send(&s);
    let chosen = match sel.select_timeout(ms(1000)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    if chosen.index() != only {
        unreachable!();
    }
    assert!(chosen.send(&s, 0).is_err());
}
317 
/// A `Select` with no operations: `try_select` must fail within 50 ms, and
/// `select_timeout(500 ms)` must fail after roughly 500 ms (450..=550 ms).
#[test]
fn default_only() {
    let start = Instant::now();

    let mut sel = Select::new();
    let outcome = sel.try_select();
    assert!(outcome.is_err());
    let elapsed = start.elapsed();
    assert!(elapsed <= ms(50));

    let start = Instant::now();
    let mut sel = Select::new();
    let outcome = sel.select_timeout(ms(500));
    assert!(outcome.is_err());
    let elapsed = start.elapsed();
    assert!(elapsed >= ms(450));
    assert!(elapsed <= ms(550));
}
336 
/// A blocked selection over zero-capacity channels must complete once another
/// thread shows up on the opposite side of one of them.
#[test]
fn unblocks() {
    let (s1, r1) = bounded::<i32>(0);
    let (s2, r2) = bounded::<i32>(0);

    scope(|scope| {
        // A sender arrives on the second channel after 500 ms.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s2.send(2).unwrap();
        });

        let mut sel = Select::new();
        let first = sel.recv(&r1);
        let second = sel.recv(&r2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == first {
            panic!();
        } else if idx == second {
            assert_eq!(chosen.recv(&r2), Ok(2));
        } else {
            unreachable!();
        }
    })
    .unwrap();

    scope(|scope| {
        // A receiver arrives on the first channel after 500 ms.
        scope.spawn(|_| {
            thread::sleep(ms(500));
            assert_eq!(r1.recv().unwrap(), 1);
        });

        let mut sel = Select::new();
        let first = sel.send(&s1);
        let second = sel.send(&s2);
        let chosen = match sel.select_timeout(ms(1000)) {
            Ok(op) => op,
            Err(_) => panic!(),
        };
        let idx = chosen.index();
        if idx == first {
            chosen.send(&s1, 1).unwrap();
        } else if idx == second {
            panic!();
        } else {
            unreachable!();
        }
    })
    .unwrap();
}
384 
/// Runs two selections over a receive and a send while a helper thread first
/// sends on one channel and then receives on the other.
#[test]
fn both_ready() {
    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);

    scope(|scope| {
        scope.spawn(|_| {
            thread::sleep(ms(500));
            s1.send(1).unwrap();
            assert_eq!(r2.recv().unwrap(), 2);
        });

        for _round in 0..2 {
            let mut sel = Select::new();
            let recv_op = sel.recv(&r1);
            let send_op = sel.send(&s2);
            let chosen = sel.select();
            let idx = chosen.index();
            if idx == recv_op {
                assert_eq!(chosen.recv(&r1), Ok(1));
            } else if idx == send_op {
                chosen.send(&s2, 2).unwrap();
            } else {
                unreachable!();
            }
        }
    })
    .unwrap();
}
411 
/// Two threads poll with `try_select` in a loop while a third performs a
/// single `select_timeout` and then drops `s_end` so the pollers can stop.
#[test]
fn loop_try() {
    const RUNS: usize = 20;

    for _ in 0..RUNS {
        let (s1, r1) = bounded::<i32>(0);
        let (s2, r2) = bounded::<i32>(0);
        let (s_end, r_end) = bounded::<()>(0);

        scope(|scope| {
            // Poller A: alternates between trying to send on `s1` and trying
            // to receive from `r_end`; stops as soon as either is selected.
            scope.spawn(|_| loop {
                let mut sel = Select::new();
                let send_op = sel.send(&s1);
                let outcome = sel.try_select();
                if let Ok(chosen) = outcome {
                    if chosen.index() != send_op {
                        unreachable!();
                    }
                    let _ = chosen.send(&s1, 1);
                    break;
                }

                let mut sel = Select::new();
                let end_op = sel.recv(&r_end);
                let outcome = sel.try_select();
                if let Ok(chosen) = outcome {
                    if chosen.index() != end_op {
                        unreachable!();
                    }
                    let _ = chosen.recv(&r_end);
                    break;
                }
            });

            // Poller B: alternates between `r2.try_recv()` and trying to
            // receive from `r_end`; stops as soon as either succeeds.
            scope.spawn(|_| loop {
                match r2.try_recv() {
                    Ok(x) => {
                        assert_eq!(x, 2);
                        break;
                    }
                    Err(_) => {}
                }

                let mut sel = Select::new();
                let end_op = sel.recv(&r_end);
                let outcome = sel.try_select();
                if let Ok(chosen) = outcome {
                    if chosen.index() != end_op {
                        unreachable!();
                    }
                    let _ = chosen.recv(&r_end);
                    break;
                }
            });

            // Coordinator: one timed selection against the pollers' channels,
            // then drops the end-signal sender.
            scope.spawn(|_| {
                thread::sleep(ms(500));

                let mut sel = Select::new();
                let recv_op = sel.recv(&r1);
                let send_op = sel.send(&s2);
                let outcome = sel.select_timeout(ms(1000));
                if let Ok(chosen) = outcome {
                    let idx = chosen.index();
                    if idx == recv_op {
                        assert_eq!(chosen.recv(&r1), Ok(1));
                    } else if idx == send_op {
                        assert!(chosen.send(&s2, 2).is_ok());
                    } else {
                        unreachable!();
                    }
                }

                drop(s_end);
            });
        })
        .unwrap();
    }
}
507 
/// Clones and drops a sender in a helper thread while the main thread selects
/// over the corresponding receiver; `s3`/`r3` is used to hand-shake the steps.
#[test]
fn cloning1() {
    scope(|scope| {
        let (s1, r1) = unbounded::<i32>();
        let (_s2, r2) = unbounded::<i32>();
        let (s3, r3) = unbounded::<()>();

        scope.spawn(move |_| {
            r3.recv().unwrap();
            let duplicate = s1.clone();
            drop(duplicate);
            assert!(r3.try_recv().is_err());
            s1.send(1).unwrap();
            r3.recv().unwrap();
        });

        s3.send(()).unwrap();

        let mut sel = Select::new();
        let first = sel.recv(&r1);
        let second = sel.recv(&r2);
        let chosen = sel.select();
        let idx = chosen.index();
        if idx == first {
            let _ = chosen.recv(&r1);
        } else if idx == second {
            let _ = chosen.recv(&r2);
        } else {
            unreachable!();
        }

        s3.send(()).unwrap();
    })
    .unwrap();
}
539 
/// Cloning and dropping a sender of `r1`'s channel must not cause `r1` to be
/// selected; only the message sent on the second channel may be.
#[test]
fn cloning2() {
    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = unbounded::<()>();
    let (_s3, _r3) = unbounded::<()>();

    scope(|scope| {
        scope.spawn(move |_| {
            let mut sel = Select::new();
            let first = sel.recv(&r1);
            let second = sel.recv(&r2);
            let chosen = sel.select();
            let idx = chosen.index();
            if idx == first {
                panic!();
            } else if idx == second {
                let _ = chosen.recv(&r2);
            } else {
                unreachable!();
            }
        });

        thread::sleep(ms(500));
        let duplicate = s1.clone();
        drop(duplicate);
        s2.send(()).unwrap();
    })
    .unwrap();
}
565 
/// Selecting over a receiver that already holds a message.
#[test]
fn preflight1() {
    let (s, r) = unbounded();
    s.send(()).unwrap();

    let mut sel = Select::new();
    let only = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() != only {
        unreachable!();
    }
    let _ = chosen.recv(&r);
}
579 
/// A message sent before every sender was dropped must still be received
/// through `select`; afterwards the channel reports `Disconnected`.
#[test]
fn preflight2() {
    let (s, r) = unbounded();
    let duplicate = s.clone();
    drop(duplicate);
    s.send(()).unwrap();
    drop(s);

    let mut sel = Select::new();
    let only = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() != only {
        unreachable!();
    }
    assert_eq!(chosen.recv(&r), Ok(()));

    assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
}
597 
/// Once the only message is drained and every sender is gone, selecting the
/// receiver must complete with an error.
#[test]
fn preflight3() {
    let (s, r) = unbounded();
    let duplicate = s.clone();
    drop(duplicate);
    s.send(()).unwrap();
    drop(s);
    r.recv().unwrap();

    let mut sel = Select::new();
    let only = sel.recv(&r);
    let chosen = sel.select();
    if chosen.index() != only {
        unreachable!();
    }
    assert!(chosen.recv(&r).is_err());
}
614 
/// Registers the same receiver twice and the same sender twice, and keeps
/// selecting until each of the four operations has been chosen at least once.
#[test]
fn duplicate_operations() {
    let (s, r) = unbounded::<i32>();
    // One flag per registered operation, set once that operation was selected.
    let seen = vec![Cell::new(false); 4];

    while !seen.iter().all(|flag| flag.get()) {
        let mut sel = Select::new();
        let recv_a = sel.recv(&r);
        let recv_b = sel.recv(&r);
        let send_a = sel.send(&s);
        let send_b = sel.send(&s);

        let chosen = sel.select();
        let idx = chosen.index();
        // Complete the chosen operation and remember which slot it maps to.
        let slot = if idx == recv_a {
            assert!(chosen.recv(&r).is_ok());
            0
        } else if idx == recv_b {
            assert!(chosen.recv(&r).is_ok());
            1
        } else if idx == send_a {
            assert!(chosen.send(&s, 0).is_ok());
            2
        } else if idx == send_b {
            assert!(chosen.send(&s, 0).is_ok());
            3
        } else {
            unreachable!()
        };
        seen[slot].set(true);
    }
}
648 
/// Starts new selections from inside the handling of an earlier one, four
/// levels deep: send 0, receive 0, send 1, receive 1 on the same channel.
#[test]
fn nesting() {
    let (s, r) = unbounded::<i32>();

    let mut sel = Select::new();
    let send_zero = sel.send(&s);
    let chosen = sel.select();
    if chosen.index() == send_zero {
        assert!(chosen.send(&s, 0).is_ok());

        let mut sel = Select::new();
        let recv_zero = sel.recv(&r);
        let chosen = sel.select();
        if chosen.index() == recv_zero {
            assert_eq!(chosen.recv(&r), Ok(0));

            let mut sel = Select::new();
            let send_one = sel.send(&s);
            let chosen = sel.select();
            if chosen.index() == send_one {
                assert!(chosen.send(&s, 1).is_ok());

                let mut sel = Select::new();
                let recv_one = sel.recv(&r);
                let chosen = sel.select();
                if chosen.index() == recv_one {
                    assert_eq!(chosen.recv(&r), Ok(1));
                } else {
                    unreachable!();
                }
            } else {
                unreachable!();
            }
        } else {
            unreachable!();
        }
    } else {
        unreachable!();
    }
}
693 
/// Repeatedly selects between two receivers while a producer alternates
/// between the two channels, waiting for an acknowledgement after each send.
#[test]
fn stress_recv() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded();
    let (s2, r2) = bounded(5);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Producer: sends `i` on each channel in turn; every send is followed
        // by waiting for one acknowledgement on `r3`.
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                r3.recv().unwrap();

                s2.send(i).unwrap();
                r3.recv().unwrap();
            }
        });

        for expected in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let first = sel.recv(&r1);
                let second = sel.recv(&r2);
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == first {
                    assert_eq!(chosen.recv(&r1), Ok(expected));
                } else if idx == second {
                    assert_eq!(chosen.recv(&r2), Ok(expected));
                } else {
                    unreachable!();
                }

                // Acknowledge so the producer can move on.
                s3.send(()).unwrap();
            }
        }
    })
    .unwrap();
}
731 
/// Repeatedly selects between two senders on zero-capacity channels while a
/// consumer receives from the first channel and then from the second.
#[test]
fn stress_send() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Consumer: expects `i` on each channel, then one message on `r3`.
        scope.spawn(|_| {
            for i in 0..COUNT {
                assert_eq!(r1.recv().unwrap(), i);
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let first = sel.send(&s1);
                let second = sel.send(&s2);
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == first {
                    assert!(chosen.send(&s1, value).is_ok());
                } else if idx == second {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
766 
/// Repeatedly selects between a receive and a send on zero-capacity channels
/// while a peer thread sends on the first and receives from the second.
#[test]
fn stress_mixed() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded(0);
    let (s2, r2) = bounded(0);
    let (s3, r3) = bounded(100);

    scope(|scope| {
        // Peer: sends `i`, expects `i` back, then takes one message from `r3`.
        scope.spawn(|_| {
            for i in 0..COUNT {
                s1.send(i).unwrap();
                assert_eq!(r2.recv().unwrap(), i);
                r3.recv().unwrap();
            }
        });

        for value in 0..COUNT {
            for _ in 0..2 {
                let mut sel = Select::new();
                let recv_op = sel.recv(&r1);
                let send_op = sel.send(&s2);
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_eq!(chosen.recv(&r1), Ok(value));
                } else if idx == send_op {
                    assert!(chosen.send(&s2, value).is_ok());
                } else {
                    unreachable!();
                }
            }
            s3.send(()).unwrap();
        }
    })
    .unwrap();
}
801 
/// A sender thread and a receiver thread each retry `select_timeout(100 ms)`
/// until their operation goes through, sleeping 500 ms before every
/// even-numbered item. The receiver checks that values arrive as `0..COUNT`
/// in order.
#[test]
fn stress_timeout_two_threads() {
    const COUNT: usize = 20;

    let (s, r) = bounded(2);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the send completes; a timed-out attempt simply
                // starts over with a fresh `Select`.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.send(&s);
                    let oper = sel.select_timeout(ms(100));
                    if let Ok(oper) = oper {
                        if oper.index() != oper1 {
                            unreachable!();
                        }
                        assert!(oper.send(&s, i).is_ok());
                        break;
                    }
                }
            }
        });

        scope.spawn(|_| {
            for i in 0..COUNT {
                if i % 2 == 0 {
                    thread::sleep(ms(500));
                }

                // Retry until the receive completes, mirroring the sender.
                loop {
                    let mut sel = Select::new();
                    let oper1 = sel.recv(&r);
                    let oper = sel.select_timeout(ms(100));
                    if let Ok(oper) = oper {
                        if oper.index() != oper1 {
                            unreachable!();
                        }
                        assert_eq!(oper.recv(&r), Ok(i));
                        break;
                    }
                }
            }
        });
    })
    .unwrap();
}
861 
/// Registers both the send and the receive side of one channel in a single
/// `Select`: on a zero-capacity channel neither may be selected; on an
/// unbounded channel the send must be.
#[test]
fn send_recv_same_channel() {
    let (s, r) = bounded::<i32>(0);
    let mut sel = Select::new();
    let send_op = sel.send(&s);
    let recv_op = sel.recv(&r);
    let outcome = sel.select_timeout(ms(100));
    if let Ok(chosen) = outcome {
        let idx = chosen.index();
        if idx == send_op || idx == recv_op {
            panic!();
        } else {
            unreachable!();
        }
    }

    let (s, r) = unbounded::<i32>();
    let mut sel = Select::new();
    let send_op = sel.send(&s);
    let recv_op = sel.recv(&r);
    let chosen = match sel.select_timeout(ms(100)) {
        Ok(op) => op,
        Err(_) => panic!(),
    };
    let idx = chosen.index();
    if idx == send_op {
        assert!(chosen.send(&s, 0).is_ok());
    } else if idx == recv_op {
        panic!();
    } else {
        unreachable!();
    }
}
892 
/// Spawns 44 threads that each select between receiving from and sending on
/// the same zero-capacity channel. A thread must never receive its own index,
/// and the channel must be empty once all threads have finished.
#[test]
fn matching() {
    const THREADS: usize = 44;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let recv_op = sel.recv(r);
                let send_op = sel.send(s);
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_ne!(chosen.recv(r), Ok(id));
                } else if idx == send_op {
                    assert!(chosen.send(s, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
918 
/// Same as `matching` but with 55 threads, plus one extra value (all bits set)
/// sent from the spawning thread.
#[test]
fn matching_with_leftover() {
    const THREADS: usize = 55;

    let (s, r) = &bounded::<usize>(0);

    scope(|scope| {
        for id in 0..THREADS {
            scope.spawn(move |_| {
                let mut sel = Select::new();
                let recv_op = sel.recv(r);
                let send_op = sel.send(s);
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_ne!(chosen.recv(r), Ok(id));
                } else if idx == send_op {
                    assert!(chosen.send(s, id).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
        // Extra send from this thread; the value is `!0`, which no thread uses as its index.
        s.send(usize::max_value()).unwrap();
    })
    .unwrap();

    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
945 
/// Sends the receiving half of a fresh channel through the current channel,
/// 1000 times in a row, for capacities 0, 1 and 2. Each side then switches to
/// the new channel for the next round.
#[test]
fn channel_through_channel() {
    const COUNT: usize = 1000;

    type T = Box<dyn Any + Send>;

    for cap in 0..3 {
        let (s, r) = bounded::<T>(cap);

        scope(|scope| {
            scope.spawn(move |_| {
                let mut sender = s;

                for _ in 0..COUNT {
                    let (next_s, next_r) = bounded(cap);
                    // Wrapped in `Option` so the receiving side can `take()` it out of the box.
                    let payload: T = Box::new(Some(next_r));

                    {
                        let mut sel = Select::new();
                        let only = sel.send(&sender);
                        let chosen = sel.select();
                        if chosen.index() != only {
                            unreachable!();
                        }
                        assert!(chosen.send(&sender, payload).is_ok());
                    }

                    sender = next_s;
                }
            });

            scope.spawn(move |_| {
                let mut receiver = r;

                for _ in 0..COUNT {
                    let next = {
                        let mut sel = Select::new();
                        let only = sel.recv(&receiver);
                        let chosen = sel.select();
                        if chosen.index() != only {
                            unreachable!();
                        }
                        let mut boxed = chosen.recv(&receiver).unwrap();
                        let slot = boxed.downcast_mut::<Option<Receiver<T>>>().unwrap();
                        let taken = slot.take().unwrap();
                        taken
                    };
                    receiver = next;
                }
            });
        })
        .unwrap();
    }
}
1003 
/// After a thread has sent on `s1`, its `try_select` over `r1` and `r2` must
/// succeed, even while another thread concurrently sends on `s2` and drains
/// `r1`. Runs once with capacity-1 channels and once with unbounded ones.
#[test]
fn linearizable_try() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        let (start_s, start_r) = bounded::<()>(0);
        let (end_s, end_r) = bounded::<()>(0);

        let ((s1, r1), (s2, r2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    // Rendezvous with the main thread before each round.
                    start_s.send(()).unwrap();

                    s1.send(1).unwrap();

                    let mut sel = Select::new();
                    let first = sel.recv(&r1);
                    let second = sel.recv(&r2);
                    let chosen = match sel.try_select() {
                        Ok(op) => op,
                        Err(_) => unreachable!(),
                    };
                    let idx = chosen.index();
                    if idx == first {
                        assert!(chosen.recv(&r1).is_ok());
                    } else if idx == second {
                        assert!(chosen.recv(&r2).is_ok());
                    } else {
                        unreachable!();
                    }

                    // Rendezvous again, then clear any message left in `r2`.
                    end_s.send(()).unwrap();
                    let _ = r2.try_recv();
                }
            });

            for _ in 0..COUNT {
                start_r.recv().unwrap();

                s2.send(1).unwrap();
                let _ = r1.try_recv();

                end_r.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1055 
/// Stress-tests `Select::select_timeout` with a zero timeout against a racing
/// sender/receiver pair.
///
/// In every round the spawned thread sends on channel 1 and only afterwards
/// selects over both receivers with a 0 ms timeout. The main thread sends on
/// channel 2 *before* it tries to take the message out of channel 1, so at
/// every instant of the select at least one channel holds a message and a
/// timeout is treated as a bug. The scenario runs once with capacity-1 bounded
/// channels and once with unbounded channels.
#[test]
fn linearizable_timeout() {
    const COUNT: usize = 100_000;

    for step in 0..2 {
        // Zero-capacity channels used purely as rendezvous points marking the
        // beginning and the end of each round.
        let (round_start_tx, round_start_rx) = bounded::<()>(0);
        let (round_end_tx, round_end_rx) = bounded::<()>(0);

        let ((tx1, rx1), (tx2, rx2)) = match step {
            0 => (bounded::<i32>(1), bounded::<i32>(1)),
            _ => (unbounded::<i32>(), unbounded::<i32>()),
        };

        scope(|scope| {
            scope.spawn(|_| {
                for _ in 0..COUNT {
                    round_start_tx.send(()).unwrap();

                    tx1.send(1).unwrap();

                    let mut sel = Select::new();
                    let on_rx1 = sel.recv(&rx1);
                    let on_rx2 = sel.recv(&rx2);

                    // A message is guaranteed to be available somewhere, so
                    // even a zero-length timeout must not expire.
                    let ready = match sel.select_timeout(ms(0)) {
                        Ok(ready) => ready,
                        Err(_) => unreachable!(),
                    };
                    let chosen = ready.index();
                    if chosen == on_rx1 {
                        assert!(ready.recv(&rx1).is_ok());
                    } else if chosen == on_rx2 {
                        assert!(ready.recv(&rx2).is_ok());
                    } else {
                        unreachable!();
                    }

                    round_end_tx.send(()).unwrap();
                    // Discard a channel-2 message the select did not consume.
                    let _ = rx2.try_recv();
                }
            });

            for _ in 0..COUNT {
                round_start_rx.recv().unwrap();

                // Publish on channel 2 first, then race for channel 1.
                tx2.send(1).unwrap();
                let _ = rx1.try_recv();

                round_end_rx.recv().unwrap();
            }
        })
        .unwrap();
    }
}
1107 
/// Checks that `select` spreads its choices over four competing receive
/// operations instead of favouring some of them.
///
/// Two channels are pre-filled with `COUNT` messages each, and every iteration
/// additionally creates a zero-duration `after` receiver and a zero-duration
/// `tick` receiver. After `COUNT` selections each of the four operations must
/// have been picked at least `COUNT / 4 / 2` times.
#[test]
fn fairness1() {
    const COUNT: usize = 10_000;

    let (s1, r1) = bounded::<()>(COUNT);
    let (s2, r2) = unbounded::<()>();

    // Make sure neither channel can run dry during the selection loop.
    for _ in 0..COUNT {
        s1.send(()).unwrap();
        s2.send(()).unwrap();
    }

    let hits = vec![Cell::new(0usize); 4];
    for _ in 0..COUNT {
        let after_rx = after(ms(0));
        let tick_rx = tick(ms(0));

        let mut sel = Select::new();
        let on_r1 = sel.recv(&r1);
        let on_r2 = sel.recv(&r2);
        let on_after = sel.recv(&after_rx);
        let on_tick = sel.recv(&tick_rx);

        // Complete whichever operation was selected and remember which
        // counter it belongs to.
        let chosen = sel.select();
        let slot = match chosen.index() {
            i if i == on_r1 => {
                chosen.recv(&r1).unwrap();
                0
            }
            i if i == on_r2 => {
                chosen.recv(&r2).unwrap();
                1
            }
            i if i == on_after => {
                chosen.recv(&after_rx).unwrap();
                2
            }
            i if i == on_tick => {
                chosen.recv(&tick_rx).unwrap();
                3
            }
            _ => unreachable!(),
        };
        hits[slot].set(hits[slot].get() + 1);
    }

    // Every operation must get at least half of its even share.
    let min_hits = COUNT / hits.len() / 2;
    assert!(hits.iter().all(|x| x.get() >= min_hits));
}
1153 
/// Checks that `select` on the receiving side does not starve any of three
/// channels of different flavours (unbounded, capacity 1, zero capacity) while
/// a concurrent producer feeds them through its own `Select`.
///
/// The producer offers a send on the first two channels only when the channel
/// is currently empty, and always offers a send on the zero-capacity channel.
/// After `COUNT` receives each channel must have been picked at least
/// `COUNT / 3 / 50` times.
#[test]
fn fairness2() {
    const COUNT: usize = 10_000;

    let (s1, r1) = unbounded::<()>();
    let (s2, r2) = bounded::<()>(1);
    let (s3, r3) = bounded::<()>(0);

    scope(|scope| {
        scope.spawn(|_| {
            for _ in 0..COUNT {
                let mut sel = Select::new();

                // Only register the buffered channels when they hold nothing,
                // so the producer never queues up more than one message there.
                let on_s1 = if s1.is_empty() {
                    Some(sel.send(&s1))
                } else {
                    None
                };
                let on_s2 = if s2.is_empty() {
                    Some(sel.send(&s2))
                } else {
                    None
                };
                let on_s3 = sel.send(&s3);

                let chosen = sel.select();
                let idx = chosen.index();
                if on_s1 == Some(idx) {
                    assert!(chosen.send(&s1, ()).is_ok());
                } else if on_s2 == Some(idx) {
                    assert!(chosen.send(&s2, ()).is_ok());
                } else if idx == on_s3 {
                    assert!(chosen.send(&s3, ()).is_ok());
                } else {
                    unreachable!();
                }
            }
        });

        let hits = vec![Cell::new(0usize); 3];
        for _ in 0..COUNT {
            let mut sel = Select::new();
            let on_r1 = sel.recv(&r1);
            let on_r2 = sel.recv(&r2);
            let on_r3 = sel.recv(&r3);

            // Complete whichever receive was selected and remember which
            // counter it belongs to.
            let chosen = sel.select();
            let slot = match chosen.index() {
                i if i == on_r1 => {
                    chosen.recv(&r1).unwrap();
                    0
                }
                i if i == on_r2 => {
                    chosen.recv(&r2).unwrap();
                    1
                }
                i if i == on_r3 => {
                    chosen.recv(&r3).unwrap();
                    2
                }
                _ => unreachable!(),
            };
            hits[slot].set(hits[slot].get() + 1);
        }

        let min_hits = COUNT / hits.len() / 50;
        assert!(hits.iter().all(|x| x.get() >= min_hits));
    })
    .unwrap();
}
1212 
/// Shares one prepared `Select` *by reference* with `THREADS` threads; each
/// thread clones it locally and performs a single select on the clone.
///
/// The channel has zero capacity and every thread completes exactly one
/// operation, so a thread that receives must have been paired with a different
/// thread's send and can never observe its own index.
#[test]
fn sync_and_clone() {
    const THREADS: usize = 20;

    let channel = bounded::<usize>(0);
    let (s, r) = (&channel.0, &channel.1);

    let mut proto = Select::new();
    let recv_op = proto.recv(r);
    let send_op = proto.send(s);
    // From here on the selector is only reachable through a shared reference.
    let proto = &proto;

    scope(|scope| {
        for i in 0..THREADS {
            scope.spawn(move |_| {
                let mut local = proto.clone();
                let chosen = local.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if idx == send_op {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    // Every send was matched by a receive, so nothing may be left over.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1241 
/// Clones one prepared `Select` on the main thread and *moves* each clone into
/// its own thread, where a single select is performed on it.
///
/// The channel has zero capacity and every thread completes exactly one
/// operation, so a thread that receives must have been paired with a different
/// thread's send and can never observe its own index.
#[test]
fn send_and_clone() {
    const THREADS: usize = 20;

    let channel = bounded::<usize>(0);
    let (s, r) = (&channel.0, &channel.1);

    let mut proto = Select::new();
    let recv_op = proto.recv(r);
    let send_op = proto.send(s);

    scope(|scope| {
        for i in 0..THREADS {
            // The clone is created here and ownership is handed to the thread.
            let mut local = proto.clone();
            scope.spawn(move |_| {
                let chosen = local.select();
                let idx = chosen.index();
                if idx == recv_op {
                    assert_ne!(chosen.recv(r), Ok(i));
                } else if idx == send_op {
                    assert!(chosen.send(s, i).is_ok());
                } else {
                    unreachable!();
                }
            });
        }
    })
    .unwrap();

    // Every send was matched by a receive, so nothing may be left over.
    assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
}
1269 
/// Verifies that a single `Select` instance can be selected on repeatedly
/// without being rebuilt.
///
/// In each round the helper thread sends `i` on the first channel, expects to
/// get `i` back on the second one and then waits for a token on the third.
/// The main thread registers its receive and send operations once, then runs
/// two selects per round on that same selector before releasing the token.
#[test]
fn reuse() {
    const COUNT: usize = 10_000;

    let (ping_tx, ping_rx) = bounded::<usize>(0);
    let (pong_tx, pong_rx) = bounded::<usize>(0);
    let (token_tx, token_rx) = bounded::<()>(100);

    scope(|scope| {
        scope.spawn(|_| {
            for i in 0..COUNT {
                ping_tx.send(i).unwrap();
                assert_eq!(pong_rx.recv().unwrap(), i);
                token_rx.recv().unwrap();
            }
        });

        // Built once, used for every select below.
        let mut sel = Select::new();
        let on_ping = sel.recv(&ping_rx);
        let on_pong = sel.send(&pong_tx);

        for i in 0..COUNT {
            for _ in 0..2 {
                let chosen = sel.select();
                let idx = chosen.index();
                if idx == on_ping {
                    assert_eq!(chosen.recv(&ping_rx), Ok(i));
                } else if idx == on_pong {
                    assert!(chosen.send(&pong_tx, i).is_ok());
                } else {
                    unreachable!();
                }
            }
            token_tx.send(()).unwrap();
        }
    })
    .unwrap();
}
1305