1 //! Tests for channel selection using the `Select` struct.
2
3 extern crate crossbeam_channel;
4 extern crate crossbeam_utils;
5
6 use std::any::Any;
7 use std::cell::Cell;
8 use std::thread;
9 use std::time::{Duration, Instant};
10
11 use crossbeam_channel::{after, bounded, tick, unbounded, Receiver, Select, TryRecvError};
12 use crossbeam_utils::thread::scope;
13
ms(ms: u64) -> Duration14 fn ms(ms: u64) -> Duration {
15 Duration::from_millis(ms)
16 }
17
18 #[test]
smoke1()19 fn smoke1() {
20 let (s1, r1) = unbounded::<usize>();
21 let (s2, r2) = unbounded::<usize>();
22
23 s1.send(1).unwrap();
24
25 let mut sel = Select::new();
26 let oper1 = sel.recv(&r1);
27 let oper2 = sel.recv(&r2);
28 let oper = sel.select();
29 match oper.index() {
30 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
31 i if i == oper2 => panic!(),
32 _ => unreachable!(),
33 }
34
35 s2.send(2).unwrap();
36
37 let mut sel = Select::new();
38 let oper1 = sel.recv(&r1);
39 let oper2 = sel.recv(&r2);
40 let oper = sel.select();
41 match oper.index() {
42 i if i == oper1 => panic!(),
43 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
44 _ => unreachable!(),
45 }
46 }
47
48 #[test]
smoke2()49 fn smoke2() {
50 let (_s1, r1) = unbounded::<i32>();
51 let (_s2, r2) = unbounded::<i32>();
52 let (_s3, r3) = unbounded::<i32>();
53 let (_s4, r4) = unbounded::<i32>();
54 let (s5, r5) = unbounded::<i32>();
55
56 s5.send(5).unwrap();
57
58 let mut sel = Select::new();
59 let oper1 = sel.recv(&r1);
60 let oper2 = sel.recv(&r2);
61 let oper3 = sel.recv(&r3);
62 let oper4 = sel.recv(&r4);
63 let oper5 = sel.recv(&r5);
64 let oper = sel.select();
65 match oper.index() {
66 i if i == oper1 => panic!(),
67 i if i == oper2 => panic!(),
68 i if i == oper3 => panic!(),
69 i if i == oper4 => panic!(),
70 i if i == oper5 => assert_eq!(oper.recv(&r5), Ok(5)),
71 _ => unreachable!(),
72 }
73 }
74
75 #[test]
disconnected()76 fn disconnected() {
77 let (s1, r1) = unbounded::<i32>();
78 let (s2, r2) = unbounded::<i32>();
79
80 scope(|scope| {
81 scope.spawn(|_| {
82 drop(s1);
83 thread::sleep(ms(500));
84 s2.send(5).unwrap();
85 });
86
87 let mut sel = Select::new();
88 let oper1 = sel.recv(&r1);
89 let oper2 = sel.recv(&r2);
90 let oper = sel.select_timeout(ms(1000));
91 match oper {
92 Err(_) => panic!(),
93 Ok(oper) => match oper.index() {
94 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
95 i if i == oper2 => panic!(),
96 _ => unreachable!(),
97 },
98 }
99
100 r2.recv().unwrap();
101 })
102 .unwrap();
103
104 let mut sel = Select::new();
105 let oper1 = sel.recv(&r1);
106 let oper2 = sel.recv(&r2);
107 let oper = sel.select_timeout(ms(1000));
108 match oper {
109 Err(_) => panic!(),
110 Ok(oper) => match oper.index() {
111 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
112 i if i == oper2 => panic!(),
113 _ => unreachable!(),
114 },
115 }
116
117 scope(|scope| {
118 scope.spawn(|_| {
119 thread::sleep(ms(500));
120 drop(s2);
121 });
122
123 let mut sel = Select::new();
124 let oper1 = sel.recv(&r2);
125 let oper = sel.select_timeout(ms(1000));
126 match oper {
127 Err(_) => panic!(),
128 Ok(oper) => match oper.index() {
129 i if i == oper1 => assert!(oper.recv(&r2).is_err()),
130 _ => unreachable!(),
131 },
132 }
133 })
134 .unwrap();
135 }
136
137 #[test]
default()138 fn default() {
139 let (s1, r1) = unbounded::<i32>();
140 let (s2, r2) = unbounded::<i32>();
141
142 let mut sel = Select::new();
143 let _oper1 = sel.recv(&r1);
144 let _oper2 = sel.recv(&r2);
145 let oper = sel.try_select();
146 match oper {
147 Err(_) => {}
148 Ok(_) => panic!(),
149 }
150
151 drop(s1);
152
153 let mut sel = Select::new();
154 let oper1 = sel.recv(&r1);
155 let oper2 = sel.recv(&r2);
156 let oper = sel.try_select();
157 match oper {
158 Err(_) => panic!(),
159 Ok(oper) => match oper.index() {
160 i if i == oper1 => assert!(oper.recv(&r1).is_err()),
161 i if i == oper2 => panic!(),
162 _ => unreachable!(),
163 },
164 }
165
166 s2.send(2).unwrap();
167
168 let mut sel = Select::new();
169 let oper1 = sel.recv(&r2);
170 let oper = sel.try_select();
171 match oper {
172 Err(_) => panic!(),
173 Ok(oper) => match oper.index() {
174 i if i == oper1 => assert_eq!(oper.recv(&r2), Ok(2)),
175 _ => unreachable!(),
176 },
177 }
178
179 let mut sel = Select::new();
180 let _oper1 = sel.recv(&r2);
181 let oper = sel.try_select();
182 match oper {
183 Err(_) => {}
184 Ok(_) => panic!(),
185 }
186
187 let mut sel = Select::new();
188 let oper = sel.try_select();
189 match oper {
190 Err(_) => {}
191 Ok(_) => panic!(),
192 }
193 }
194
195 #[test]
timeout()196 fn timeout() {
197 let (_s1, r1) = unbounded::<i32>();
198 let (s2, r2) = unbounded::<i32>();
199
200 scope(|scope| {
201 scope.spawn(|_| {
202 thread::sleep(ms(1500));
203 s2.send(2).unwrap();
204 });
205
206 let mut sel = Select::new();
207 let oper1 = sel.recv(&r1);
208 let oper2 = sel.recv(&r2);
209 let oper = sel.select_timeout(ms(1000));
210 match oper {
211 Err(_) => {}
212 Ok(oper) => match oper.index() {
213 i if i == oper1 => panic!(),
214 i if i == oper2 => panic!(),
215 _ => unreachable!(),
216 },
217 }
218
219 let mut sel = Select::new();
220 let oper1 = sel.recv(&r1);
221 let oper2 = sel.recv(&r2);
222 let oper = sel.select_timeout(ms(1000));
223 match oper {
224 Err(_) => panic!(),
225 Ok(oper) => match oper.index() {
226 i if i == oper1 => panic!(),
227 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
228 _ => unreachable!(),
229 },
230 }
231 })
232 .unwrap();
233
234 scope(|scope| {
235 let (s, r) = unbounded::<i32>();
236
237 scope.spawn(move |_| {
238 thread::sleep(ms(500));
239 drop(s);
240 });
241
242 let mut sel = Select::new();
243 let oper = sel.select_timeout(ms(1000));
244 match oper {
245 Err(_) => {
246 let mut sel = Select::new();
247 let oper1 = sel.recv(&r);
248 let oper = sel.try_select();
249 match oper {
250 Err(_) => panic!(),
251 Ok(oper) => match oper.index() {
252 i if i == oper1 => assert!(oper.recv(&r).is_err()),
253 _ => unreachable!(),
254 },
255 }
256 }
257 Ok(_) => unreachable!(),
258 }
259 })
260 .unwrap();
261 }
262
263 #[test]
default_when_disconnected()264 fn default_when_disconnected() {
265 let (_, r) = unbounded::<i32>();
266
267 let mut sel = Select::new();
268 let oper1 = sel.recv(&r);
269 let oper = sel.try_select();
270 match oper {
271 Err(_) => panic!(),
272 Ok(oper) => match oper.index() {
273 i if i == oper1 => assert!(oper.recv(&r).is_err()),
274 _ => unreachable!(),
275 },
276 }
277
278 let (_, r) = unbounded::<i32>();
279
280 let mut sel = Select::new();
281 let oper1 = sel.recv(&r);
282 let oper = sel.select_timeout(ms(1000));
283 match oper {
284 Err(_) => panic!(),
285 Ok(oper) => match oper.index() {
286 i if i == oper1 => assert!(oper.recv(&r).is_err()),
287 _ => unreachable!(),
288 },
289 }
290
291 let (s, _) = bounded::<i32>(0);
292
293 let mut sel = Select::new();
294 let oper1 = sel.send(&s);
295 let oper = sel.try_select();
296 match oper {
297 Err(_) => panic!(),
298 Ok(oper) => match oper.index() {
299 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
300 _ => unreachable!(),
301 },
302 }
303
304 let (s, _) = bounded::<i32>(0);
305
306 let mut sel = Select::new();
307 let oper1 = sel.send(&s);
308 let oper = sel.select_timeout(ms(1000));
309 match oper {
310 Err(_) => panic!(),
311 Ok(oper) => match oper.index() {
312 i if i == oper1 => assert!(oper.send(&s, 0).is_err()),
313 _ => unreachable!(),
314 },
315 }
316 }
317
318 #[test]
default_only()319 fn default_only() {
320 let start = Instant::now();
321
322 let mut sel = Select::new();
323 let oper = sel.try_select();
324 assert!(oper.is_err());
325 let now = Instant::now();
326 assert!(now - start <= ms(50));
327
328 let start = Instant::now();
329 let mut sel = Select::new();
330 let oper = sel.select_timeout(ms(500));
331 assert!(oper.is_err());
332 let now = Instant::now();
333 assert!(now - start >= ms(450));
334 assert!(now - start <= ms(550));
335 }
336
337 #[test]
unblocks()338 fn unblocks() {
339 let (s1, r1) = bounded::<i32>(0);
340 let (s2, r2) = bounded::<i32>(0);
341
342 scope(|scope| {
343 scope.spawn(|_| {
344 thread::sleep(ms(500));
345 s2.send(2).unwrap();
346 });
347
348 let mut sel = Select::new();
349 let oper1 = sel.recv(&r1);
350 let oper2 = sel.recv(&r2);
351 let oper = sel.select_timeout(ms(1000));
352 match oper {
353 Err(_) => panic!(),
354 Ok(oper) => match oper.index() {
355 i if i == oper1 => panic!(),
356 i if i == oper2 => assert_eq!(oper.recv(&r2), Ok(2)),
357 _ => unreachable!(),
358 },
359 }
360 })
361 .unwrap();
362
363 scope(|scope| {
364 scope.spawn(|_| {
365 thread::sleep(ms(500));
366 assert_eq!(r1.recv().unwrap(), 1);
367 });
368
369 let mut sel = Select::new();
370 let oper1 = sel.send(&s1);
371 let oper2 = sel.send(&s2);
372 let oper = sel.select_timeout(ms(1000));
373 match oper {
374 Err(_) => panic!(),
375 Ok(oper) => match oper.index() {
376 i if i == oper1 => oper.send(&s1, 1).unwrap(),
377 i if i == oper2 => panic!(),
378 _ => unreachable!(),
379 },
380 }
381 })
382 .unwrap();
383 }
384
385 #[test]
both_ready()386 fn both_ready() {
387 let (s1, r1) = bounded(0);
388 let (s2, r2) = bounded(0);
389
390 scope(|scope| {
391 scope.spawn(|_| {
392 thread::sleep(ms(500));
393 s1.send(1).unwrap();
394 assert_eq!(r2.recv().unwrap(), 2);
395 });
396
397 for _ in 0..2 {
398 let mut sel = Select::new();
399 let oper1 = sel.recv(&r1);
400 let oper2 = sel.send(&s2);
401 let oper = sel.select();
402 match oper.index() {
403 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
404 i if i == oper2 => oper.send(&s2, 2).unwrap(),
405 _ => unreachable!(),
406 }
407 }
408 })
409 .unwrap();
410 }
411
412 #[test]
loop_try()413 fn loop_try() {
414 const RUNS: usize = 20;
415
416 for _ in 0..RUNS {
417 let (s1, r1) = bounded::<i32>(0);
418 let (s2, r2) = bounded::<i32>(0);
419 let (s_end, r_end) = bounded::<()>(0);
420
421 scope(|scope| {
422 scope.spawn(|_| loop {
423 let mut done = false;
424
425 let mut sel = Select::new();
426 let oper1 = sel.send(&s1);
427 let oper = sel.try_select();
428 match oper {
429 Err(_) => {}
430 Ok(oper) => match oper.index() {
431 i if i == oper1 => {
432 let _ = oper.send(&s1, 1);
433 done = true;
434 }
435 _ => unreachable!(),
436 },
437 }
438 if done {
439 break;
440 }
441
442 let mut sel = Select::new();
443 let oper1 = sel.recv(&r_end);
444 let oper = sel.try_select();
445 match oper {
446 Err(_) => {}
447 Ok(oper) => match oper.index() {
448 i if i == oper1 => {
449 let _ = oper.recv(&r_end);
450 done = true;
451 }
452 _ => unreachable!(),
453 },
454 }
455 if done {
456 break;
457 }
458 });
459
460 scope.spawn(|_| loop {
461 if let Ok(x) = r2.try_recv() {
462 assert_eq!(x, 2);
463 break;
464 }
465
466 let mut done = false;
467 let mut sel = Select::new();
468 let oper1 = sel.recv(&r_end);
469 let oper = sel.try_select();
470 match oper {
471 Err(_) => {}
472 Ok(oper) => match oper.index() {
473 i if i == oper1 => {
474 let _ = oper.recv(&r_end);
475 done = true;
476 }
477 _ => unreachable!(),
478 },
479 }
480 if done {
481 break;
482 }
483 });
484
485 scope.spawn(|_| {
486 thread::sleep(ms(500));
487
488 let mut sel = Select::new();
489 let oper1 = sel.recv(&r1);
490 let oper2 = sel.send(&s2);
491 let oper = sel.select_timeout(ms(1000));
492 match oper {
493 Err(_) => {}
494 Ok(oper) => match oper.index() {
495 i if i == oper1 => assert_eq!(oper.recv(&r1), Ok(1)),
496 i if i == oper2 => assert!(oper.send(&s2, 2).is_ok()),
497 _ => unreachable!(),
498 },
499 }
500
501 drop(s_end);
502 });
503 })
504 .unwrap();
505 }
506 }
507
508 #[test]
cloning1()509 fn cloning1() {
510 scope(|scope| {
511 let (s1, r1) = unbounded::<i32>();
512 let (_s2, r2) = unbounded::<i32>();
513 let (s3, r3) = unbounded::<()>();
514
515 scope.spawn(move |_| {
516 r3.recv().unwrap();
517 drop(s1.clone());
518 assert!(r3.try_recv().is_err());
519 s1.send(1).unwrap();
520 r3.recv().unwrap();
521 });
522
523 s3.send(()).unwrap();
524
525 let mut sel = Select::new();
526 let oper1 = sel.recv(&r1);
527 let oper2 = sel.recv(&r2);
528 let oper = sel.select();
529 match oper.index() {
530 i if i == oper1 => drop(oper.recv(&r1)),
531 i if i == oper2 => drop(oper.recv(&r2)),
532 _ => unreachable!(),
533 }
534
535 s3.send(()).unwrap();
536 })
537 .unwrap();
538 }
539
540 #[test]
cloning2()541 fn cloning2() {
542 let (s1, r1) = unbounded::<()>();
543 let (s2, r2) = unbounded::<()>();
544 let (_s3, _r3) = unbounded::<()>();
545
546 scope(|scope| {
547 scope.spawn(move |_| {
548 let mut sel = Select::new();
549 let oper1 = sel.recv(&r1);
550 let oper2 = sel.recv(&r2);
551 let oper = sel.select();
552 match oper.index() {
553 i if i == oper1 => panic!(),
554 i if i == oper2 => drop(oper.recv(&r2)),
555 _ => unreachable!(),
556 }
557 });
558
559 thread::sleep(ms(500));
560 drop(s1.clone());
561 s2.send(()).unwrap();
562 })
563 .unwrap();
564 }
565
566 #[test]
preflight1()567 fn preflight1() {
568 let (s, r) = unbounded();
569 s.send(()).unwrap();
570
571 let mut sel = Select::new();
572 let oper1 = sel.recv(&r);
573 let oper = sel.select();
574 match oper.index() {
575 i if i == oper1 => drop(oper.recv(&r)),
576 _ => unreachable!(),
577 }
578 }
579
580 #[test]
preflight2()581 fn preflight2() {
582 let (s, r) = unbounded();
583 drop(s.clone());
584 s.send(()).unwrap();
585 drop(s);
586
587 let mut sel = Select::new();
588 let oper1 = sel.recv(&r);
589 let oper = sel.select();
590 match oper.index() {
591 i if i == oper1 => assert_eq!(oper.recv(&r), Ok(())),
592 _ => unreachable!(),
593 }
594
595 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
596 }
597
598 #[test]
preflight3()599 fn preflight3() {
600 let (s, r) = unbounded();
601 drop(s.clone());
602 s.send(()).unwrap();
603 drop(s);
604 r.recv().unwrap();
605
606 let mut sel = Select::new();
607 let oper1 = sel.recv(&r);
608 let oper = sel.select();
609 match oper.index() {
610 i if i == oper1 => assert!(oper.recv(&r).is_err()),
611 _ => unreachable!(),
612 }
613 }
614
615 #[test]
duplicate_operations()616 fn duplicate_operations() {
617 let (s, r) = unbounded::<i32>();
618 let hit = vec![Cell::new(false); 4];
619
620 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
621 let mut sel = Select::new();
622 let oper0 = sel.recv(&r);
623 let oper1 = sel.recv(&r);
624 let oper2 = sel.send(&s);
625 let oper3 = sel.send(&s);
626 let oper = sel.select();
627 match oper.index() {
628 i if i == oper0 => {
629 assert!(oper.recv(&r).is_ok());
630 hit[0].set(true);
631 }
632 i if i == oper1 => {
633 assert!(oper.recv(&r).is_ok());
634 hit[1].set(true);
635 }
636 i if i == oper2 => {
637 assert!(oper.send(&s, 0).is_ok());
638 hit[2].set(true);
639 }
640 i if i == oper3 => {
641 assert!(oper.send(&s, 0).is_ok());
642 hit[3].set(true);
643 }
644 _ => unreachable!(),
645 }
646 }
647 }
648
649 #[test]
nesting()650 fn nesting() {
651 let (s, r) = unbounded::<i32>();
652
653 let mut sel = Select::new();
654 let oper1 = sel.send(&s);
655 let oper = sel.select();
656 match oper.index() {
657 i if i == oper1 => {
658 assert!(oper.send(&s, 0).is_ok());
659
660 let mut sel = Select::new();
661 let oper1 = sel.recv(&r);
662 let oper = sel.select();
663 match oper.index() {
664 i if i == oper1 => {
665 assert_eq!(oper.recv(&r), Ok(0));
666
667 let mut sel = Select::new();
668 let oper1 = sel.send(&s);
669 let oper = sel.select();
670 match oper.index() {
671 i if i == oper1 => {
672 assert!(oper.send(&s, 1).is_ok());
673
674 let mut sel = Select::new();
675 let oper1 = sel.recv(&r);
676 let oper = sel.select();
677 match oper.index() {
678 i if i == oper1 => {
679 assert_eq!(oper.recv(&r), Ok(1));
680 }
681 _ => unreachable!(),
682 }
683 }
684 _ => unreachable!(),
685 }
686 }
687 _ => unreachable!(),
688 }
689 }
690 _ => unreachable!(),
691 }
692 }
693
694 #[test]
stress_recv()695 fn stress_recv() {
696 const COUNT: usize = 10_000;
697
698 let (s1, r1) = unbounded();
699 let (s2, r2) = bounded(5);
700 let (s3, r3) = bounded(100);
701
702 scope(|scope| {
703 scope.spawn(|_| {
704 for i in 0..COUNT {
705 s1.send(i).unwrap();
706 r3.recv().unwrap();
707
708 s2.send(i).unwrap();
709 r3.recv().unwrap();
710 }
711 });
712
713 for i in 0..COUNT {
714 for _ in 0..2 {
715 let mut sel = Select::new();
716 let oper1 = sel.recv(&r1);
717 let oper2 = sel.recv(&r2);
718 let oper = sel.select();
719 match oper.index() {
720 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
721 ix if ix == oper2 => assert_eq!(oper.recv(&r2), Ok(i)),
722 _ => unreachable!(),
723 }
724
725 s3.send(()).unwrap();
726 }
727 }
728 })
729 .unwrap();
730 }
731
732 #[test]
stress_send()733 fn stress_send() {
734 const COUNT: usize = 10_000;
735
736 let (s1, r1) = bounded(0);
737 let (s2, r2) = bounded(0);
738 let (s3, r3) = bounded(100);
739
740 scope(|scope| {
741 scope.spawn(|_| {
742 for i in 0..COUNT {
743 assert_eq!(r1.recv().unwrap(), i);
744 assert_eq!(r2.recv().unwrap(), i);
745 r3.recv().unwrap();
746 }
747 });
748
749 for i in 0..COUNT {
750 for _ in 0..2 {
751 let mut sel = Select::new();
752 let oper1 = sel.send(&s1);
753 let oper2 = sel.send(&s2);
754 let oper = sel.select();
755 match oper.index() {
756 ix if ix == oper1 => assert!(oper.send(&s1, i).is_ok()),
757 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
758 _ => unreachable!(),
759 }
760 }
761 s3.send(()).unwrap();
762 }
763 })
764 .unwrap();
765 }
766
767 #[test]
stress_mixed()768 fn stress_mixed() {
769 const COUNT: usize = 10_000;
770
771 let (s1, r1) = bounded(0);
772 let (s2, r2) = bounded(0);
773 let (s3, r3) = bounded(100);
774
775 scope(|scope| {
776 scope.spawn(|_| {
777 for i in 0..COUNT {
778 s1.send(i).unwrap();
779 assert_eq!(r2.recv().unwrap(), i);
780 r3.recv().unwrap();
781 }
782 });
783
784 for i in 0..COUNT {
785 for _ in 0..2 {
786 let mut sel = Select::new();
787 let oper1 = sel.recv(&r1);
788 let oper2 = sel.send(&s2);
789 let oper = sel.select();
790 match oper.index() {
791 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
792 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
793 _ => unreachable!(),
794 }
795 }
796 s3.send(()).unwrap();
797 }
798 })
799 .unwrap();
800 }
801
802 #[test]
stress_timeout_two_threads()803 fn stress_timeout_two_threads() {
804 const COUNT: usize = 20;
805
806 let (s, r) = bounded(2);
807
808 scope(|scope| {
809 scope.spawn(|_| {
810 for i in 0..COUNT {
811 if i % 2 == 0 {
812 thread::sleep(ms(500));
813 }
814
815 let done = false;
816 while !done {
817 let mut sel = Select::new();
818 let oper1 = sel.send(&s);
819 let oper = sel.select_timeout(ms(100));
820 match oper {
821 Err(_) => {}
822 Ok(oper) => match oper.index() {
823 ix if ix == oper1 => {
824 assert!(oper.send(&s, i).is_ok());
825 break;
826 }
827 _ => unreachable!(),
828 },
829 }
830 }
831 }
832 });
833
834 scope.spawn(|_| {
835 for i in 0..COUNT {
836 if i % 2 == 0 {
837 thread::sleep(ms(500));
838 }
839
840 let mut done = false;
841 while !done {
842 let mut sel = Select::new();
843 let oper1 = sel.recv(&r);
844 let oper = sel.select_timeout(ms(100));
845 match oper {
846 Err(_) => {}
847 Ok(oper) => match oper.index() {
848 ix if ix == oper1 => {
849 assert_eq!(oper.recv(&r), Ok(i));
850 done = true;
851 }
852 _ => unreachable!(),
853 },
854 }
855 }
856 }
857 });
858 })
859 .unwrap();
860 }
861
862 #[test]
send_recv_same_channel()863 fn send_recv_same_channel() {
864 let (s, r) = bounded::<i32>(0);
865 let mut sel = Select::new();
866 let oper1 = sel.send(&s);
867 let oper2 = sel.recv(&r);
868 let oper = sel.select_timeout(ms(100));
869 match oper {
870 Err(_) => {}
871 Ok(oper) => match oper.index() {
872 ix if ix == oper1 => panic!(),
873 ix if ix == oper2 => panic!(),
874 _ => unreachable!(),
875 },
876 }
877
878 let (s, r) = unbounded::<i32>();
879 let mut sel = Select::new();
880 let oper1 = sel.send(&s);
881 let oper2 = sel.recv(&r);
882 let oper = sel.select_timeout(ms(100));
883 match oper {
884 Err(_) => panic!(),
885 Ok(oper) => match oper.index() {
886 ix if ix == oper1 => assert!(oper.send(&s, 0).is_ok()),
887 ix if ix == oper2 => panic!(),
888 _ => unreachable!(),
889 },
890 }
891 }
892
893 #[test]
matching()894 fn matching() {
895 const THREADS: usize = 44;
896
897 let (s, r) = &bounded::<usize>(0);
898
899 scope(|scope| {
900 for i in 0..THREADS {
901 scope.spawn(move |_| {
902 let mut sel = Select::new();
903 let oper1 = sel.recv(&r);
904 let oper2 = sel.send(&s);
905 let oper = sel.select();
906 match oper.index() {
907 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
908 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
909 _ => unreachable!(),
910 }
911 });
912 }
913 })
914 .unwrap();
915
916 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
917 }
918
919 #[test]
matching_with_leftover()920 fn matching_with_leftover() {
921 const THREADS: usize = 55;
922
923 let (s, r) = &bounded::<usize>(0);
924
925 scope(|scope| {
926 for i in 0..THREADS {
927 scope.spawn(move |_| {
928 let mut sel = Select::new();
929 let oper1 = sel.recv(&r);
930 let oper2 = sel.send(&s);
931 let oper = sel.select();
932 match oper.index() {
933 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
934 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
935 _ => unreachable!(),
936 }
937 });
938 }
939 s.send(!0).unwrap();
940 })
941 .unwrap();
942
943 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
944 }
945
946 #[test]
channel_through_channel()947 fn channel_through_channel() {
948 const COUNT: usize = 1000;
949
950 type T = Box<dyn Any + Send>;
951
952 for cap in 0..3 {
953 let (s, r) = bounded::<T>(cap);
954
955 scope(|scope| {
956 scope.spawn(move |_| {
957 let mut s = s;
958
959 for _ in 0..COUNT {
960 let (new_s, new_r) = bounded(cap);
961 let new_r: T = Box::new(Some(new_r));
962
963 {
964 let mut sel = Select::new();
965 let oper1 = sel.send(&s);
966 let oper = sel.select();
967 match oper.index() {
968 ix if ix == oper1 => assert!(oper.send(&s, new_r).is_ok()),
969 _ => unreachable!(),
970 }
971 }
972
973 s = new_s;
974 }
975 });
976
977 scope.spawn(move |_| {
978 let mut r = r;
979
980 for _ in 0..COUNT {
981 let new = {
982 let mut sel = Select::new();
983 let oper1 = sel.recv(&r);
984 let oper = sel.select();
985 match oper.index() {
986 ix if ix == oper1 => oper
987 .recv(&r)
988 .unwrap()
989 .downcast_mut::<Option<Receiver<T>>>()
990 .unwrap()
991 .take()
992 .unwrap(),
993 _ => unreachable!(),
994 }
995 };
996 r = new;
997 }
998 });
999 })
1000 .unwrap();
1001 }
1002 }
1003
1004 #[test]
linearizable_try()1005 fn linearizable_try() {
1006 const COUNT: usize = 100_000;
1007
1008 for step in 0..2 {
1009 let (start_s, start_r) = bounded::<()>(0);
1010 let (end_s, end_r) = bounded::<()>(0);
1011
1012 let ((s1, r1), (s2, r2)) = if step == 0 {
1013 (bounded::<i32>(1), bounded::<i32>(1))
1014 } else {
1015 (unbounded::<i32>(), unbounded::<i32>())
1016 };
1017
1018 scope(|scope| {
1019 scope.spawn(|_| {
1020 for _ in 0..COUNT {
1021 start_s.send(()).unwrap();
1022
1023 s1.send(1).unwrap();
1024
1025 let mut sel = Select::new();
1026 let oper1 = sel.recv(&r1);
1027 let oper2 = sel.recv(&r2);
1028 let oper = sel.try_select();
1029 match oper {
1030 Err(_) => unreachable!(),
1031 Ok(oper) => match oper.index() {
1032 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1033 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1034 _ => unreachable!(),
1035 },
1036 }
1037
1038 end_s.send(()).unwrap();
1039 let _ = r2.try_recv();
1040 }
1041 });
1042
1043 for _ in 0..COUNT {
1044 start_r.recv().unwrap();
1045
1046 s2.send(1).unwrap();
1047 let _ = r1.try_recv();
1048
1049 end_r.recv().unwrap();
1050 }
1051 })
1052 .unwrap();
1053 }
1054 }
1055
1056 #[test]
linearizable_timeout()1057 fn linearizable_timeout() {
1058 const COUNT: usize = 100_000;
1059
1060 for step in 0..2 {
1061 let (start_s, start_r) = bounded::<()>(0);
1062 let (end_s, end_r) = bounded::<()>(0);
1063
1064 let ((s1, r1), (s2, r2)) = if step == 0 {
1065 (bounded::<i32>(1), bounded::<i32>(1))
1066 } else {
1067 (unbounded::<i32>(), unbounded::<i32>())
1068 };
1069
1070 scope(|scope| {
1071 scope.spawn(|_| {
1072 for _ in 0..COUNT {
1073 start_s.send(()).unwrap();
1074
1075 s1.send(1).unwrap();
1076
1077 let mut sel = Select::new();
1078 let oper1 = sel.recv(&r1);
1079 let oper2 = sel.recv(&r2);
1080 let oper = sel.select_timeout(ms(0));
1081 match oper {
1082 Err(_) => unreachable!(),
1083 Ok(oper) => match oper.index() {
1084 ix if ix == oper1 => assert!(oper.recv(&r1).is_ok()),
1085 ix if ix == oper2 => assert!(oper.recv(&r2).is_ok()),
1086 _ => unreachable!(),
1087 },
1088 }
1089
1090 end_s.send(()).unwrap();
1091 let _ = r2.try_recv();
1092 }
1093 });
1094
1095 for _ in 0..COUNT {
1096 start_r.recv().unwrap();
1097
1098 s2.send(1).unwrap();
1099 let _ = r1.try_recv();
1100
1101 end_r.recv().unwrap();
1102 }
1103 })
1104 .unwrap();
1105 }
1106 }
1107
1108 #[test]
fairness1()1109 fn fairness1() {
1110 const COUNT: usize = 10_000;
1111
1112 let (s1, r1) = bounded::<()>(COUNT);
1113 let (s2, r2) = unbounded::<()>();
1114
1115 for _ in 0..COUNT {
1116 s1.send(()).unwrap();
1117 s2.send(()).unwrap();
1118 }
1119
1120 let hits = vec![Cell::new(0usize); 4];
1121 for _ in 0..COUNT {
1122 let after = after(ms(0));
1123 let tick = tick(ms(0));
1124
1125 let mut sel = Select::new();
1126 let oper1 = sel.recv(&r1);
1127 let oper2 = sel.recv(&r2);
1128 let oper3 = sel.recv(&after);
1129 let oper4 = sel.recv(&tick);
1130 let oper = sel.select();
1131 match oper.index() {
1132 i if i == oper1 => {
1133 oper.recv(&r1).unwrap();
1134 hits[0].set(hits[0].get() + 1);
1135 }
1136 i if i == oper2 => {
1137 oper.recv(&r2).unwrap();
1138 hits[1].set(hits[1].get() + 1);
1139 }
1140 i if i == oper3 => {
1141 oper.recv(&after).unwrap();
1142 hits[2].set(hits[2].get() + 1);
1143 }
1144 i if i == oper4 => {
1145 oper.recv(&tick).unwrap();
1146 hits[3].set(hits[3].get() + 1);
1147 }
1148 _ => unreachable!(),
1149 }
1150 }
1151 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
1152 }
1153
1154 #[test]
fairness2()1155 fn fairness2() {
1156 const COUNT: usize = 10_000;
1157
1158 let (s1, r1) = unbounded::<()>();
1159 let (s2, r2) = bounded::<()>(1);
1160 let (s3, r3) = bounded::<()>(0);
1161
1162 scope(|scope| {
1163 scope.spawn(|_| {
1164 for _ in 0..COUNT {
1165 let mut sel = Select::new();
1166 let mut oper1 = None;
1167 let mut oper2 = None;
1168 if s1.is_empty() {
1169 oper1 = Some(sel.send(&s1));
1170 }
1171 if s2.is_empty() {
1172 oper2 = Some(sel.send(&s2));
1173 }
1174 let oper3 = sel.send(&s3);
1175 let oper = sel.select();
1176 match oper.index() {
1177 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
1178 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
1179 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
1180 _ => unreachable!(),
1181 }
1182 }
1183 });
1184
1185 let hits = vec![Cell::new(0usize); 3];
1186 for _ in 0..COUNT {
1187 let mut sel = Select::new();
1188 let oper1 = sel.recv(&r1);
1189 let oper2 = sel.recv(&r2);
1190 let oper3 = sel.recv(&r3);
1191 let oper = sel.select();
1192 match oper.index() {
1193 i if i == oper1 => {
1194 oper.recv(&r1).unwrap();
1195 hits[0].set(hits[0].get() + 1);
1196 }
1197 i if i == oper2 => {
1198 oper.recv(&r2).unwrap();
1199 hits[1].set(hits[1].get() + 1);
1200 }
1201 i if i == oper3 => {
1202 oper.recv(&r3).unwrap();
1203 hits[2].set(hits[2].get() + 1);
1204 }
1205 _ => unreachable!(),
1206 }
1207 }
1208 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 50));
1209 })
1210 .unwrap();
1211 }
1212
1213 #[test]
sync_and_clone()1214 fn sync_and_clone() {
1215 const THREADS: usize = 20;
1216
1217 let (s, r) = &bounded::<usize>(0);
1218
1219 let mut sel = Select::new();
1220 let oper1 = sel.recv(&r);
1221 let oper2 = sel.send(&s);
1222 let sel = &sel;
1223
1224 scope(|scope| {
1225 for i in 0..THREADS {
1226 scope.spawn(move |_| {
1227 let mut sel = sel.clone();
1228 let oper = sel.select();
1229 match oper.index() {
1230 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
1231 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
1232 _ => unreachable!(),
1233 }
1234 });
1235 }
1236 })
1237 .unwrap();
1238
1239 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1240 }
1241
1242 #[test]
send_and_clone()1243 fn send_and_clone() {
1244 const THREADS: usize = 20;
1245
1246 let (s, r) = &bounded::<usize>(0);
1247
1248 let mut sel = Select::new();
1249 let oper1 = sel.recv(&r);
1250 let oper2 = sel.send(&s);
1251
1252 scope(|scope| {
1253 for i in 0..THREADS {
1254 let mut sel = sel.clone();
1255 scope.spawn(move |_| {
1256 let oper = sel.select();
1257 match oper.index() {
1258 ix if ix == oper1 => assert_ne!(oper.recv(&r), Ok(i)),
1259 ix if ix == oper2 => assert!(oper.send(&s, i).is_ok()),
1260 _ => unreachable!(),
1261 }
1262 });
1263 }
1264 })
1265 .unwrap();
1266
1267 assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
1268 }
1269
1270 #[test]
reuse()1271 fn reuse() {
1272 const COUNT: usize = 10_000;
1273
1274 let (s1, r1) = bounded(0);
1275 let (s2, r2) = bounded(0);
1276 let (s3, r3) = bounded(100);
1277
1278 scope(|scope| {
1279 scope.spawn(|_| {
1280 for i in 0..COUNT {
1281 s1.send(i).unwrap();
1282 assert_eq!(r2.recv().unwrap(), i);
1283 r3.recv().unwrap();
1284 }
1285 });
1286
1287 let mut sel = Select::new();
1288 let oper1 = sel.recv(&r1);
1289 let oper2 = sel.send(&s2);
1290
1291 for i in 0..COUNT {
1292 for _ in 0..2 {
1293 let oper = sel.select();
1294 match oper.index() {
1295 ix if ix == oper1 => assert_eq!(oper.recv(&r1), Ok(i)),
1296 ix if ix == oper2 => assert!(oper.send(&s2, i).is_ok()),
1297 _ => unreachable!(),
1298 }
1299 }
1300 s3.send(()).unwrap();
1301 }
1302 })
1303 .unwrap();
1304 }
1305