1 //! Tests for channel selection using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
9 use crossbeam_utils::thread::scope;
10
ms(ms: u64) -> Duration11 fn ms(ms: u64) -> Duration {
12 Duration::from_millis(ms)
13 }
14
15 #[test]
smoke1()16 fn smoke1() {
17 let (s1, r1) = unbounded::<usize>();
18 let (s2, r2) = unbounded::<usize>();
19
20 s1.send(1).unwrap();
21
22 let mut sel = Select::new();
23 let oper1 = sel.recv(&r1);
24 let oper2 = sel.recv(&r2);
25 let oper = sel.select();
26 match oper.index() {
27 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
28 i if i == oper2 => panic!(),
29 _ => unreachable!(),
30 }
31
32 s2.send(2).unwrap();
33
34 let mut sel = Select::new();
35 let oper1 = sel.recv(&r1);
36 let oper2 = sel.recv(&r2);
37 let oper = sel.select();
38 match oper.index() {
39 i if i == oper1 => panic!(),
40 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
41 _ => unreachable!(),
42 }
43 }
44
45 #[test]
smoke2()46 fn smoke2() {
47 let (_s1, r1) = unbounded::<i32>();
48 let (_s2, r2) = unbounded::<i32>();
49 let (_s3, r3) = unbounded::<i32>();
50 let (_s4, r4) = unbounded::<i32>();
51 let (s5, r5) = unbounded::<i32>();
52
53 s5.send(5).unwrap();
54
55 let mut sel = Select::new();
56 let oper1 = sel.recv(&r1);
57 let oper2 = sel.recv(&r2);
58 let oper3 = sel.recv(&r3);
59 let oper4 = sel.recv(&r4);
60 let oper5 = sel.recv(&r5);
61 let oper = sel.select();
62 match oper.index() {
63 i if i == oper1 => panic!(),
64 i if i == oper2 => panic!(),
65 i if i == oper3 => panic!(),
66 i if i == oper4 => panic!(),
67 i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)),
68 _ => unreachable!(),
69 }
70 }
71
72 #[test]
disconnected()73 fn disconnected() {
74 let (s1, r1) = unbounded::<i32>();
75 let (s2, r2) = unbounded::<i32>();
76
77 scope(|scope| {
78 scope.spawn(|_| {
79 drop(s1);
80 thread::sleep(ms(500));
81 s2.send(5).unwrap();
82 });
83
84 let mut sel = Select::new();
85 let oper1 = sel.recv(&r1);
86 let oper2 = sel.recv(&r2);
87 let oper = sel.select_timeout(ms(1000));
88 match oper {
89 Err(_) => panic!(),
90 Ok(oper) => match oper.index() {
91 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
92 i if i == oper2 => panic!(),
93 _ => unreachable!(),
94 },
95 }
96
97 r2.recv().unwrap();
98 })
99 .unwrap();
100
101 let mut sel = Select::new();
102 let oper1 = sel.recv(&r1);
103 let oper2 = sel.recv(&r2);
104 let oper = sel.select_timeout(ms(1000));
105 match oper {
106 Err(_) => panic!(),
107 Ok(oper) => match oper.index() {
108 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
109 i if i == oper2 => panic!(),
110 _ => unreachable!(),
111 },
112 }
113
114 scope(|scope| {
115 scope.spawn(|_| {
116 thread::sleep(ms(500));
117 drop(s2);
118 });
119
120 let mut sel = Select::new();
121 let oper1 = sel.recv(&r2);
122 let oper = sel.select_timeout(ms(1000));
123 match oper {
124 Err(_) => panic!(),
125 Ok(oper) => match oper.index() {
126 i if i == oper1 => assert!(oper.recv(&r2).is_err()),
127 _ => unreachable!(),
128 },
129 }
130 })
131 .unwrap();
132 }
133
134 #[test]
default()135 fn default() {
136 let (s1, r1) = unbounded::<i32>();
137 let (s2, r2) = unbounded::<i32>();
138
139 let mut sel = Select::new();
140 let _oper1 = sel.recv(&r1);
141 let _oper2 = sel.recv(&r2);
142 let oper = sel.try_select();
143 match oper {
144 Err(_) => {}
145 Ok(_) => panic!(),
146 }
147
148 drop(s1);
149
150 let mut sel = Select::new();
151 let oper1 = sel.recv(&r1);
152 let oper2 = sel.recv(&r2);
153 let oper = sel.try_select();
154 match oper {
155 Err(_) => panic!(),
156 Ok(oper) => match oper.index() {
157 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
158 i if i == oper2 => panic!(),
159 _ => unreachable!(),
160 },
161 }
162
163 s2.send(2).unwrap();
164
165 let mut sel = Select::new();
166 let oper1 = sel.recv(&r2);
167 let oper = sel.try_select();
168 match oper {
169 Err(_) => panic!(),
170 Ok(oper) => match oper.index() {
171 i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)),
172 _ => unreachable!(),
173 },
174 }
175
176 let mut sel = Select::new();
177 let _oper1 = sel.recv(&r2);
178 let oper = sel.try_select();
179 match oper {
180 Err(_) => {}
181 Ok(_) => panic!(),
182 }
183
184 let mut sel = Select::new();
185 let oper = sel.try_select();
186 match oper {
187 Err(_) => {}
188 Ok(_) => panic!(),
189 }
190 }
191
192 #[test]
timeout()193 fn timeout() {
194 let (_s1, r1) = unbounded::<i32>();
195 let (s2, r2) = unbounded::<i32>();
196
197 scope(|scope| {
198 scope.spawn(|_| {
199 thread::sleep(ms(1500));
200 s2.send(2).unwrap();
201 });
202
203 let mut sel = Select::new();
204 let oper1 = sel.recv(&r1);
205 let oper2 = sel.recv(&r2);
206 let oper = sel.select_timeout(ms(1000));
207 match oper {
208 Err(_) => {}
209 Ok(oper) => match oper.index() {
210 i if i == oper1 => panic!(),
211 i if i == oper2 => panic!(),
212 _ => unreachable!(),
213 },
214 }
215
216 let mut sel = Select::new();
217 let oper1 = sel.recv(&r1);
218 let oper2 = sel.recv(&r2);
219 let oper = sel.select_timeout(ms(1000));
220 match oper {
221 Err(_) => panic!(),
222 Ok(oper) => match oper.index() {
223 i if i == oper1 => panic!(),
224 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
225 _ => unreachable!(),
226 },
227 }
228 })
229 .unwrap();
230
231 scope(|scope| {
232 let (s, r) = unbounded::<i32>();
233
234 scope.spawn(move |_| {
235 thread::sleep(ms(500));
236 drop(s);
237 });
238
239 let mut sel = Select::new();
240 let oper = sel.select_timeout(ms(1000));
241 match oper {
242 Err(_) => {
243 let mut sel = Select::new();
244 let oper1 = sel.recv(&r);
245 let oper = sel.try_select();
246 match oper {
247 Err(_) => panic!(),
248 Ok(oper) => match oper.index() {
249 i if i == oper1 => assert!(oper.recv(&r).is_err()),
250 _ => unreachable!(),
251 },
252 }
253 }
254 Ok(_) => unreachable!(),
255 }
256 })
257 .unwrap();
258 }
259
260 #[test]
default_when_disconnected()261 fn default_when_disconnected() {
262 let (_, r) = unbounded::<i32>();
263
264 let mut sel = Select::new();
265 let oper1 = sel.recv(&r);
266 let oper = sel.try_select();
267 match oper {
268 Err(_) => panic!(),
269 Ok(oper) => match oper.index() {
270 i if i == oper1 => assert!(oper.recv(&r).is_err()),
271 _ => unreachable!(),
272 },
273 }
274
275 let (_, r) = unbounded::<i32>();
276
277 let mut sel = Select::new();
278 let oper1 = sel.recv(&r);
279 let oper = sel.select_timeout(ms(1000));
280 match oper {
281 Err(_) => panic!(),
282 Ok(oper) => match oper.index() {
283 i if i == oper1 => assert!(oper.recv(&r).is_err()),
284 _ => unreachable!(),
285 },
286 }
287
288 let (s, _) = bounded::<i32>(0);
289
290 let mut sel = Select::new();
291 let oper1 = sel.send(&s);
292 let oper = sel.try_select();
293 match oper {
294 Err(_) => panic!(),
295 Ok(oper) => match oper.index() {
296 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
297 _ => unreachable!(),
298 },
299 }
300
301 let (s, _) = bounded::<i32>(0);
302
303 let mut sel = Select::new();
304 let oper1 = sel.send(&s);
305 let oper = sel.select_timeout(ms(1000));
306 match oper {
307 Err(_) => panic!(),
308 Ok(oper) => match oper.index() {
309 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
310 _ => unreachable!(),
311 },
312 }
313 }
314
315 #[test]
default_only()316 fn default_only() {
317 let start = Instant::now();
318
319 let mut sel = Select::new();
320 let oper = sel.try_select();
321 assert!(oper.is_err());
322 let now = Instant::now();
323 assert!(now - start <= ms(50));
324
325 let start = Instant::now();
326 let mut sel = Select::new();
327 let oper = sel.select_timeout(ms(500));
328 assert!(oper.is_err());
329 let now = Instant::now();
330 assert!(now - start >= ms(450));
331 assert!(now - start <= ms(550));
332 }
333
334 #[test]
unblocks()335 fn unblocks() {
336 let (s1, r1) = bounded::<i32>(0);
337 let (s2, r2) = bounded::<i32>(0);
338
339 scope(|scope| {
340 scope.spawn(|_| {
341 thread::sleep(ms(500));
342 s2.send(2).unwrap();
343 });
344
345 let mut sel = Select::new();
346 let oper1 = sel.recv(&r1);
347 let oper2 = sel.recv(&r2);
348 let oper = sel.select_timeout(ms(1000));
349 match oper {
350 Err(_) => panic!(),
351 Ok(oper) => match oper.index() {
352 i if i == oper1 => panic!(),
353 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
354 _ => unreachable!(),
355 },
356 }
357 })
358 .unwrap();
359
360 scope(|scope| {
361 scope.spawn(|_| {
362 thread::sleep(ms(500));
363 assert_eq!(r1.recv().unwrap(), 1);
364 });
365
366 let mut sel = Select::new();
367 let oper1 = sel.send(&s1);
368 let oper2 = sel.send(&s2);
369 let oper = sel.select_timeout(ms(1000));
370 match oper {
371 Err(_) => panic!(),
372 Ok(oper) => match oper.index() {
373 i if i == oper1 => oper.send(&s1, 1).unwrap(),
374 i if i == oper2 => panic!(),
375 _ => unreachable!(),
376 },
377 }
378 })
379 .unwrap();
380 }
381
382 #[test]
both_ready()383 fn both_ready() {
384 let (s1, r1) = bounded(0);
385 let (s2, r2) = bounded(0);
386
387 scope(|scope| {
388 scope.spawn(|_| {
389 thread::sleep(ms(500));
390 s1.send(1).unwrap();
391 assert_eq!(r2.recv().unwrap(), 2);
392 });
393
394 for _ in 0..2 {
395 let mut sel = Select::new();
396 let oper1 = sel.recv(&r1);
397 let oper2 = sel.send(&s2);
398 let oper = sel.select();
399 match oper.index() {
400 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
401 i if i == oper2 => oper.send(&s2, 2).unwrap(),
402 _ => unreachable!(),
403 }
404 }
405 })
406 .unwrap();
407 }
408
409 #[test]
loop_try()410 fn loop_try() {
411 const RUNS: usize = 20;
412
413 for _ in 0..RUNS {
414 let (s1, r1) = bounded::<i32>(0);
415 let (s2, r2) = bounded::<i32>(0);
416 let (s_end, r_end) = bounded::<()>(0);
417
418 scope(|scope| {
419 scope.spawn(|_| loop {
420 let mut done = false;
421
422 let mut sel = Select::new();
423 let oper1 = sel.send(&s1);
424 let oper = sel.try_select();
425 match oper {
426 Err(_) => {}
427 Ok(oper) => match oper.index() {
428 i if i == oper1 => {
429 let _ = oper.send(&s1, 1);
430 done = true;
431 }
432 _ => unreachable!(),
433 },
434 }
435 if done {
436 break;
437 }
438
439 let mut sel = Select::new();
440 let oper1 = sel.recv(&r_end);
441 let oper = sel.try_select();
442 match oper {
443 Err(_) => {}
444 Ok(oper) => match oper.index() {
445 i if i == oper1 => {
446 let _ = oper.recv(&r_end);
447 done = true;
448 }
449 _ => unreachable!(),
450 },
451 }
452 if done {
453 break;
454 }
455 });
456
457 scope.spawn(|_| loop {
458 if let Ok(x) = r2.try_recv() {
459 assert_eq!(x, 2);
460 break;
461 }
462
463 let mut done = false;
464 let mut sel = Select::new();
465 let oper1 = sel.recv(&r_end);
466 let oper = sel.try_select();
467 match oper {
468 Err(_) => {}
469 Ok(oper) => match oper.index() {
470 i if i == oper1 => {
471 let _ = oper.recv(&r_end);
472 done = true;
473 }
474 _ => unreachable!(),
475 },
476 }
477 if done {
478 break;
479 }
480 });
481
482 scope.spawn(|_| {
483 thread::sleep(ms(500));
484
485 let mut sel = Select::new();
486 let oper1 = sel.recv(&r1);
487 let oper2 = sel.send(&s2);
488 let oper = sel.select_timeout(ms(1000));
489 match oper {
490 Err(_) => {}
491 Ok(oper) => match oper.index() {
492 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
493 i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
494 _ => unreachable!(),
495 },
496 }
497
498 drop(s_end);
499 });
500 })
501 .unwrap();
502 }
503 }
504
505 #[test]
cloning1()506 fn cloning1() {
507 scope(|scope| {
508 let (s1, r1) = unbounded::<i32>();
509 let (_s2, r2) = unbounded::<i32>();
510 let (s3, r3) = unbounded::<()>();
511
512 scope.spawn(move |_| {
513 r3.recv().unwrap();
514 drop(s1.clone());
515 assert!(r3.try_recv().is_err());
516 s1.send(1).unwrap();
517 r3.recv().unwrap();
518 });
519
520 s3.send(()).unwrap();
521
522 let mut sel = Select::new();
523 let oper1 = sel.recv(&r1);
524 let oper2 = sel.recv(&r2);
525 let oper = sel.select();
526 match oper.index() {
527 i if i == oper1 => drop(oper.recv(&r1)),
528 i if i == oper2 => drop(oper.recv(&r2)),
529 _ => unreachable!(),
530 }
531
532 s3.send(()).unwrap();
533 })
534 .unwrap();
535 }
536
537 #[test]
cloning2()538 fn cloning2() {
539 let (s1, r1) = unbounded::<()>();
540 let (s2, r2) = unbounded::<()>();
541 let (_s3, _r3) = unbounded::<()>();
542
543 scope(|scope| {
544 scope.spawn(move |_| {
545 let mut sel = Select::new();
546 let oper1 = sel.recv(&r1);
547 let oper2 = sel.recv(&r2);
548 let oper = sel.select();
549 match oper.index() {
550 i if i == oper1 => panic!(),
551 i if i == oper2 => drop(oper.recv(&r2)),
552 _ => unreachable!(),
553 }
554 });
555
556 thread::sleep(ms(500));
557 drop(s1.clone());
558 s2.send(()).unwrap();
559 })
560 .unwrap();
561 }
562
563 #[test]
preflight1()564 fn preflight1() {
565 let (s, r) = unbounded();
566 s.send(()).unwrap();
567
568 let mut sel = Select::new();
569 let oper1 = sel.recv(&r);
570 let oper = sel.select();
571 match oper.index() {
572 i if i == oper1 => drop(oper.recv(&r)),
573 _ => unreachable!(),
574 }
575 }
576
577 #[test]
preflight2()578 fn preflight2() {
579 let (s, r) = unbounded();
580 drop(s.clone());
581 s.send(()).unwrap();
582 drop(s);
583
584 let mut sel = Select::new();
585 let oper1 = sel.recv(&r);
586 let oper = sel.select();
587 match oper.index() {
588 i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())),
589 _ => unreachable!(),
590 }
591
592 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
593 }
594
595 #[test]
preflight3()596 fn preflight3() {
597 let (s, r) = unbounded();
598 drop(s.clone());
599 s.send(()).unwrap();
600 drop(s);
601 r.recv().unwrap();
602
603 let mut sel = Select::new();
604 let oper1 = sel.recv(&r);
605 let oper = sel.select();
606 match oper.index() {
607 i if i == oper1 => assert!(oper.recv(&r).is_err()),
608 _ => unreachable!(),
609 }
610 }
611
612 #[test]
duplicate_operations()613 fn duplicate_operations() {
614 let (s, r) = unbounded::<i32>();
615 let hit = vec![Cell::new(false); 4];
616
617 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
618 let mut sel = Select::new();
619 let oper0 = sel.recv(&r);
620 let oper1 = sel.recv(&r);
621 let oper2 = sel.send(&s);
622 let oper3 = sel.send(&s);
623 let oper = sel.select();
624 match oper.index() {
625 i if i == oper0 => {
626 assert!(oper.recv(&r).is_ok());
627 hit[0].set(true);
628 }
629 i if i == oper1 => {
630 assert!(oper.recv(&r).is_ok());
631 hit[1].set(true);
632 }
633 i if i == oper2 => {
634 assert!(oper.send(&s, 0).is_ok());
635 hit[2].set(true);
636 }
637 i if i == oper3 => {
638 assert!(oper.send(&s, 0).is_ok());
639 hit[3].set(true);
640 }
641 _ => unreachable!(),
642 }
643 }
644 }
645
646 #[test]
nesting()647 fn nesting() {
648 let (s, r) = unbounded::<i32>();
649
650 let mut sel = Select::new();
651 let oper1 = sel.send(&s);
652 let oper = sel.select();
653 match oper.index() {
654 i if i == oper1 => {
655 assert!(oper.send(&s, 0).is_ok());
656
657 let mut sel = Select::new();
658 let oper1 = sel.recv(&r);
659 let oper = sel.select();
660 match oper.index() {
661 i if i == oper1 => {
662 assert_eq!(oper.recv(&r), Ok(0));
663
664 let mut sel = Select::new();
665 let oper1 = sel.send(&s);
666 let oper = sel.select();
667 match oper.index() {
668 i if i == oper1 => {
669 assert!(oper.send(&s, 1).is_ok());
670
671 let mut sel = Select::new();
672 let oper1 = sel.recv(&r);
673 let oper = sel.select();
674 match oper.index() {
675 i if i == oper1 => {
676 assert_eq!(oper.recv(&r), Ok(1));
677 }
678 _ => unreachable!(),
679 }
680 }
681 _ => unreachable!(),
682 }
683 }
684 _ => unreachable!(),
685 }
686 }
687 _ => unreachable!(),
688 }
689 }
690
691 #[test]
stress_recv()692 fn stress_recv() {
693 const COUNT: usize = 10_000;
694
695 let (s1, r1) = unbounded();
696 let (s2, r2) = bounded(5);
697 let (s3, r3) = bounded(100);
698
699 scope(|scope| {
700 scope.spawn(|_| {
701 for i in 0..COUNT {
702 s1.send(i).unwrap();
703 r3.recv().unwrap();
704
705 s2.send(i).unwrap();
706 r3.recv().unwrap();
707 }
708 });
709
710 for i in 0..COUNT {
711 for _ in 0..2 {
712 let mut sel = Select::new();
713 let oper1 = sel.recv(&r1);
714 let oper2 = sel.recv(&r2);
715 let oper = sel.select();
716 match oper.index() {
717 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
718 ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)),
719 _ => unreachable!(),
720 }
721
722 s3.send(()).unwrap();
723 }
724 }
725 })
726 .unwrap();
727 }
728
729 #[test]
stress_send()730 fn stress_send() {
731 const COUNT: usize = 10_000;
732
733 let (s1, r1) = bounded(0);
734 let (s2, r2) = bounded(0);
735 let (s3, r3) = bounded(100);
736
737 scope(|scope| {
738 scope.spawn(|_| {
739 for i in 0..COUNT {
740 assert_eq!(r1.recv().unwrap(), i);
741 assert_eq!(r2.recv().unwrap(), i);
742 r3.recv().unwrap();
743 }
744 });
745
746 for i in 0..COUNT {
747 for _ in 0..2 {
748 let mut sel = Select::new();
749 let oper1 = sel.send(&s1);
750 let oper2 = sel.send(&s2);
751 let oper = sel.select();
752 match oper.index() {
753 ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()),
754 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
755 _ => unreachable!(),
756 }
757 }
758 s3.send(()).unwrap();
759 }
760 })
761 .unwrap();
762 }
763
764 #[test]
stress_mixed()765 fn stress_mixed() {
766 const COUNT: usize = 10_000;
767
768 let (s1, r1) = bounded(0);
769 let (s2, r2) = bounded(0);
770 let (s3, r3) = bounded(100);
771
772 scope(|scope| {
773 scope.spawn(|_| {
774 for i in 0..COUNT {
775 s1.send(i).unwrap();
776 assert_eq!(r2.recv().unwrap(), i);
777 r3.recv().unwrap();
778 }
779 });
780
781 for i in 0..COUNT {
782 for _ in 0..2 {
783 let mut sel = Select::new();
784 let oper1 = sel.recv(&r1);
785 let oper2 = sel.send(&s2);
786 let oper = sel.select();
787 match oper.index() {
788 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
789 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
790 _ => unreachable!(),
791 }
792 }
793 s3.send(()).unwrap();
794 }
795 })
796 .unwrap();
797 }
798
799 #[test]
stress_timeout_two_threads()800 fn stress_timeout_two_threads() {
801 const COUNT: usize = 20;
802
803 let (s, r) = bounded(2);
804
805 scope(|scope| {
806 scope.spawn(|_| {
807 for i in 0..COUNT {
808 if i % 2 == 0 {
809 thread::sleep(ms(500));
810 }
811
812 loop {
813 let mut sel = Select::new();
814 let oper1 = sel.send(&s);
815 let oper = sel.select_timeout(ms(100));
816 match oper {
817 Err(_) => {}
818 Ok(oper) => match oper.index() {
819 ix if ix == oper1 => {
820 assert!(oper.send(&s, i).is_ok());
821 break;
822 }
823 _ => unreachable!(),
824 },
825 }
826 }
827 }
828 });
829
830 scope.spawn(|_| {
831 for i in 0..COUNT {
832 if i % 2 == 0 {
833 thread::sleep(ms(500));
834 }
835
836 loop {
837 let mut sel = Select::new();
838 let oper1 = sel.recv(&r);
839 let oper = sel.select_timeout(ms(100));
840 match oper {
841 Err(_) => {}
842 Ok(oper) => match oper.index() {
843 ix if ix == oper1 => {
844 assert_eq!(oper.recv(&r), Ok(i));
845 break;
846 }
847 _ => unreachable!(),
848 },
849 }
850 }
851 }
852 });
853 })
854 .unwrap();
855 }
856
857 #[test]
send_recv_same_channel()858 fn send_recv_same_channel() {
859 let (s, r) = bounded::<i32>(0);
860 let mut sel = Select::new();
861 let oper1 = sel.send(&s);
862 let oper2 = sel.recv(&r);
863 let oper = sel.select_timeout(ms(100));
864 match oper {
865 Err(_) => {}
866 Ok(oper) => match oper.index() {
867 ix if ix == oper1 => panic!(),
868 ix if ix == oper2 => panic!(),
869 _ => unreachable!(),
870 },
871 }
872
873 let (s, r) = unbounded::<i32>();
874 let mut sel = Select::new();
875 let oper1 = sel.send(&s);
876 let oper2 = sel.recv(&r);
877 let oper = sel.select_timeout(ms(100));
878 match oper {
879 Err(_) => panic!(),
880 Ok(oper) => match oper.index() {
881 ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()),
882 ix if ix == oper2 => panic!(),
883 _ => unreachable!(),
884 },
885 }
886 }
887
888 #[test]
matching()889 fn matching() {
890 const THREADS: usize = 44;
891
892 let (s, r) = &bounded::<usize>(0);
893
894 scope(|scope| {
895 for i in 0..THREADS {
896 scope.spawn(move |_| {
897 let mut sel = Select::new();
898 let oper1 = sel.recv(&r);
899 let oper2 = sel.send(&s);
900 let oper = sel.select();
901 match oper.index() {
902 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
903 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
904 _ => unreachable!(),
905 }
906 });
907 }
908 })
909 .unwrap();
910
911 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
912 }
913
914 #[test]
matching_with_leftover()915 fn matching_with_leftover() {
916 const THREADS: usize = 55;
917
918 let (s, r) = &bounded::<usize>(0);
919
920 scope(|scope| {
921 for i in 0..THREADS {
922 scope.spawn(move |_| {
923 let mut sel = Select::new();
924 let oper1 = sel.recv(&r);
925 let oper2 = sel.send(&s);
926 let oper = sel.select();
927 match oper.index() {
928 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
929 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
930 _ => unreachable!(),
931 }
932 });
933 }
934 s.send(!0).unwrap();
935 })
936 .unwrap();
937
938 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
939 }
940
941 #[test]
channel_through_channel()942 fn channel_through_channel() {
943 const COUNT: usize = 1000;
944
945 type T = Box<dyn Any + Send>;
946
947 for cap in 0..3 {
948 let (s, r) = bounded::<T>(cap);
949
950 scope(|scope| {
951 scope.spawn(move |_| {
952 let mut s = s;
953
954 for _ in 0..COUNT {
955 let (new_s, new_r) = bounded(cap);
956 let new_r: T = Box::new(Some(new_r));
957
958 {
959 let mut sel = Select::new();
960 let oper1 = sel.send(&s);
961 let oper = sel.select();
962 match oper.index() {
963 ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()),
964 _ => unreachable!(),
965 }
966 }
967
968 s = new_s;
969 }
970 });
971
972 scope.spawn(move |_| {
973 let mut r = r;
974
975 for _ in 0..COUNT {
976 let new = {
977 let mut sel = Select::new();
978 let oper1 = sel.recv(&r);
979 let oper = sel.select();
980 match oper.index() {
981 ix if ix == oper1 => oper
982 .recv(&r)
983 .unwrap()
984 .downcast_mut::<Option<Receiver<T>>>()
985 .unwrap()
986 .take()
987 .unwrap(),
988 _ => unreachable!(),
989 }
990 };
991 r = new;
992 }
993 });
994 })
995 .unwrap();
996 }
997 }
998
999 #[test]
linearizable_try()1000 fn linearizable_try() {
1001 const COUNT: usize = 100_000;
1002
1003 for step in 0..2 {
1004 let (start_s, start_r) = bounded::<()>(0);
1005 let (end_s, end_r) = bounded::<()>(0);
1006
1007 let ((s1, r1), (s2, r2)) = if step == 0 {
1008 (bounded::<i32>(1), bounded::<i32>(1))
1009 } else {
1010 (unbounded::<i32>(), unbounded::<i32>())
1011 };
1012
1013 scope(|scope| {
1014 scope.spawn(|_| {
1015 for _ in 0..COUNT {
1016 start_s.send(()).unwrap();
1017
1018 s1.send(1).unwrap();
1019
1020 let mut sel = Select::new();
1021 let oper1 = sel.recv(&r1);
1022 let oper2 = sel.recv(&r2);
1023 let oper = sel.try_select();
1024 match oper {
1025 Err(_) => unreachable!(),
1026 Ok(oper) => match oper.index() {
1027 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1028 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1029 _ => unreachable!(),
1030 },
1031 }
1032
1033 end_s.send(()).unwrap();
1034 let _ = r2.try_recv();
1035 }
1036 });
1037
1038 for _ in 0..COUNT {
1039 start_r.recv().unwrap();
1040
1041 s2.send(1).unwrap();
1042 let _ = r1.try_recv();
1043
1044 end_r.recv().unwrap();
1045 }
1046 })
1047 .unwrap();
1048 }
1049 }
1050
1051 #[test]
linearizable_timeout()1052 fn linearizable_timeout() {
1053 const COUNT: usize = 100_000;
1054
1055 for step in 0..2 {
1056 let (start_s, start_r) = bounded::<()>(0);
1057 let (end_s, end_r) = bounded::<()>(0);
1058
1059 let ((s1, r1), (s2, r2)) = if step == 0 {
1060 (bounded::<i32>(1), bounded::<i32>(1))
1061 } else {
1062 (unbounded::<i32>(), unbounded::<i32>())
1063 };
1064
1065 scope(|scope| {
1066 scope.spawn(|_| {
1067 for _ in 0..COUNT {
1068 start_s.send(()).unwrap();
1069
1070 s1.send(1).unwrap();
1071
1072 let mut sel = Select::new();
1073 let oper1 = sel.recv(&r1);
1074 let oper2 = sel.recv(&r2);
1075 let oper = sel.select_timeout(ms(0));
1076 match oper {
1077 Err(_) => unreachable!(),
1078 Ok(oper) => match oper.index() {
1079 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1080 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1081 _ => unreachable!(),
1082 },
1083 }
1084
1085 end_s.send(()).unwrap();
1086 let _ = r2.try_recv();
1087 }
1088 });
1089
1090 for _ in 0..COUNT {
1091 start_r.recv().unwrap();
1092
1093 s2.send(1).unwrap();
1094 let _ = r1.try_recv();
1095
1096 end_r.recv().unwrap();
1097 }
1098 })
1099 .unwrap();
1100 }
1101 }
1102
1103 #[test]
fairness1()1104 fn fairness1() {
1105 const COUNT: usize = 10_000;
1106
1107 let (s1, r1) = bounded::<()>(COUNT);
1108 let (s2, r2) = unbounded::<()>();
1109
1110 for _ in 0..COUNT {
1111 s1.send(()).unwrap();
1112 s2.send(()).unwrap();
1113 }
1114
1115 let hits = vec![Cell::new(0usize); 4];
1116 for _ in 0..COUNT {
1117 let after = after(ms(0));
1118 let tick = tick(ms(0));
1119
1120 let mut sel = Select::new();
1121 let oper1 = sel.recv(&r1);
1122 let oper2 = sel.recv(&r2);
1123 let oper3 = sel.recv(&after);
1124 let oper4 = sel.recv(&tick);
1125 let oper = sel.select();
1126 match oper.index() {
1127 i if i == oper1 => {
1128 oper.recv(&r1).unwrap();
1129 hits[0].set(hits[0].get() + 1);
1130 }
1131 i if i == oper2 => {
1132 oper.recv(&r2).unwrap();
1133 hits[1].set(hits[1].get() + 1);
1134 }
1135 i if i == oper3 => {
1136 oper.recv(&after).unwrap();
1137 hits[2].set(hits[2].get() + 1);
1138 }
1139 i if i == oper4 => {
1140 oper.recv(&tick).unwrap();
1141 hits[3].set(hits[3].get() + 1);
1142 }
1143 _ => unreachable!(),
1144 }
1145 }
1146 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
1147 }
1148
1149 #[test]
fairness2()1150 fn fairness2() {
1151 const COUNT: usize = 10_000;
1152
1153 let (s1, r1) = unbounded::<()>();
1154 let (s2, r2) = bounded::<()>(1);
1155 let (s3, r3) = bounded::<()>(0);
1156
1157 scope(|scope| {
1158 scope.spawn(|_| {
1159 for _ in 0..COUNT {
1160 let mut sel = Select::new();
1161 let mut oper1 = None;
1162 let mut oper2 = None;
1163 if s1.is_empty() {
1164 oper1 = Some(sel.send(&s1));
1165 }
1166 if s2.is_empty() {
1167 oper2 = Some(sel.send(&s2));
1168 }
1169 let oper3 = sel.send(&s3);
1170 let oper = sel.select();
1171 match oper.index() {
1172 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
1173 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
1174 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
1175 _ => unreachable!(),
1176 }
1177 }
1178 });
1179
1180 let hits = vec![Cell::new(0usize); 3];
1181 for _ in 0..COUNT {
1182 let mut sel = Select::new();
1183 let oper1 = sel.recv(&r1);
1184 let oper2 = sel.recv(&r2);
1185 let oper3 = sel.recv(&r3);
1186 let oper = sel.select();
1187 match oper.index() {
1188 i if i == oper1 => {
1189 oper.recv(&r1).unwrap();
1190 hits[0].set(hits[0].get() + 1);
1191 }
1192 i if i == oper2 => {
1193 oper.recv(&r2).unwrap();
1194 hits[1].set(hits[1].get() + 1);
1195 }
1196 i if i == oper3 => {
1197 oper.recv(&r3).unwrap();
1198 hits[2].set(hits[2].get() + 1);
1199 }
1200 _ => unreachable!(),
1201 }
1202 }
1203 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
1204 })
1205 .unwrap();
1206 }
1207
1208 #[test]
sync_and_clone()1209 fn sync_and_clone() {
1210 const THREADS: usize = 20;
1211
1212 let (s, r) = &bounded::<usize>(0);
1213
1214 let mut sel = Select::new();
1215 let oper1 = sel.recv(&r);
1216 let oper2 = sel.send(&s);
1217 let sel = &sel;
1218
1219 scope(|scope| {
1220 for i in 0..THREADS {
1221 scope.spawn(move |_| {
1222 let mut sel = sel.clone();
1223 let oper = sel.select();
1224 match oper.index() {
1225 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
1226 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
1227 _ => unreachable!(),
1228 }
1229 });
1230 }
1231 })
1232 .unwrap();
1233
1234 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1235 }
1236
1237 #[test]
send_and_clone()1238 fn send_and_clone() {
1239 const THREADS: usize = 20;
1240
1241 let (s, r) = &bounded::<usize>(0);
1242
1243 let mut sel = Select::new();
1244 let oper1 = sel.recv(&r);
1245 let oper2 = sel.send(&s);
1246
1247 scope(|scope| {
1248 for i in 0..THREADS {
1249 let mut sel = sel.clone();
1250 scope.spawn(move |_| {
1251 let oper = sel.select();
1252 match oper.index() {
1253 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
1254 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
1255 _ => unreachable!(),
1256 }
1257 });
1258 }
1259 })
1260 .unwrap();
1261
1262 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1263 }
1264
1265 #[test]
reuse()1266 fn reuse() {
1267 const COUNT: usize = 10_000;
1268
1269 let (s1, r1) = bounded(0);
1270 let (s2, r2) = bounded(0);
1271 let (s3, r3) = bounded(100);
1272
1273 scope(|scope| {
1274 scope.spawn(|_| {
1275 for i in 0..COUNT {
1276 s1.send(i).unwrap();
1277 assert_eq!(r2.recv().unwrap(), i);
1278 r3.recv().unwrap();
1279 }
1280 });
1281
1282 let mut sel = Select::new();
1283 let oper1 = sel.recv(&r1);
1284 let oper2 = sel.send(&s2);
1285
1286 for i in 0..COUNT {
1287 for _ in 0..2 {
1288 let oper = sel.select();
1289 match oper.index() {
1290 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
1291 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
1292 _ => unreachable!(),
1293 }
1294 }
1295 s3.send(()).unwrap();
1296 }
1297 })
1298 .unwrap();
1299 }
1300