1 //! Tests for channel readiness using the `Select` struct.
2
3 #![allow(clippy::drop_copy)]
4
5 use std::any::Any;
6 use std::cell::Cell;
7 use std::thread;
8 use std::time::{Duration, Instant};
9
10 use crossbeam_channel::{after, bounded, tick, unbounded};
11 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
12 use crossbeam_utils::thread::scope;
13
ms(ms: u64) -> Duration14 fn ms(ms: u64) -> Duration {
15 Duration::from_millis(ms)
16 }
17
18 #[test]
smoke1()19 fn smoke1() {
20 let (s1, r1) = unbounded::<usize>();
21 let (s2, r2) = unbounded::<usize>();
22
23 s1.send(1).unwrap();
24
25 let mut sel = Select::new();
26 sel.recv(&r1);
27 sel.recv(&r2);
28 assert_eq!(sel.ready(), 0);
29 assert_eq!(r1.try_recv(), Ok(1));
30
31 s2.send(2).unwrap();
32
33 let mut sel = Select::new();
34 sel.recv(&r1);
35 sel.recv(&r2);
36 assert_eq!(sel.ready(), 1);
37 assert_eq!(r2.try_recv(), Ok(2));
38 }
39
40 #[test]
smoke2()41 fn smoke2() {
42 let (_s1, r1) = unbounded::<i32>();
43 let (_s2, r2) = unbounded::<i32>();
44 let (_s3, r3) = unbounded::<i32>();
45 let (_s4, r4) = unbounded::<i32>();
46 let (s5, r5) = unbounded::<i32>();
47
48 s5.send(5).unwrap();
49
50 let mut sel = Select::new();
51 sel.recv(&r1);
52 sel.recv(&r2);
53 sel.recv(&r3);
54 sel.recv(&r4);
55 sel.recv(&r5);
56 assert_eq!(sel.ready(), 4);
57 assert_eq!(r5.try_recv(), Ok(5));
58 }
59
60 #[test]
disconnected()61 fn disconnected() {
62 let (s1, r1) = unbounded::<i32>();
63 let (s2, r2) = unbounded::<i32>();
64
65 scope(|scope| {
66 scope.spawn(|_| {
67 drop(s1);
68 thread::sleep(ms(500));
69 s2.send(5).unwrap();
70 });
71
72 let mut sel = Select::new();
73 sel.recv(&r1);
74 sel.recv(&r2);
75 match sel.ready_timeout(ms(1000)) {
76 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
77 _ => panic!(),
78 }
79
80 r2.recv().unwrap();
81 })
82 .unwrap();
83
84 let mut sel = Select::new();
85 sel.recv(&r1);
86 sel.recv(&r2);
87 match sel.ready_timeout(ms(1000)) {
88 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
89 _ => panic!(),
90 }
91
92 scope(|scope| {
93 scope.spawn(|_| {
94 thread::sleep(ms(500));
95 drop(s2);
96 });
97
98 let mut sel = Select::new();
99 sel.recv(&r2);
100 match sel.ready_timeout(ms(1000)) {
101 Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
102 _ => panic!(),
103 }
104 })
105 .unwrap();
106 }
107
108 #[test]
default()109 fn default() {
110 let (s1, r1) = unbounded::<i32>();
111 let (s2, r2) = unbounded::<i32>();
112
113 let mut sel = Select::new();
114 sel.recv(&r1);
115 sel.recv(&r2);
116 assert!(sel.try_ready().is_err());
117
118 drop(s1);
119
120 let mut sel = Select::new();
121 sel.recv(&r1);
122 sel.recv(&r2);
123 match sel.try_ready() {
124 Ok(0) => assert!(r1.try_recv().is_err()),
125 _ => panic!(),
126 }
127
128 s2.send(2).unwrap();
129
130 let mut sel = Select::new();
131 sel.recv(&r2);
132 match sel.try_ready() {
133 Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
134 _ => panic!(),
135 }
136
137 let mut sel = Select::new();
138 sel.recv(&r2);
139 assert!(sel.try_ready().is_err());
140
141 let mut sel = Select::new();
142 assert!(sel.try_ready().is_err());
143 }
144
145 #[test]
timeout()146 fn timeout() {
147 let (_s1, r1) = unbounded::<i32>();
148 let (s2, r2) = unbounded::<i32>();
149
150 scope(|scope| {
151 scope.spawn(|_| {
152 thread::sleep(ms(1500));
153 s2.send(2).unwrap();
154 });
155
156 let mut sel = Select::new();
157 sel.recv(&r1);
158 sel.recv(&r2);
159 assert!(sel.ready_timeout(ms(1000)).is_err());
160
161 let mut sel = Select::new();
162 sel.recv(&r1);
163 sel.recv(&r2);
164 match sel.ready_timeout(ms(1000)) {
165 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
166 _ => panic!(),
167 }
168 })
169 .unwrap();
170
171 scope(|scope| {
172 let (s, r) = unbounded::<i32>();
173
174 scope.spawn(move |_| {
175 thread::sleep(ms(500));
176 drop(s);
177 });
178
179 let mut sel = Select::new();
180 assert!(sel.ready_timeout(ms(1000)).is_err());
181
182 let mut sel = Select::new();
183 sel.recv(&r);
184 match sel.try_ready() {
185 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
186 _ => panic!(),
187 }
188 })
189 .unwrap();
190 }
191
192 #[test]
default_when_disconnected()193 fn default_when_disconnected() {
194 let (_, r) = unbounded::<i32>();
195
196 let mut sel = Select::new();
197 sel.recv(&r);
198 match sel.try_ready() {
199 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
200 _ => panic!(),
201 }
202
203 let (_, r) = unbounded::<i32>();
204
205 let mut sel = Select::new();
206 sel.recv(&r);
207 match sel.ready_timeout(ms(1000)) {
208 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
209 _ => panic!(),
210 }
211
212 let (s, _) = bounded::<i32>(0);
213
214 let mut sel = Select::new();
215 sel.send(&s);
216 match sel.try_ready() {
217 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
218 _ => panic!(),
219 }
220
221 let (s, _) = bounded::<i32>(0);
222
223 let mut sel = Select::new();
224 sel.send(&s);
225 match sel.ready_timeout(ms(1000)) {
226 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
227 _ => panic!(),
228 }
229 }
230
231 #[test]
default_only()232 fn default_only() {
233 let start = Instant::now();
234
235 let mut sel = Select::new();
236 assert!(sel.try_ready().is_err());
237 let now = Instant::now();
238 assert!(now - start <= ms(50));
239
240 let start = Instant::now();
241 let mut sel = Select::new();
242 assert!(sel.ready_timeout(ms(500)).is_err());
243 let now = Instant::now();
244 assert!(now - start >= ms(450));
245 assert!(now - start <= ms(550));
246 }
247
248 #[test]
unblocks()249 fn unblocks() {
250 let (s1, r1) = bounded::<i32>(0);
251 let (s2, r2) = bounded::<i32>(0);
252
253 scope(|scope| {
254 scope.spawn(|_| {
255 thread::sleep(ms(500));
256 s2.send(2).unwrap();
257 });
258
259 let mut sel = Select::new();
260 sel.recv(&r1);
261 sel.recv(&r2);
262 match sel.ready_timeout(ms(1000)) {
263 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
264 _ => panic!(),
265 }
266 })
267 .unwrap();
268
269 scope(|scope| {
270 scope.spawn(|_| {
271 thread::sleep(ms(500));
272 assert_eq!(r1.recv().unwrap(), 1);
273 });
274
275 let mut sel = Select::new();
276 let oper1 = sel.send(&s1);
277 let oper2 = sel.send(&s2);
278 let oper = sel.select_timeout(ms(1000));
279 match oper {
280 Err(_) => panic!(),
281 Ok(oper) => match oper.index() {
282 i if i == oper1 => oper.send(&s1, 1).unwrap(),
283 i if i == oper2 => panic!(),
284 _ => unreachable!(),
285 },
286 }
287 })
288 .unwrap();
289 }
290
291 #[test]
both_ready()292 fn both_ready() {
293 let (s1, r1) = bounded(0);
294 let (s2, r2) = bounded(0);
295
296 scope(|scope| {
297 scope.spawn(|_| {
298 thread::sleep(ms(500));
299 s1.send(1).unwrap();
300 assert_eq!(r2.recv().unwrap(), 2);
301 });
302
303 for _ in 0..2 {
304 let mut sel = Select::new();
305 sel.recv(&r1);
306 sel.send(&s2);
307 match sel.ready() {
308 0 => assert_eq!(r1.try_recv(), Ok(1)),
309 1 => s2.try_send(2).unwrap(),
310 _ => panic!(),
311 }
312 }
313 })
314 .unwrap();
315 }
316
317 #[test]
cloning1()318 fn cloning1() {
319 scope(|scope| {
320 let (s1, r1) = unbounded::<i32>();
321 let (_s2, r2) = unbounded::<i32>();
322 let (s3, r3) = unbounded::<()>();
323
324 scope.spawn(move |_| {
325 r3.recv().unwrap();
326 drop(s1.clone());
327 assert!(r3.try_recv().is_err());
328 s1.send(1).unwrap();
329 r3.recv().unwrap();
330 });
331
332 s3.send(()).unwrap();
333
334 let mut sel = Select::new();
335 sel.recv(&r1);
336 sel.recv(&r2);
337 match sel.ready() {
338 0 => drop(r1.try_recv()),
339 1 => drop(r2.try_recv()),
340 _ => panic!(),
341 }
342
343 s3.send(()).unwrap();
344 })
345 .unwrap();
346 }
347
348 #[test]
cloning2()349 fn cloning2() {
350 let (s1, r1) = unbounded::<()>();
351 let (s2, r2) = unbounded::<()>();
352 let (_s3, _r3) = unbounded::<()>();
353
354 scope(|scope| {
355 scope.spawn(move |_| {
356 let mut sel = Select::new();
357 sel.recv(&r1);
358 sel.recv(&r2);
359 match sel.ready() {
360 0 => panic!(),
361 1 => drop(r2.try_recv()),
362 _ => panic!(),
363 }
364 });
365
366 thread::sleep(ms(500));
367 drop(s1.clone());
368 s2.send(()).unwrap();
369 })
370 .unwrap();
371 }
372
373 #[test]
preflight1()374 fn preflight1() {
375 let (s, r) = unbounded();
376 s.send(()).unwrap();
377
378 let mut sel = Select::new();
379 sel.recv(&r);
380 match sel.ready() {
381 0 => drop(r.try_recv()),
382 _ => panic!(),
383 }
384 }
385
386 #[test]
preflight2()387 fn preflight2() {
388 let (s, r) = unbounded();
389 drop(s.clone());
390 s.send(()).unwrap();
391 drop(s);
392
393 let mut sel = Select::new();
394 sel.recv(&r);
395 match sel.ready() {
396 0 => assert_eq!(r.try_recv(), Ok(())),
397 _ => panic!(),
398 }
399
400 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
401 }
402
403 #[test]
preflight3()404 fn preflight3() {
405 let (s, r) = unbounded();
406 drop(s.clone());
407 s.send(()).unwrap();
408 drop(s);
409 r.recv().unwrap();
410
411 let mut sel = Select::new();
412 sel.recv(&r);
413 match sel.ready() {
414 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
415 _ => panic!(),
416 }
417 }
418
419 #[test]
duplicate_operations()420 fn duplicate_operations() {
421 let (s, r) = unbounded::<i32>();
422 let hit = vec![Cell::new(false); 4];
423
424 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
425 let mut sel = Select::new();
426 sel.recv(&r);
427 sel.recv(&r);
428 sel.send(&s);
429 sel.send(&s);
430 match sel.ready() {
431 0 => {
432 assert!(r.try_recv().is_ok());
433 hit[0].set(true);
434 }
435 1 => {
436 assert!(r.try_recv().is_ok());
437 hit[1].set(true);
438 }
439 2 => {
440 assert!(s.try_send(0).is_ok());
441 hit[2].set(true);
442 }
443 3 => {
444 assert!(s.try_send(0).is_ok());
445 hit[3].set(true);
446 }
447 _ => panic!(),
448 }
449 }
450 }
451
452 #[test]
nesting()453 fn nesting() {
454 let (s, r) = unbounded::<i32>();
455
456 let mut sel = Select::new();
457 sel.send(&s);
458 match sel.ready() {
459 0 => {
460 assert!(s.try_send(0).is_ok());
461
462 let mut sel = Select::new();
463 sel.recv(&r);
464 match sel.ready() {
465 0 => {
466 assert_eq!(r.try_recv(), Ok(0));
467
468 let mut sel = Select::new();
469 sel.send(&s);
470 match sel.ready() {
471 0 => {
472 assert!(s.try_send(1).is_ok());
473
474 let mut sel = Select::new();
475 sel.recv(&r);
476 match sel.ready() {
477 0 => {
478 assert_eq!(r.try_recv(), Ok(1));
479 }
480 _ => panic!(),
481 }
482 }
483 _ => panic!(),
484 }
485 }
486 _ => panic!(),
487 }
488 }
489 _ => panic!(),
490 }
491 }
492
493 #[test]
stress_recv()494 fn stress_recv() {
495 #[cfg(miri)]
496 const COUNT: usize = 100;
497 #[cfg(not(miri))]
498 const COUNT: usize = 10_000;
499
500 let (s1, r1) = unbounded();
501 let (s2, r2) = bounded(5);
502 let (s3, r3) = bounded(0);
503
504 scope(|scope| {
505 scope.spawn(|_| {
506 for i in 0..COUNT {
507 s1.send(i).unwrap();
508 r3.recv().unwrap();
509
510 s2.send(i).unwrap();
511 r3.recv().unwrap();
512 }
513 });
514
515 for i in 0..COUNT {
516 for _ in 0..2 {
517 let mut sel = Select::new();
518 sel.recv(&r1);
519 sel.recv(&r2);
520 match sel.ready() {
521 0 => assert_eq!(r1.try_recv(), Ok(i)),
522 1 => assert_eq!(r2.try_recv(), Ok(i)),
523 _ => panic!(),
524 }
525
526 s3.send(()).unwrap();
527 }
528 }
529 })
530 .unwrap();
531 }
532
533 #[test]
stress_send()534 fn stress_send() {
535 #[cfg(miri)]
536 const COUNT: usize = 100;
537 #[cfg(not(miri))]
538 const COUNT: usize = 10_000;
539
540 let (s1, r1) = bounded(0);
541 let (s2, r2) = bounded(0);
542 let (s3, r3) = bounded(100);
543
544 scope(|scope| {
545 scope.spawn(|_| {
546 for i in 0..COUNT {
547 assert_eq!(r1.recv().unwrap(), i);
548 assert_eq!(r2.recv().unwrap(), i);
549 r3.recv().unwrap();
550 }
551 });
552
553 for i in 0..COUNT {
554 for _ in 0..2 {
555 let mut sel = Select::new();
556 sel.send(&s1);
557 sel.send(&s2);
558 match sel.ready() {
559 0 => assert!(s1.try_send(i).is_ok()),
560 1 => assert!(s2.try_send(i).is_ok()),
561 _ => panic!(),
562 }
563 }
564 s3.send(()).unwrap();
565 }
566 })
567 .unwrap();
568 }
569
570 #[test]
stress_mixed()571 fn stress_mixed() {
572 #[cfg(miri)]
573 const COUNT: usize = 100;
574 #[cfg(not(miri))]
575 const COUNT: usize = 10_000;
576
577 let (s1, r1) = bounded(0);
578 let (s2, r2) = bounded(0);
579 let (s3, r3) = bounded(100);
580
581 scope(|scope| {
582 scope.spawn(|_| {
583 for i in 0..COUNT {
584 s1.send(i).unwrap();
585 assert_eq!(r2.recv().unwrap(), i);
586 r3.recv().unwrap();
587 }
588 });
589
590 for i in 0..COUNT {
591 for _ in 0..2 {
592 let mut sel = Select::new();
593 sel.recv(&r1);
594 sel.send(&s2);
595 match sel.ready() {
596 0 => assert_eq!(r1.try_recv(), Ok(i)),
597 1 => assert!(s2.try_send(i).is_ok()),
598 _ => panic!(),
599 }
600 }
601 s3.send(()).unwrap();
602 }
603 })
604 .unwrap();
605 }
606
607 #[test]
stress_timeout_two_threads()608 fn stress_timeout_two_threads() {
609 const COUNT: usize = 20;
610
611 let (s, r) = bounded(2);
612
613 scope(|scope| {
614 scope.spawn(|_| {
615 for i in 0..COUNT {
616 if i % 2 == 0 {
617 thread::sleep(ms(500));
618 }
619
620 loop {
621 let mut sel = Select::new();
622 sel.send(&s);
623 match sel.ready_timeout(ms(100)) {
624 Err(_) => {}
625 Ok(0) => {
626 assert!(s.try_send(i).is_ok());
627 break;
628 }
629 Ok(_) => panic!(),
630 }
631 }
632 }
633 });
634
635 scope.spawn(|_| {
636 for i in 0..COUNT {
637 if i % 2 == 0 {
638 thread::sleep(ms(500));
639 }
640
641 loop {
642 let mut sel = Select::new();
643 sel.recv(&r);
644 match sel.ready_timeout(ms(100)) {
645 Err(_) => {}
646 Ok(0) => {
647 assert_eq!(r.try_recv(), Ok(i));
648 break;
649 }
650 Ok(_) => panic!(),
651 }
652 }
653 }
654 });
655 })
656 .unwrap();
657 }
658
659 #[test]
send_recv_same_channel()660 fn send_recv_same_channel() {
661 let (s, r) = bounded::<i32>(0);
662 let mut sel = Select::new();
663 sel.send(&s);
664 sel.recv(&r);
665 assert!(sel.ready_timeout(ms(100)).is_err());
666
667 let (s, r) = unbounded::<i32>();
668 let mut sel = Select::new();
669 sel.send(&s);
670 sel.recv(&r);
671 match sel.ready_timeout(ms(100)) {
672 Err(_) => panic!(),
673 Ok(0) => assert!(s.try_send(0).is_ok()),
674 Ok(_) => panic!(),
675 }
676 }
677
678 #[test]
channel_through_channel()679 fn channel_through_channel() {
680 #[cfg(miri)]
681 const COUNT: usize = 100;
682 #[cfg(not(miri))]
683 const COUNT: usize = 1000;
684
685 type T = Box<dyn Any + Send>;
686
687 for cap in 1..4 {
688 let (s, r) = bounded::<T>(cap);
689
690 scope(|scope| {
691 scope.spawn(move |_| {
692 let mut s = s;
693
694 for _ in 0..COUNT {
695 let (new_s, new_r) = bounded(cap);
696 let new_r: T = Box::new(Some(new_r));
697
698 {
699 let mut sel = Select::new();
700 sel.send(&s);
701 match sel.ready() {
702 0 => assert!(s.try_send(new_r).is_ok()),
703 _ => panic!(),
704 }
705 }
706
707 s = new_s;
708 }
709 });
710
711 scope.spawn(move |_| {
712 let mut r = r;
713
714 for _ in 0..COUNT {
715 let new = {
716 let mut sel = Select::new();
717 sel.recv(&r);
718 match sel.ready() {
719 0 => r
720 .try_recv()
721 .unwrap()
722 .downcast_mut::<Option<Receiver<T>>>()
723 .unwrap()
724 .take()
725 .unwrap(),
726 _ => panic!(),
727 }
728 };
729 r = new;
730 }
731 });
732 })
733 .unwrap();
734 }
735 }
736
737 #[test]
fairness1()738 fn fairness1() {
739 #[cfg(miri)]
740 const COUNT: usize = 100;
741 #[cfg(not(miri))]
742 const COUNT: usize = 10_000;
743
744 let (s1, r1) = bounded::<()>(COUNT);
745 let (s2, r2) = unbounded::<()>();
746
747 for _ in 0..COUNT {
748 s1.send(()).unwrap();
749 s2.send(()).unwrap();
750 }
751
752 let hits = vec![Cell::new(0usize); 4];
753 for _ in 0..COUNT {
754 let after = after(ms(0));
755 let tick = tick(ms(0));
756
757 let mut sel = Select::new();
758 sel.recv(&r1);
759 sel.recv(&r2);
760 sel.recv(&after);
761 sel.recv(&tick);
762 match sel.ready() {
763 0 => {
764 r1.try_recv().unwrap();
765 hits[0].set(hits[0].get() + 1);
766 }
767 1 => {
768 r2.try_recv().unwrap();
769 hits[1].set(hits[1].get() + 1);
770 }
771 2 => {
772 after.try_recv().unwrap();
773 hits[2].set(hits[2].get() + 1);
774 }
775 3 => {
776 tick.try_recv().unwrap();
777 hits[3].set(hits[3].get() + 1);
778 }
779 _ => panic!(),
780 }
781 }
782 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
783 }
784
785 #[test]
fairness2()786 fn fairness2() {
787 #[cfg(miri)]
788 const COUNT: usize = 100;
789 #[cfg(not(miri))]
790 const COUNT: usize = 100_000;
791
792 let (s1, r1) = unbounded::<()>();
793 let (s2, r2) = bounded::<()>(1);
794 let (s3, r3) = bounded::<()>(0);
795
796 scope(|scope| {
797 scope.spawn(|_| {
798 for _ in 0..COUNT {
799 let mut sel = Select::new();
800 let mut oper1 = None;
801 let mut oper2 = None;
802 if s1.is_empty() {
803 oper1 = Some(sel.send(&s1));
804 }
805 if s2.is_empty() {
806 oper2 = Some(sel.send(&s2));
807 }
808 let oper3 = sel.send(&s3);
809 let oper = sel.select();
810 match oper.index() {
811 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
812 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
813 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
814 _ => unreachable!(),
815 }
816 }
817 });
818
819 let hits = vec![Cell::new(0usize); 3];
820 for _ in 0..COUNT {
821 let mut sel = Select::new();
822 sel.recv(&r1);
823 sel.recv(&r2);
824 sel.recv(&r3);
825 loop {
826 match sel.ready() {
827 0 => {
828 if r1.try_recv().is_ok() {
829 hits[0].set(hits[0].get() + 1);
830 break;
831 }
832 }
833 1 => {
834 if r2.try_recv().is_ok() {
835 hits[1].set(hits[1].get() + 1);
836 break;
837 }
838 }
839 2 => {
840 if r3.try_recv().is_ok() {
841 hits[2].set(hits[2].get() + 1);
842 break;
843 }
844 }
845 _ => unreachable!(),
846 }
847 }
848 }
849 assert!(hits.iter().all(|x| x.get() > 0));
850 })
851 .unwrap();
852 }
853