1 //! Tests for channel readiness using the `Select` struct.
2
3 use std::any::Any;
4 use std::cell::Cell;
5 use std::thread;
6 use std::time::{Duration, Instant};
7
8 use crossbeam_channel::{after, bounded, tick, unbounded};
9 use crossbeam_channel::{Receiver, Select, TryRecvError, TrySendError};
10 use crossbeam_utils::thread::scope;
11
ms(ms: u64) -> Duration12 fn ms(ms: u64) -> Duration {
13 Duration::from_millis(ms)
14 }
15
16 #[test]
smoke1()17 fn smoke1() {
18 let (s1, r1) = unbounded::<usize>();
19 let (s2, r2) = unbounded::<usize>();
20
21 s1.send(1).unwrap();
22
23 let mut sel = Select::new();
24 sel.recv(&r1);
25 sel.recv(&r2);
26 assert_eq!(sel.ready(), 0);
27 assert_eq!(r1.try_recv(), Ok(1));
28
29 s2.send(2).unwrap();
30
31 let mut sel = Select::new();
32 sel.recv(&r1);
33 sel.recv(&r2);
34 assert_eq!(sel.ready(), 1);
35 assert_eq!(r2.try_recv(), Ok(2));
36 }
37
38 #[test]
smoke2()39 fn smoke2() {
40 let (_s1, r1) = unbounded::<i32>();
41 let (_s2, r2) = unbounded::<i32>();
42 let (_s3, r3) = unbounded::<i32>();
43 let (_s4, r4) = unbounded::<i32>();
44 let (s5, r5) = unbounded::<i32>();
45
46 s5.send(5).unwrap();
47
48 let mut sel = Select::new();
49 sel.recv(&r1);
50 sel.recv(&r2);
51 sel.recv(&r3);
52 sel.recv(&r4);
53 sel.recv(&r5);
54 assert_eq!(sel.ready(), 4);
55 assert_eq!(r5.try_recv(), Ok(5));
56 }
57
58 #[test]
disconnected()59 fn disconnected() {
60 let (s1, r1) = unbounded::<i32>();
61 let (s2, r2) = unbounded::<i32>();
62
63 scope(|scope| {
64 scope.spawn(|_| {
65 drop(s1);
66 thread::sleep(ms(500));
67 s2.send(5).unwrap();
68 });
69
70 let mut sel = Select::new();
71 sel.recv(&r1);
72 sel.recv(&r2);
73 match sel.ready_timeout(ms(1000)) {
74 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
75 _ => panic!(),
76 }
77
78 r2.recv().unwrap();
79 })
80 .unwrap();
81
82 let mut sel = Select::new();
83 sel.recv(&r1);
84 sel.recv(&r2);
85 match sel.ready_timeout(ms(1000)) {
86 Ok(0) => assert_eq!(r1.try_recv(), Err(TryRecvError::Disconnected)),
87 _ => panic!(),
88 }
89
90 scope(|scope| {
91 scope.spawn(|_| {
92 thread::sleep(ms(500));
93 drop(s2);
94 });
95
96 let mut sel = Select::new();
97 sel.recv(&r2);
98 match sel.ready_timeout(ms(1000)) {
99 Ok(0) => assert_eq!(r2.try_recv(), Err(TryRecvError::Disconnected)),
100 _ => panic!(),
101 }
102 })
103 .unwrap();
104 }
105
106 #[test]
default()107 fn default() {
108 let (s1, r1) = unbounded::<i32>();
109 let (s2, r2) = unbounded::<i32>();
110
111 let mut sel = Select::new();
112 sel.recv(&r1);
113 sel.recv(&r2);
114 assert!(sel.try_ready().is_err());
115
116 drop(s1);
117
118 let mut sel = Select::new();
119 sel.recv(&r1);
120 sel.recv(&r2);
121 match sel.try_ready() {
122 Ok(0) => assert!(r1.try_recv().is_err()),
123 _ => panic!(),
124 }
125
126 s2.send(2).unwrap();
127
128 let mut sel = Select::new();
129 sel.recv(&r2);
130 match sel.try_ready() {
131 Ok(0) => assert_eq!(r2.try_recv(), Ok(2)),
132 _ => panic!(),
133 }
134
135 let mut sel = Select::new();
136 sel.recv(&r2);
137 assert!(sel.try_ready().is_err());
138
139 let mut sel = Select::new();
140 assert!(sel.try_ready().is_err());
141 }
142
143 #[test]
timeout()144 fn timeout() {
145 let (_s1, r1) = unbounded::<i32>();
146 let (s2, r2) = unbounded::<i32>();
147
148 scope(|scope| {
149 scope.spawn(|_| {
150 thread::sleep(ms(1500));
151 s2.send(2).unwrap();
152 });
153
154 let mut sel = Select::new();
155 sel.recv(&r1);
156 sel.recv(&r2);
157 assert!(sel.ready_timeout(ms(1000)).is_err());
158
159 let mut sel = Select::new();
160 sel.recv(&r1);
161 sel.recv(&r2);
162 match sel.ready_timeout(ms(1000)) {
163 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
164 _ => panic!(),
165 }
166 })
167 .unwrap();
168
169 scope(|scope| {
170 let (s, r) = unbounded::<i32>();
171
172 scope.spawn(move |_| {
173 thread::sleep(ms(500));
174 drop(s);
175 });
176
177 let mut sel = Select::new();
178 assert!(sel.ready_timeout(ms(1000)).is_err());
179
180 let mut sel = Select::new();
181 sel.recv(&r);
182 match sel.try_ready() {
183 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
184 _ => panic!(),
185 }
186 })
187 .unwrap();
188 }
189
190 #[test]
default_when_disconnected()191 fn default_when_disconnected() {
192 let (_, r) = unbounded::<i32>();
193
194 let mut sel = Select::new();
195 sel.recv(&r);
196 match sel.try_ready() {
197 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
198 _ => panic!(),
199 }
200
201 let (_, r) = unbounded::<i32>();
202
203 let mut sel = Select::new();
204 sel.recv(&r);
205 match sel.ready_timeout(ms(1000)) {
206 Ok(0) => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
207 _ => panic!(),
208 }
209
210 let (s, _) = bounded::<i32>(0);
211
212 let mut sel = Select::new();
213 sel.send(&s);
214 match sel.try_ready() {
215 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
216 _ => panic!(),
217 }
218
219 let (s, _) = bounded::<i32>(0);
220
221 let mut sel = Select::new();
222 sel.send(&s);
223 match sel.ready_timeout(ms(1000)) {
224 Ok(0) => assert_eq!(s.try_send(0), Err(TrySendError::Disconnected(0))),
225 _ => panic!(),
226 }
227 }
228
229 #[test]
default_only()230 fn default_only() {
231 let start = Instant::now();
232
233 let mut sel = Select::new();
234 assert!(sel.try_ready().is_err());
235 let now = Instant::now();
236 assert!(now - start <= ms(50));
237
238 let start = Instant::now();
239 let mut sel = Select::new();
240 assert!(sel.ready_timeout(ms(500)).is_err());
241 let now = Instant::now();
242 assert!(now - start >= ms(450));
243 assert!(now - start <= ms(550));
244 }
245
246 #[test]
unblocks()247 fn unblocks() {
248 let (s1, r1) = bounded::<i32>(0);
249 let (s2, r2) = bounded::<i32>(0);
250
251 scope(|scope| {
252 scope.spawn(|_| {
253 thread::sleep(ms(500));
254 s2.send(2).unwrap();
255 });
256
257 let mut sel = Select::new();
258 sel.recv(&r1);
259 sel.recv(&r2);
260 match sel.ready_timeout(ms(1000)) {
261 Ok(1) => assert_eq!(r2.try_recv(), Ok(2)),
262 _ => panic!(),
263 }
264 })
265 .unwrap();
266
267 scope(|scope| {
268 scope.spawn(|_| {
269 thread::sleep(ms(500));
270 assert_eq!(r1.recv().unwrap(), 1);
271 });
272
273 let mut sel = Select::new();
274 let oper1 = sel.send(&s1);
275 let oper2 = sel.send(&s2);
276 let oper = sel.select_timeout(ms(1000));
277 match oper {
278 Err(_) => panic!(),
279 Ok(oper) => match oper.index() {
280 i if i == oper1 => oper.send(&s1, 1).unwrap(),
281 i if i == oper2 => panic!(),
282 _ => unreachable!(),
283 },
284 }
285 })
286 .unwrap();
287 }
288
289 #[test]
both_ready()290 fn both_ready() {
291 let (s1, r1) = bounded(0);
292 let (s2, r2) = bounded(0);
293
294 scope(|scope| {
295 scope.spawn(|_| {
296 thread::sleep(ms(500));
297 s1.send(1).unwrap();
298 assert_eq!(r2.recv().unwrap(), 2);
299 });
300
301 for _ in 0..2 {
302 let mut sel = Select::new();
303 sel.recv(&r1);
304 sel.send(&s2);
305 match sel.ready() {
306 0 => assert_eq!(r1.try_recv(), Ok(1)),
307 1 => s2.try_send(2).unwrap(),
308 _ => panic!(),
309 }
310 }
311 })
312 .unwrap();
313 }
314
315 #[test]
cloning1()316 fn cloning1() {
317 scope(|scope| {
318 let (s1, r1) = unbounded::<i32>();
319 let (_s2, r2) = unbounded::<i32>();
320 let (s3, r3) = unbounded::<()>();
321
322 scope.spawn(move |_| {
323 r3.recv().unwrap();
324 drop(s1.clone());
325 assert!(r3.try_recv().is_err());
326 s1.send(1).unwrap();
327 r3.recv().unwrap();
328 });
329
330 s3.send(()).unwrap();
331
332 let mut sel = Select::new();
333 sel.recv(&r1);
334 sel.recv(&r2);
335 match sel.ready() {
336 0 => drop(r1.try_recv()),
337 1 => drop(r2.try_recv()),
338 _ => panic!(),
339 }
340
341 s3.send(()).unwrap();
342 })
343 .unwrap();
344 }
345
346 #[test]
cloning2()347 fn cloning2() {
348 let (s1, r1) = unbounded::<()>();
349 let (s2, r2) = unbounded::<()>();
350 let (_s3, _r3) = unbounded::<()>();
351
352 scope(|scope| {
353 scope.spawn(move |_| {
354 let mut sel = Select::new();
355 sel.recv(&r1);
356 sel.recv(&r2);
357 match sel.ready() {
358 0 => panic!(),
359 1 => drop(r2.try_recv()),
360 _ => panic!(),
361 }
362 });
363
364 thread::sleep(ms(500));
365 drop(s1.clone());
366 s2.send(()).unwrap();
367 })
368 .unwrap();
369 }
370
371 #[test]
preflight1()372 fn preflight1() {
373 let (s, r) = unbounded();
374 s.send(()).unwrap();
375
376 let mut sel = Select::new();
377 sel.recv(&r);
378 match sel.ready() {
379 0 => drop(r.try_recv()),
380 _ => panic!(),
381 }
382 }
383
384 #[test]
preflight2()385 fn preflight2() {
386 let (s, r) = unbounded();
387 drop(s.clone());
388 s.send(()).unwrap();
389 drop(s);
390
391 let mut sel = Select::new();
392 sel.recv(&r);
393 match sel.ready() {
394 0 => assert_eq!(r.try_recv(), Ok(())),
395 _ => panic!(),
396 }
397
398 assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected));
399 }
400
401 #[test]
preflight3()402 fn preflight3() {
403 let (s, r) = unbounded();
404 drop(s.clone());
405 s.send(()).unwrap();
406 drop(s);
407 r.recv().unwrap();
408
409 let mut sel = Select::new();
410 sel.recv(&r);
411 match sel.ready() {
412 0 => assert_eq!(r.try_recv(), Err(TryRecvError::Disconnected)),
413 _ => panic!(),
414 }
415 }
416
417 #[test]
duplicate_operations()418 fn duplicate_operations() {
419 let (s, r) = unbounded::<i32>();
420 let hit = vec![Cell::new(false); 4];
421
422 while hit.iter().map(|h| h.get()).any(|hit| !hit) {
423 let mut sel = Select::new();
424 sel.recv(&r);
425 sel.recv(&r);
426 sel.send(&s);
427 sel.send(&s);
428 match sel.ready() {
429 0 => {
430 assert!(r.try_recv().is_ok());
431 hit[0].set(true);
432 }
433 1 => {
434 assert!(r.try_recv().is_ok());
435 hit[1].set(true);
436 }
437 2 => {
438 assert!(s.try_send(0).is_ok());
439 hit[2].set(true);
440 }
441 3 => {
442 assert!(s.try_send(0).is_ok());
443 hit[3].set(true);
444 }
445 _ => panic!(),
446 }
447 }
448 }
449
450 #[test]
nesting()451 fn nesting() {
452 let (s, r) = unbounded::<i32>();
453
454 let mut sel = Select::new();
455 sel.send(&s);
456 match sel.ready() {
457 0 => {
458 assert!(s.try_send(0).is_ok());
459
460 let mut sel = Select::new();
461 sel.recv(&r);
462 match sel.ready() {
463 0 => {
464 assert_eq!(r.try_recv(), Ok(0));
465
466 let mut sel = Select::new();
467 sel.send(&s);
468 match sel.ready() {
469 0 => {
470 assert!(s.try_send(1).is_ok());
471
472 let mut sel = Select::new();
473 sel.recv(&r);
474 match sel.ready() {
475 0 => {
476 assert_eq!(r.try_recv(), Ok(1));
477 }
478 _ => panic!(),
479 }
480 }
481 _ => panic!(),
482 }
483 }
484 _ => panic!(),
485 }
486 }
487 _ => panic!(),
488 }
489 }
490
491 #[test]
stress_recv()492 fn stress_recv() {
493 const COUNT: usize = 10_000;
494
495 let (s1, r1) = unbounded();
496 let (s2, r2) = bounded(5);
497 let (s3, r3) = bounded(0);
498
499 scope(|scope| {
500 scope.spawn(|_| {
501 for i in 0..COUNT {
502 s1.send(i).unwrap();
503 r3.recv().unwrap();
504
505 s2.send(i).unwrap();
506 r3.recv().unwrap();
507 }
508 });
509
510 for i in 0..COUNT {
511 for _ in 0..2 {
512 let mut sel = Select::new();
513 sel.recv(&r1);
514 sel.recv(&r2);
515 match sel.ready() {
516 0 => assert_eq!(r1.try_recv(), Ok(i)),
517 1 => assert_eq!(r2.try_recv(), Ok(i)),
518 _ => panic!(),
519 }
520
521 s3.send(()).unwrap();
522 }
523 }
524 })
525 .unwrap();
526 }
527
528 #[test]
stress_send()529 fn stress_send() {
530 const COUNT: usize = 10_000;
531
532 let (s1, r1) = bounded(0);
533 let (s2, r2) = bounded(0);
534 let (s3, r3) = bounded(100);
535
536 scope(|scope| {
537 scope.spawn(|_| {
538 for i in 0..COUNT {
539 assert_eq!(r1.recv().unwrap(), i);
540 assert_eq!(r2.recv().unwrap(), i);
541 r3.recv().unwrap();
542 }
543 });
544
545 for i in 0..COUNT {
546 for _ in 0..2 {
547 let mut sel = Select::new();
548 sel.send(&s1);
549 sel.send(&s2);
550 match sel.ready() {
551 0 => assert!(s1.try_send(i).is_ok()),
552 1 => assert!(s2.try_send(i).is_ok()),
553 _ => panic!(),
554 }
555 }
556 s3.send(()).unwrap();
557 }
558 })
559 .unwrap();
560 }
561
562 #[test]
stress_mixed()563 fn stress_mixed() {
564 const COUNT: usize = 10_000;
565
566 let (s1, r1) = bounded(0);
567 let (s2, r2) = bounded(0);
568 let (s3, r3) = bounded(100);
569
570 scope(|scope| {
571 scope.spawn(|_| {
572 for i in 0..COUNT {
573 s1.send(i).unwrap();
574 assert_eq!(r2.recv().unwrap(), i);
575 r3.recv().unwrap();
576 }
577 });
578
579 for i in 0..COUNT {
580 for _ in 0..2 {
581 let mut sel = Select::new();
582 sel.recv(&r1);
583 sel.send(&s2);
584 match sel.ready() {
585 0 => assert_eq!(r1.try_recv(), Ok(i)),
586 1 => assert!(s2.try_send(i).is_ok()),
587 _ => panic!(),
588 }
589 }
590 s3.send(()).unwrap();
591 }
592 })
593 .unwrap();
594 }
595
596 #[test]
stress_timeout_two_threads()597 fn stress_timeout_two_threads() {
598 const COUNT: usize = 20;
599
600 let (s, r) = bounded(2);
601
602 scope(|scope| {
603 scope.spawn(|_| {
604 for i in 0..COUNT {
605 if i % 2 == 0 {
606 thread::sleep(ms(500));
607 }
608
609 loop {
610 let mut sel = Select::new();
611 sel.send(&s);
612 match sel.ready_timeout(ms(100)) {
613 Err(_) => {}
614 Ok(0) => {
615 assert!(s.try_send(i).is_ok());
616 break;
617 }
618 Ok(_) => panic!(),
619 }
620 }
621 }
622 });
623
624 scope.spawn(|_| {
625 for i in 0..COUNT {
626 if i % 2 == 0 {
627 thread::sleep(ms(500));
628 }
629
630 loop {
631 let mut sel = Select::new();
632 sel.recv(&r);
633 match sel.ready_timeout(ms(100)) {
634 Err(_) => {}
635 Ok(0) => {
636 assert_eq!(r.try_recv(), Ok(i));
637 break;
638 }
639 Ok(_) => panic!(),
640 }
641 }
642 }
643 });
644 })
645 .unwrap();
646 }
647
648 #[test]
send_recv_same_channel()649 fn send_recv_same_channel() {
650 let (s, r) = bounded::<i32>(0);
651 let mut sel = Select::new();
652 sel.send(&s);
653 sel.recv(&r);
654 assert!(sel.ready_timeout(ms(100)).is_err());
655
656 let (s, r) = unbounded::<i32>();
657 let mut sel = Select::new();
658 sel.send(&s);
659 sel.recv(&r);
660 match sel.ready_timeout(ms(100)) {
661 Err(_) => panic!(),
662 Ok(0) => assert!(s.try_send(0).is_ok()),
663 Ok(_) => panic!(),
664 }
665 }
666
667 #[test]
channel_through_channel()668 fn channel_through_channel() {
669 const COUNT: usize = 1000;
670
671 type T = Box<dyn Any + Send>;
672
673 for cap in 1..4 {
674 let (s, r) = bounded::<T>(cap);
675
676 scope(|scope| {
677 scope.spawn(move |_| {
678 let mut s = s;
679
680 for _ in 0..COUNT {
681 let (new_s, new_r) = bounded(cap);
682 let new_r: T = Box::new(Some(new_r));
683
684 {
685 let mut sel = Select::new();
686 sel.send(&s);
687 match sel.ready() {
688 0 => assert!(s.try_send(new_r).is_ok()),
689 _ => panic!(),
690 }
691 }
692
693 s = new_s;
694 }
695 });
696
697 scope.spawn(move |_| {
698 let mut r = r;
699
700 for _ in 0..COUNT {
701 let new = {
702 let mut sel = Select::new();
703 sel.recv(&r);
704 match sel.ready() {
705 0 => r
706 .try_recv()
707 .unwrap()
708 .downcast_mut::<Option<Receiver<T>>>()
709 .unwrap()
710 .take()
711 .unwrap(),
712 _ => panic!(),
713 }
714 };
715 r = new;
716 }
717 });
718 })
719 .unwrap();
720 }
721 }
722
723 #[test]
fairness1()724 fn fairness1() {
725 const COUNT: usize = 10_000;
726
727 let (s1, r1) = bounded::<()>(COUNT);
728 let (s2, r2) = unbounded::<()>();
729
730 for _ in 0..COUNT {
731 s1.send(()).unwrap();
732 s2.send(()).unwrap();
733 }
734
735 let hits = vec![Cell::new(0usize); 4];
736 for _ in 0..COUNT {
737 let after = after(ms(0));
738 let tick = tick(ms(0));
739
740 let mut sel = Select::new();
741 sel.recv(&r1);
742 sel.recv(&r2);
743 sel.recv(&after);
744 sel.recv(&tick);
745 match sel.ready() {
746 0 => {
747 r1.try_recv().unwrap();
748 hits[0].set(hits[0].get() + 1);
749 }
750 1 => {
751 r2.try_recv().unwrap();
752 hits[1].set(hits[1].get() + 1);
753 }
754 2 => {
755 after.try_recv().unwrap();
756 hits[2].set(hits[2].get() + 1);
757 }
758 3 => {
759 tick.try_recv().unwrap();
760 hits[3].set(hits[3].get() + 1);
761 }
762 _ => panic!(),
763 }
764 }
765 assert!(hits.iter().all(|x| x.get() >= COUNT / hits.len() / 2));
766 }
767
768 #[test]
fairness2()769 fn fairness2() {
770 const COUNT: usize = 100_000;
771
772 let (s1, r1) = unbounded::<()>();
773 let (s2, r2) = bounded::<()>(1);
774 let (s3, r3) = bounded::<()>(0);
775
776 scope(|scope| {
777 scope.spawn(|_| {
778 for _ in 0..COUNT {
779 let mut sel = Select::new();
780 let mut oper1 = None;
781 let mut oper2 = None;
782 if s1.is_empty() {
783 oper1 = Some(sel.send(&s1));
784 }
785 if s2.is_empty() {
786 oper2 = Some(sel.send(&s2));
787 }
788 let oper3 = sel.send(&s3);
789 let oper = sel.select();
790 match oper.index() {
791 i if Some(i) == oper1 => assert!(oper.send(&s1, ()).is_ok()),
792 i if Some(i) == oper2 => assert!(oper.send(&s2, ()).is_ok()),
793 i if i == oper3 => assert!(oper.send(&s3, ()).is_ok()),
794 _ => unreachable!(),
795 }
796 }
797 });
798
799 let hits = vec![Cell::new(0usize); 3];
800 for _ in 0..COUNT {
801 let mut sel = Select::new();
802 sel.recv(&r1);
803 sel.recv(&r2);
804 sel.recv(&r3);
805 loop {
806 match sel.ready() {
807 0 => {
808 if r1.try_recv().is_ok() {
809 hits[0].set(hits[0].get() + 1);
810 break;
811 }
812 }
813 1 => {
814 if r2.try_recv().is_ok() {
815 hits[1].set(hits[1].get() + 1);
816 break;
817 }
818 }
819 2 => {
820 if r3.try_recv().is_ok() {
821 hits[2].set(hits[2].get() + 1);
822 break;
823 }
824 }
825 _ => unreachable!(),
826 }
827 }
828 }
829 assert!(hits.iter().all(|x| x.get() > 0));
830 })
831 .unwrap();
832 }
833