1 //! Tests copied from Go and manually rewritten in Rust.
2 //!
3 //! Source:
4 //! - https://github.com/golang/go
5 //!
6 //! Copyright & License:
7 //! - Copyright (c) 2009 The Go Authors
8 //! - https://golang.org/AUTHORS
9 //! - https://golang.org/LICENSE
10 //! - https://golang.org/PATENTS
11
12 use std::alloc::{GlobalAlloc, Layout, System};
13 use std::any::Any;
14 use std::cell::Cell;
15 use std::collections::HashMap;
16 use std::sync::atomic::{AtomicUsize, Ordering::SeqCst};
17 use std::sync::{Arc, Condvar, Mutex};
18 use std::thread;
19 use std::time::Duration;
20
21 use crossbeam_channel::{bounded, select, tick, unbounded, Receiver, Select, Sender};
22
ms(ms: u64) -> Duration23 fn ms(ms: u64) -> Duration {
24 Duration::from_millis(ms)
25 }
26
27 struct Chan<T> {
28 inner: Arc<Mutex<ChanInner<T>>>,
29 }
30
31 struct ChanInner<T> {
32 s: Option<Sender<T>>,
33 r: Receiver<T>,
34 }
35
36 impl<T> Clone for Chan<T> {
clone(&self) -> Chan<T>37 fn clone(&self) -> Chan<T> {
38 Chan {
39 inner: self.inner.clone(),
40 }
41 }
42 }
43
44 impl<T> Chan<T> {
send(&self, msg: T)45 fn send(&self, msg: T) {
46 let s = self
47 .inner
48 .lock()
49 .unwrap()
50 .s
51 .as_ref()
52 .expect("sending into closed channel")
53 .clone();
54 let _ = s.send(msg);
55 }
56
try_recv(&self) -> Option<T>57 fn try_recv(&self) -> Option<T> {
58 let r = self.inner.lock().unwrap().r.clone();
59 r.try_recv().ok()
60 }
61
recv(&self) -> Option<T>62 fn recv(&self) -> Option<T> {
63 let r = self.inner.lock().unwrap().r.clone();
64 r.recv().ok()
65 }
66
close(&self)67 fn close(&self) {
68 self.inner
69 .lock()
70 .unwrap()
71 .s
72 .take()
73 .expect("channel already closed");
74 }
75
rx(&self) -> Receiver<T>76 fn rx(&self) -> Receiver<T> {
77 self.inner.lock().unwrap().r.clone()
78 }
79
tx(&self) -> Sender<T>80 fn tx(&self) -> Sender<T> {
81 match self.inner.lock().unwrap().s.as_ref() {
82 None => {
83 let (s, r) = bounded(0);
84 std::mem::forget(r);
85 s
86 }
87 Some(s) => s.clone(),
88 }
89 }
90 }
91
92 impl<T> Iterator for Chan<T> {
93 type Item = T;
94
next(&mut self) -> Option<Self::Item>95 fn next(&mut self) -> Option<Self::Item> {
96 self.recv()
97 }
98 }
99
100 impl<'a, T> IntoIterator for &'a Chan<T> {
101 type Item = T;
102 type IntoIter = Chan<T>;
103
into_iter(self) -> Self::IntoIter104 fn into_iter(self) -> Self::IntoIter {
105 self.clone()
106 }
107 }
108
make<T>(cap: usize) -> Chan<T>109 fn make<T>(cap: usize) -> Chan<T> {
110 let (s, r) = bounded(cap);
111 Chan {
112 inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
113 }
114 }
115
make_unbounded<T>() -> Chan<T>116 fn make_unbounded<T>() -> Chan<T> {
117 let (s, r) = unbounded();
118 Chan {
119 inner: Arc::new(Mutex::new(ChanInner { s: Some(s), r })),
120 }
121 }
122 #[derive(Clone)]
123 struct WaitGroup(Arc<WaitGroupInner>);
124
125 struct WaitGroupInner {
126 cond: Condvar,
127 count: Mutex<i32>,
128 }
129
130 impl WaitGroup {
new() -> WaitGroup131 fn new() -> WaitGroup {
132 WaitGroup(Arc::new(WaitGroupInner {
133 cond: Condvar::new(),
134 count: Mutex::new(0),
135 }))
136 }
137
add(&self, delta: i32)138 fn add(&self, delta: i32) {
139 let mut count = self.0.count.lock().unwrap();
140 *count += delta;
141 assert!(*count >= 0);
142 self.0.cond.notify_all();
143 }
144
done(&self)145 fn done(&self) {
146 self.add(-1);
147 }
148
wait(&self)149 fn wait(&self) {
150 let mut count = self.0.count.lock().unwrap();
151 while *count > 0 {
152 count = self.0.cond.wait(count).unwrap();
153 }
154 }
155 }
156
157 struct Defer<F: FnOnce()> {
158 f: Option<Box<F>>,
159 }
160
161 impl<F: FnOnce()> Drop for Defer<F> {
drop(&mut self)162 fn drop(&mut self) {
163 let f = self.f.take().unwrap();
164 let mut f = Some(f);
165 let mut f = move || f.take().unwrap()();
166 f();
167 }
168 }
169
170 struct Counter;
171
172 static ALLOCATED: AtomicUsize = AtomicUsize::new(0);
173 unsafe impl GlobalAlloc for Counter {
alloc(&self, layout: Layout) -> *mut u8174 unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
175 let ret = System.alloc(layout);
176 if !ret.is_null() {
177 ALLOCATED.fetch_add(layout.size(), SeqCst);
178 }
179 return ret;
180 }
181
dealloc(&self, ptr: *mut u8, layout: Layout)182 unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
183 System.dealloc(ptr, layout);
184 ALLOCATED.fetch_sub(layout.size(), SeqCst);
185 }
186 }
187
188 #[global_allocator]
189 static A: Counter = Counter;
190
191 macro_rules! defer {
192 ($body:expr) => {
193 let _defer = Defer {
194 f: Some(Box::new(|| $body)),
195 };
196 };
197 }
198
199 macro_rules! go {
200 (@parse ref $v:ident, $($tail:tt)*) => {{
201 let ref $v = $v;
202 go!(@parse $($tail)*)
203 }};
204 (@parse move $v:ident, $($tail:tt)*) => {{
205 let $v = $v;
206 go!(@parse $($tail)*)
207 }};
208 (@parse $v:ident, $($tail:tt)*) => {{
209 let $v = $v.clone();
210 go!(@parse $($tail)*)
211 }};
212 (@parse $body:expr) => {
213 ::std::thread::spawn(move || {
214 let res = ::std::panic::catch_unwind(::std::panic::AssertUnwindSafe(|| {
215 $body
216 }));
217 if res.is_err() {
218 eprintln!("goroutine panicked: {:?}", res);
219 ::std::process::abort();
220 }
221 })
222 };
223 (@parse $($tail:tt)*) => {
224 compile_error!("invalid `go!` syntax")
225 };
226 ($($tail:tt)*) => {{
227 go!(@parse $($tail)*)
228 }};
229 }
230
231 // https://github.com/golang/go/blob/master/test/chan/doubleselect.go
232 mod doubleselect {
233 use super::*;
234
235 const ITERATIONS: i32 = 10_000;
236
sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>)237 fn sender(n: i32, c1: Chan<i32>, c2: Chan<i32>, c3: Chan<i32>, c4: Chan<i32>) {
238 defer! { c1.close() }
239 defer! { c2.close() }
240 defer! { c3.close() }
241 defer! { c4.close() }
242
243 for i in 0..n {
244 select! {
245 send(c1.tx(), i) -> _ => {}
246 send(c2.tx(), i) -> _ => {}
247 send(c3.tx(), i) -> _ => {}
248 send(c4.tx(), i) -> _ => {}
249 }
250 }
251 }
252
mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>)253 fn mux(out: Chan<i32>, inp: Chan<i32>, done: Chan<bool>) {
254 for v in inp {
255 out.send(v);
256 }
257 done.send(true);
258 }
259
recver(inp: Chan<i32>)260 fn recver(inp: Chan<i32>) {
261 let mut seen = HashMap::new();
262
263 for v in &inp {
264 if seen.contains_key(&v) {
265 panic!("got duplicate value for {}", v);
266 }
267 seen.insert(v, true);
268 }
269 }
270
271 #[test]
main()272 fn main() {
273 let c1 = make::<i32>(0);
274 let c2 = make::<i32>(0);
275 let c3 = make::<i32>(0);
276 let c4 = make::<i32>(0);
277 let done = make::<bool>(0);
278 let cmux = make::<i32>(0);
279
280 go!(c1, c2, c3, c4, sender(ITERATIONS, c1, c2, c3, c4));
281 go!(cmux, c1, done, mux(cmux, c1, done));
282 go!(cmux, c2, done, mux(cmux, c2, done));
283 go!(cmux, c3, done, mux(cmux, c3, done));
284 go!(cmux, c4, done, mux(cmux, c4, done));
285 go!(done, cmux, {
286 done.recv();
287 done.recv();
288 done.recv();
289 done.recv();
290 cmux.close();
291 });
292 recver(cmux);
293 }
294 }
295
296 // https://github.com/golang/go/blob/master/test/chan/fifo.go
297 mod fifo {
298 use super::*;
299
300 const N: i32 = 10;
301
302 #[test]
asynch_fifo()303 fn asynch_fifo() {
304 let ch = make::<i32>(N as usize);
305 for i in 0..N {
306 ch.send(i);
307 }
308 for i in 0..N {
309 if ch.recv() != Some(i) {
310 panic!("bad receive");
311 }
312 }
313 }
314
chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>)315 fn chain(ch: Chan<i32>, val: i32, inp: Chan<i32>, out: Chan<i32>) {
316 inp.recv();
317 if ch.recv() != Some(val) {
318 panic!("{}", val);
319 }
320 out.send(1);
321 }
322
323 #[test]
synch_fifo()324 fn synch_fifo() {
325 let ch = make::<i32>(0);
326 let mut inp = make::<i32>(0);
327 let start = inp.clone();
328
329 for i in 0..N {
330 let out = make::<i32>(0);
331 go!(ch, i, inp, out, chain(ch, i, inp, out));
332 inp = out;
333 }
334
335 start.send(0);
336 for i in 0..N {
337 ch.send(i);
338 }
339 inp.recv();
340 }
341 }
342
343 // https://github.com/golang/go/blob/master/test/chan/goroutines.go
344 mod goroutines {
345 use super::*;
346
f(left: Chan<i32>, right: Chan<i32>)347 fn f(left: Chan<i32>, right: Chan<i32>) {
348 left.send(right.recv().unwrap());
349 }
350
351 #[test]
main()352 fn main() {
353 let n = 100i32;
354
355 let leftmost = make::<i32>(0);
356 let mut right = leftmost.clone();
357 let mut left = leftmost.clone();
358
359 for _ in 0..n {
360 right = make::<i32>(0);
361 go!(left, right, f(left, right));
362 left = right.clone();
363 }
364
365 go!(right, right.send(1));
366 leftmost.recv().unwrap();
367 }
368 }
369
370 // https://github.com/golang/go/blob/master/test/chan/nonblock.go
371 mod nonblock {
372 use super::*;
373
i32receiver(c: Chan<i32>, strobe: Chan<bool>)374 fn i32receiver(c: Chan<i32>, strobe: Chan<bool>) {
375 if c.recv().unwrap() != 123 {
376 panic!("i32 value");
377 }
378 strobe.send(true);
379 }
380
i32sender(c: Chan<i32>, strobe: Chan<bool>)381 fn i32sender(c: Chan<i32>, strobe: Chan<bool>) {
382 c.send(234);
383 strobe.send(true);
384 }
385
i64receiver(c: Chan<i64>, strobe: Chan<bool>)386 fn i64receiver(c: Chan<i64>, strobe: Chan<bool>) {
387 if c.recv().unwrap() != 123456 {
388 panic!("i64 value");
389 }
390 strobe.send(true);
391 }
392
i64sender(c: Chan<i64>, strobe: Chan<bool>)393 fn i64sender(c: Chan<i64>, strobe: Chan<bool>) {
394 c.send(234567);
395 strobe.send(true);
396 }
397
breceiver(c: Chan<bool>, strobe: Chan<bool>)398 fn breceiver(c: Chan<bool>, strobe: Chan<bool>) {
399 if !c.recv().unwrap() {
400 panic!("b value");
401 }
402 strobe.send(true);
403 }
404
bsender(c: Chan<bool>, strobe: Chan<bool>)405 fn bsender(c: Chan<bool>, strobe: Chan<bool>) {
406 c.send(true);
407 strobe.send(true);
408 }
409
sreceiver(c: Chan<String>, strobe: Chan<bool>)410 fn sreceiver(c: Chan<String>, strobe: Chan<bool>) {
411 if c.recv().unwrap() != "hello" {
412 panic!("x value");
413 }
414 strobe.send(true);
415 }
416
ssender(c: Chan<String>, strobe: Chan<bool>)417 fn ssender(c: Chan<String>, strobe: Chan<bool>) {
418 c.send("hello again".to_string());
419 strobe.send(true);
420 }
421
422 const MAX_TRIES: usize = 10000; // Up to 100ms per test.
423
424 #[test]
main()425 fn main() {
426 let ticker = tick(Duration::new(0, 10_000)); // 10 us
427 let sleep = || {
428 ticker.recv().unwrap();
429 ticker.recv().unwrap();
430 thread::yield_now();
431 thread::yield_now();
432 thread::yield_now();
433 };
434
435 let sync = make::<bool>(0);
436
437 for buffer in 0..2 {
438 let c32 = make::<i32>(buffer);
439 let c64 = make::<i64>(buffer);
440 let cb = make::<bool>(buffer);
441 let cs = make::<String>(buffer);
442
443 select! {
444 recv(c32.rx()) -> _ => panic!("blocked i32sender"),
445 default => {}
446 }
447
448 select! {
449 recv(c64.rx()) -> _ => panic!("blocked i64sender"),
450 default => {}
451 }
452
453 select! {
454 recv(cb.rx()) -> _ => panic!("blocked bsender"),
455 default => {}
456 }
457
458 select! {
459 recv(cs.rx()) -> _ => panic!("blocked ssender"),
460 default => {}
461 }
462
463 go!(c32, sync, i32receiver(c32, sync));
464 let mut r#try = 0;
465 loop {
466 select! {
467 send(c32.tx(), 123) -> _ => break,
468 default => {
469 r#try += 1;
470 if r#try > MAX_TRIES {
471 println!("i32receiver buffer={}", buffer);
472 panic!("fail")
473 }
474 sleep();
475 }
476 }
477 }
478 sync.recv();
479 go!(c32, sync, i32sender(c32, sync));
480 if buffer > 0 {
481 sync.recv();
482 }
483 let mut r#try = 0;
484 loop {
485 select! {
486 recv(c32.rx()) -> v => {
487 if v != Ok(234) {
488 panic!("i32sender value");
489 }
490 break;
491 }
492 default => {
493 r#try += 1;
494 if r#try > MAX_TRIES {
495 println!("i32sender buffer={}", buffer);
496 panic!("fail");
497 }
498 sleep();
499 }
500 }
501 }
502 if buffer == 0 {
503 sync.recv();
504 }
505
506 go!(c64, sync, i64receiver(c64, sync));
507 let mut r#try = 0;
508 loop {
509 select! {
510 send(c64.tx(), 123456) -> _ => break,
511 default => {
512 r#try += 1;
513 if r#try > MAX_TRIES {
514 println!("i64receiver buffer={}", buffer);
515 panic!("fail")
516 }
517 sleep();
518 }
519 }
520 }
521 sync.recv();
522 go!(c64, sync, i64sender(c64, sync));
523 if buffer > 0 {
524 sync.recv();
525 }
526 let mut r#try = 0;
527 loop {
528 select! {
529 recv(c64.rx()) -> v => {
530 if v != Ok(234567) {
531 panic!("i64sender value");
532 }
533 break;
534 }
535 default => {
536 r#try += 1;
537 if r#try > MAX_TRIES {
538 println!("i64sender buffer={}", buffer);
539 panic!("fail");
540 }
541 sleep();
542 }
543 }
544 }
545 if buffer == 0 {
546 sync.recv();
547 }
548
549 go!(cb, sync, breceiver(cb, sync));
550 let mut r#try = 0;
551 loop {
552 select! {
553 send(cb.tx(), true) -> _ => break,
554 default => {
555 r#try += 1;
556 if r#try > MAX_TRIES {
557 println!("breceiver buffer={}", buffer);
558 panic!("fail")
559 }
560 sleep();
561 }
562 }
563 }
564 sync.recv();
565 go!(cb, sync, bsender(cb, sync));
566 if buffer > 0 {
567 sync.recv();
568 }
569 let mut r#try = 0;
570 loop {
571 select! {
572 recv(cb.rx()) -> v => {
573 if v != Ok(true) {
574 panic!("bsender value");
575 }
576 break;
577 }
578 default => {
579 r#try += 1;
580 if r#try > MAX_TRIES {
581 println!("bsender buffer={}", buffer);
582 panic!("fail");
583 }
584 sleep();
585 }
586 }
587 }
588 if buffer == 0 {
589 sync.recv();
590 }
591
592 go!(cs, sync, sreceiver(cs, sync));
593 let mut r#try = 0;
594 loop {
595 select! {
596 send(cs.tx(), "hello".to_string()) -> _ => break,
597 default => {
598 r#try += 1;
599 if r#try > MAX_TRIES {
600 println!("sreceiver buffer={}", buffer);
601 panic!("fail")
602 }
603 sleep();
604 }
605 }
606 }
607 sync.recv();
608 go!(cs, sync, ssender(cs, sync));
609 if buffer > 0 {
610 sync.recv();
611 }
612 let mut r#try = 0;
613 loop {
614 select! {
615 recv(cs.rx()) -> v => {
616 if v != Ok("hello again".to_string()) {
617 panic!("ssender value");
618 }
619 break;
620 }
621 default => {
622 r#try += 1;
623 if r#try > MAX_TRIES {
624 println!("ssender buffer={}", buffer);
625 panic!("fail");
626 }
627 sleep();
628 }
629 }
630 }
631 if buffer == 0 {
632 sync.recv();
633 }
634 }
635 }
636 }
637
638 // https://github.com/golang/go/blob/master/test/chan/select.go
639 mod select {
640 use super::*;
641
642 #[test]
main()643 fn main() {
644 let shift = Cell::new(0);
645 let counter = Cell::new(0);
646
647 let get_value = || {
648 counter.set(counter.get() + 1);
649 1 << shift.get()
650 };
651
652 let send = |mut a: Option<&Chan<u32>>, mut b: Option<&Chan<u32>>| {
653 let mut i = 0;
654 let never = make::<u32>(0);
655 loop {
656 let nil1 = never.tx();
657 let nil2 = never.tx();
658 let v1 = get_value();
659 let v2 = get_value();
660 select! {
661 send(a.map(|c| c.tx()).unwrap_or(nil1), v1) -> _ => {
662 i += 1;
663 a = None;
664 }
665 send(b.map(|c| c.tx()).unwrap_or(nil2), v2) -> _ => {
666 i += 1;
667 b = None;
668 }
669 default => break,
670 }
671 shift.set(shift.get() + 1);
672 }
673 i
674 };
675
676 let a = make::<u32>(1);
677 let b = make::<u32>(1);
678
679 assert_eq!(send(Some(&a), Some(&b)), 2);
680
681 let av = a.recv().unwrap();
682 let bv = b.recv().unwrap();
683 assert_eq!(av | bv, 3);
684
685 assert_eq!(send(Some(&a), None), 1);
686 assert_eq!(counter.get(), 10);
687 }
688 }
689
690 // https://github.com/golang/go/blob/master/test/chan/select2.go
691 mod select2 {
692 use super::*;
693
694 #[test]
main()695 fn main() {
696 fn sender(c: &Chan<i32>, n: i32) {
697 for _ in 0..n {
698 c.send(1);
699 }
700 }
701
702 fn receiver(c: &Chan<i32>, dummy: &Chan<i32>, n: i32) {
703 for _ in 0..n {
704 select! {
705 recv(c.rx()) -> _ => {
706 ()
707 }
708 recv(dummy.rx()) -> _ => {
709 panic!("dummy");
710 }
711 }
712 }
713 }
714
715 let c = make_unbounded::<i32>();
716 let dummy = make_unbounded::<i32>();
717
718 ALLOCATED.store(0, SeqCst);
719
720 go!(c, sender(&c, 100000));
721 receiver(&c, &dummy, 100000);
722
723 let alloc = ALLOCATED.load(SeqCst);
724
725 go!(c, sender(&c, 100000));
726 receiver(&c, &dummy, 100000);
727
728 assert!(!(ALLOCATED.load(SeqCst) > alloc && (ALLOCATED.load(SeqCst) - alloc) > 110000))
729 }
730 }
731
732 // https://github.com/golang/go/blob/master/test/chan/select3.go
733 mod select3 {
734 // TODO
735 }
736
737 // https://github.com/golang/go/blob/master/test/chan/select4.go
738 mod select4 {
739 use super::*;
740
741 #[test]
main()742 fn main() {
743 let c = make::<i32>(1);
744 let c1 = make::<i32>(0);
745 c.send(42);
746 select! {
747 recv(c1.rx()) -> _ => panic!("BUG"),
748 recv(c.rx()) -> v => assert_eq!(v, Ok(42)),
749 }
750 }
751 }
752
753 // https://github.com/golang/go/blob/master/test/chan/select6.go
754 mod select6 {
755 use super::*;
756
757 #[test]
main()758 fn main() {
759 let c1 = make::<bool>(0);
760 let c2 = make::<bool>(0);
761 let c3 = make::<bool>(0);
762
763 go!(c1, c1.recv());
764 go!(c1, c2, c3, {
765 select! {
766 recv(c1.rx()) -> _ => panic!("dummy"),
767 recv(c2.rx()) -> _ => c3.send(true),
768 }
769 c1.recv();
770 });
771 go!(c2, c2.send(true));
772
773 c3.recv();
774 c1.send(true);
775 c1.send(true);
776 }
777 }
778
779 // https://github.com/golang/go/blob/master/test/chan/select7.go
780 mod select7 {
781 use super::*;
782
recv1(c: Chan<i32>)783 fn recv1(c: Chan<i32>) {
784 c.recv().unwrap();
785 }
786
recv2(c: Chan<i32>)787 fn recv2(c: Chan<i32>) {
788 select! {
789 recv(c.rx()) -> _ => ()
790 }
791 }
792
recv3(c: Chan<i32>)793 fn recv3(c: Chan<i32>) {
794 let c2 = make::<i32>(1);
795 select! {
796 recv(c.rx()) -> _ => (),
797 recv(c2.rx()) -> _ => ()
798 }
799 }
800
send1(recv: fn(Chan<i32>))801 fn send1(recv: fn(Chan<i32>)) {
802 let c = make::<i32>(1);
803 go!(c, recv(c));
804 thread::yield_now();
805 c.send(1);
806 }
807
send2(recv: fn(Chan<i32>))808 fn send2(recv: fn(Chan<i32>)) {
809 let c = make::<i32>(1);
810 go!(c, recv(c));
811 thread::yield_now();
812 select! {
813 send(c.tx(), 1) -> _ => ()
814 }
815 }
816
send3(recv: fn(Chan<i32>))817 fn send3(recv: fn(Chan<i32>)) {
818 let c = make::<i32>(1);
819 go!(c, recv(c));
820 thread::yield_now();
821 let c2 = make::<i32>(1);
822 select! {
823 send(c.tx(), 1) -> _ => (),
824 send(c2.tx(), 1) -> _ => ()
825 }
826 }
827
828 #[test]
main()829 fn main() {
830 send1(recv1);
831 send2(recv1);
832 send3(recv1);
833 send1(recv2);
834 send2(recv2);
835 send3(recv2);
836 send1(recv3);
837 send2(recv3);
838 send3(recv3);
839 }
840 }
841
842 // https://github.com/golang/go/blob/master/test/chan/sieve1.go
843 mod sieve1 {
844 use super::*;
845
generate(ch: Chan<i32>)846 fn generate(ch: Chan<i32>) {
847 let mut i = 2;
848 loop {
849 ch.send(i);
850 i += 1;
851 }
852 }
853
filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32)854 fn filter(in_ch: Chan<i32>, out_ch: Chan<i32>, prime: i32) {
855 for i in in_ch {
856 if i % prime != 0 {
857 out_ch.send(i);
858 }
859 }
860 }
861
sieve(primes: Chan<i32>)862 fn sieve(primes: Chan<i32>) {
863 let mut ch = make::<i32>(1);
864 go!(ch, generate(ch));
865 loop {
866 let prime = ch.recv().unwrap();
867 primes.send(prime);
868
869 let ch1 = make::<i32>(1);
870 go!(ch, ch1, prime, filter(ch, ch1, prime));
871 ch = ch1;
872 }
873 }
874
875 #[test]
main()876 fn main() {
877 let primes = make::<i32>(1);
878 go!(primes, sieve(primes));
879
880 let a = [
881 2, 3, 5, 7, 11, 13, 17, 19, 23, 29, 31, 37, 41, 43, 47, 53, 59, 61, 67, 71, 73, 79, 83,
882 89, 97,
883 ];
884 for item in a.iter() {
885 let x = primes.recv().unwrap();
886 if x != *item {
887 println!("{} != {}", x, item);
888 panic!("fail");
889 }
890 }
891 }
892 }
893
894 // https://github.com/golang/go/blob/master/test/chan/zerosize.go
895 mod zerosize {
896 use super::*;
897
898 #[test]
zero_size_struct()899 fn zero_size_struct() {
900 struct ZeroSize;
901 let _ = make::<ZeroSize>(0);
902 }
903
904 #[test]
zero_size_array()905 fn zero_size_array() {
906 let _ = make::<[u8; 0]>(0);
907 }
908 }
909
910 // https://github.com/golang/go/blob/master/src/runtime/chan_test.go
911 mod chan_test {
912 use super::*;
913
914 #[test]
test_chan()915 fn test_chan() {
916 const N: i32 = 200;
917
918 for cap in 0..N {
919 {
920 // Ensure that receive from empty chan blocks.
921 let c = make::<i32>(cap as usize);
922
923 let recv1 = Arc::new(Mutex::new(false));
924 go!(c, recv1, {
925 c.recv();
926 *recv1.lock().unwrap() = true;
927 });
928
929 let recv2 = Arc::new(Mutex::new(false));
930 go!(c, recv2, {
931 c.recv();
932 *recv2.lock().unwrap() = true;
933 });
934
935 thread::sleep(ms(1));
936
937 if *recv1.lock().unwrap() || *recv2.lock().unwrap() {
938 panic!();
939 }
940
941 // Ensure that non-blocking receive does not block.
942 select! {
943 recv(c.rx()) -> _ => panic!(),
944 default => {}
945 }
946 select! {
947 recv(c.rx()) -> _ => panic!(),
948 default => {}
949 }
950
951 c.send(0);
952 c.send(0);
953 }
954
955 {
956 // Ensure that send to full chan blocks.
957 let c = make::<i32>(cap as usize);
958 for i in 0..cap {
959 c.send(i);
960 }
961
962 let sent = Arc::new(Mutex::new(0));
963 go!(sent, c, {
964 c.send(0);
965 *sent.lock().unwrap() = 1;
966 });
967
968 thread::sleep(ms(1));
969
970 if *sent.lock().unwrap() != 0 {
971 panic!();
972 }
973
974 // Ensure that non-blocking send does not block.
975 select! {
976 send(c.tx(), 0) -> _ => panic!(),
977 default => {}
978 }
979 c.recv();
980 }
981
982 {
983 // Ensure that we receive 0 from closed chan.
984 let c = make::<i32>(cap as usize);
985 for i in 0..cap {
986 c.send(i);
987 }
988 c.close();
989
990 for i in 0..cap {
991 let v = c.recv();
992 if v != Some(i) {
993 panic!();
994 }
995 }
996
997 if c.recv() != None {
998 panic!();
999 }
1000 if c.try_recv() != None {
1001 panic!();
1002 }
1003 }
1004
1005 {
1006 // Ensure that close unblocks receive.
1007 let c = make::<i32>(cap as usize);
1008 let done = make::<bool>(0);
1009
1010 go!(c, done, {
1011 let v = c.try_recv();
1012 done.send(v.is_none());
1013 });
1014
1015 thread::sleep(ms(1));
1016 c.close();
1017
1018 if !done.recv().unwrap() {
1019 panic!();
1020 }
1021 }
1022
1023 {
1024 // Send 100 integers,
1025 // ensure that we receive them non-corrupted in FIFO order.
1026 let c = make::<i32>(cap as usize);
1027 go!(c, {
1028 for i in 0..100 {
1029 c.send(i);
1030 }
1031 });
1032 for i in 0..100 {
1033 if c.recv() != Some(i) {
1034 panic!();
1035 }
1036 }
1037
1038 // Same, but using recv2.
1039 go!(c, {
1040 for i in 0..100 {
1041 c.send(i);
1042 }
1043 });
1044 for i in 0..100 {
1045 if c.recv() != Some(i) {
1046 panic!();
1047 }
1048 }
1049 }
1050 }
1051 }
1052
1053 #[test]
test_nonblock_recv_race()1054 fn test_nonblock_recv_race() {
1055 const N: usize = 1000;
1056
1057 for _ in 0..N {
1058 let c = make::<i32>(1);
1059 c.send(1);
1060
1061 let t = go!(c, {
1062 select! {
1063 recv(c.rx()) -> _ => {}
1064 default => panic!("chan is not ready"),
1065 }
1066 });
1067
1068 c.close();
1069 c.recv();
1070 t.join().unwrap();
1071 }
1072 }
1073
1074 #[test]
test_nonblock_select_race()1075 fn test_nonblock_select_race() {
1076 const N: usize = 1000;
1077
1078 let done = make::<bool>(1);
1079 for _ in 0..N {
1080 let c1 = make::<i32>(1);
1081 let c2 = make::<i32>(1);
1082 c1.send(1);
1083
1084 go!(c1, c2, done, {
1085 select! {
1086 recv(c1.rx()) -> _ => {}
1087 recv(c2.rx()) -> _ => {}
1088 default => {
1089 done.send(false);
1090 return;
1091 }
1092 }
1093 done.send(true);
1094 });
1095
1096 c2.send(1);
1097 select! {
1098 recv(c1.rx()) -> _ => {}
1099 default => {}
1100 }
1101 if !done.recv().unwrap() {
1102 panic!("no chan is ready");
1103 }
1104 }
1105 }
1106
1107 #[test]
test_nonblock_select_race2()1108 fn test_nonblock_select_race2() {
1109 const N: usize = 1000;
1110
1111 let done = make::<bool>(1);
1112 for _ in 0..N {
1113 let c1 = make::<i32>(1);
1114 let c2 = make::<i32>(0);
1115 c1.send(1);
1116
1117 go!(c1, c2, done, {
1118 select! {
1119 recv(c1.rx()) -> _ => {}
1120 recv(c2.rx()) -> _ => {}
1121 default => {
1122 done.send(false);
1123 return;
1124 }
1125 }
1126 done.send(true);
1127 });
1128
1129 c2.close();
1130 select! {
1131 recv(c1.rx()) -> _ => {}
1132 default => {}
1133 }
1134 if !done.recv().unwrap() {
1135 panic!("no chan is ready");
1136 }
1137 }
1138 }
1139
1140 #[test]
test_self_select()1141 fn test_self_select() {
1142 // Ensure that send/recv on the same chan in select
1143 // does not crash nor deadlock.
1144
1145 for &cap in &[0, 10] {
1146 let wg = WaitGroup::new();
1147 wg.add(2);
1148 let c = make::<i32>(cap);
1149
1150 for p in 0..2 {
1151 let p = p;
1152 go!(wg, p, c, {
1153 defer! { wg.done() }
1154 for i in 0..1000 {
1155 if p == 0 || i % 2 == 0 {
1156 select! {
1157 send(c.tx(), p) -> _ => {}
1158 recv(c.rx()) -> v => {
1159 if cap == 0 && v.ok() == Some(p) {
1160 panic!("self receive");
1161 }
1162 }
1163 }
1164 } else {
1165 select! {
1166 recv(c.rx()) -> v => {
1167 if cap == 0 && v.ok() == Some(p) {
1168 panic!("self receive");
1169 }
1170 }
1171 send(c.tx(), p) -> _ => {}
1172 }
1173 }
1174 }
1175 });
1176 }
1177 wg.wait();
1178 }
1179 }
1180
1181 #[test]
test_select_stress()1182 fn test_select_stress() {
1183 let c = vec![
1184 make::<i32>(0),
1185 make::<i32>(0),
1186 make::<i32>(2),
1187 make::<i32>(3),
1188 ];
1189
1190 const N: usize = 10000;
1191
1192 // There are 4 goroutines that send N values on each of the chans,
1193 // + 4 goroutines that receive N values on each of the chans,
1194 // + 1 goroutine that sends N values on each of the chans in a single select,
1195 // + 1 goroutine that receives N values on each of the chans in a single select.
1196 // All these sends, receives and selects interact chaotically at runtime,
1197 // but we are careful that this whole construct does not deadlock.
1198 let wg = WaitGroup::new();
1199 wg.add(10);
1200
1201 for k in 0..4 {
1202 go!(k, c, wg, {
1203 for _ in 0..N {
1204 c[k].send(0);
1205 }
1206 wg.done();
1207 });
1208 go!(k, c, wg, {
1209 for _ in 0..N {
1210 c[k].recv();
1211 }
1212 wg.done();
1213 });
1214 }
1215
1216 go!(c, wg, {
1217 let mut n = [0; 4];
1218 let mut c1 = c.iter().map(|c| Some(c.rx().clone())).collect::<Vec<_>>();
1219
1220 for _ in 0..4 * N {
1221 let index = {
1222 let mut sel = Select::new();
1223 let mut opers = [!0; 4];
1224 for &i in &[3, 2, 0, 1] {
1225 if let Some(c) = &c1[i] {
1226 opers[i] = sel.recv(c);
1227 }
1228 }
1229
1230 let oper = sel.select();
1231 let mut index = !0;
1232 for i in 0..4 {
1233 if opers[i] == oper.index() {
1234 index = i;
1235 let _ = oper.recv(c1[i].as_ref().unwrap());
1236 break;
1237 }
1238 }
1239 index
1240 };
1241
1242 n[index] += 1;
1243 if n[index] == N {
1244 c1[index] = None;
1245 }
1246 }
1247 wg.done();
1248 });
1249
1250 go!(c, wg, {
1251 let mut n = [0; 4];
1252 let mut c1 = c.iter().map(|c| Some(c.tx().clone())).collect::<Vec<_>>();
1253
1254 for _ in 0..4 * N {
1255 let index = {
1256 let mut sel = Select::new();
1257 let mut opers = [!0; 4];
1258 for &i in &[0, 1, 2, 3] {
1259 if let Some(c) = &c1[i] {
1260 opers[i] = sel.send(c);
1261 }
1262 }
1263
1264 let oper = sel.select();
1265 let mut index = !0;
1266 for i in 0..4 {
1267 if opers[i] == oper.index() {
1268 index = i;
1269 let _ = oper.send(c1[i].as_ref().unwrap(), 0);
1270 break;
1271 }
1272 }
1273 index
1274 };
1275
1276 n[index] += 1;
1277 if n[index] == N {
1278 c1[index] = None;
1279 }
1280 }
1281 wg.done();
1282 });
1283
1284 wg.wait();
1285 }
1286
1287 #[test]
test_select_fairness()1288 fn test_select_fairness() {
1289 const TRIALS: usize = 10000;
1290
1291 let c1 = make::<u8>(TRIALS + 1);
1292 let c2 = make::<u8>(TRIALS + 1);
1293
1294 for _ in 0..TRIALS + 1 {
1295 c1.send(1);
1296 c2.send(2);
1297 }
1298
1299 let c3 = make::<u8>(0);
1300 let c4 = make::<u8>(0);
1301 let out = make::<u8>(0);
1302 let done = make::<u8>(0);
1303 let wg = WaitGroup::new();
1304
1305 wg.add(1);
1306 go!(wg, c1, c2, c3, c4, out, done, {
1307 defer! { wg.done() };
1308 loop {
1309 let b;
1310 select! {
1311 recv(c3.rx()) -> m => b = m.unwrap(),
1312 recv(c4.rx()) -> m => b = m.unwrap(),
1313 recv(c1.rx()) -> m => b = m.unwrap(),
1314 recv(c2.rx()) -> m => b = m.unwrap(),
1315 }
1316 select! {
1317 send(out.tx(), b) -> _ => {}
1318 recv(done.rx()) -> _ => return,
1319 }
1320 }
1321 });
1322
1323 let (mut cnt1, mut cnt2) = (0, 0);
1324 for _ in 0..TRIALS {
1325 match out.recv() {
1326 Some(1) => cnt1 += 1,
1327 Some(2) => cnt2 += 1,
1328 b => panic!("unexpected value {:?} on channel", b),
1329 }
1330 }
1331
1332 // If the select in the goroutine is fair,
1333 // cnt1 and cnt2 should be about the same value.
1334 // With 10,000 trials, the expected margin of error at
1335 // a confidence level of five nines is 4.4172 / (2 * Sqrt(10000)).
1336
1337 let r = cnt1 as f64 / TRIALS as f64;
1338 let e = (r - 0.5).abs();
1339
1340 if e > 4.4172 / (2.0 * (TRIALS as f64).sqrt()) {
1341 panic!(
1342 "unfair select: in {} trials, results were {}, {}",
1343 TRIALS, cnt1, cnt2,
1344 );
1345 }
1346
1347 done.close();
1348 wg.wait();
1349 }
1350
1351 #[test]
test_chan_send_interface()1352 fn test_chan_send_interface() {
1353 struct Mt;
1354
1355 let c = make::<Box<dyn Any>>(1);
1356 c.send(Box::new(Mt));
1357
1358 select! {
1359 send(c.tx(), Box::new(Mt)) -> _ => {}
1360 default => {}
1361 }
1362
1363 select! {
1364 send(c.tx(), Box::new(Mt)) -> _ => {}
1365 send(c.tx(), Box::new(Mt)) -> _ => {}
1366 default => {}
1367 }
1368 }
1369
1370 #[test]
test_pseudo_random_send()1371 fn test_pseudo_random_send() {
1372 const N: usize = 100;
1373
1374 for cap in 0..N {
1375 let c = make::<i32>(cap);
1376 let l = Arc::new(Mutex::new(vec![0i32; N]));
1377 let done = make::<bool>(0);
1378
1379 go!(c, done, l, {
1380 let mut l = l.lock().unwrap();
1381 for i in 0..N {
1382 thread::yield_now();
1383 l[i] = c.recv().unwrap();
1384 }
1385 done.send(true);
1386 });
1387
1388 for _ in 0..N {
1389 select! {
1390 send(c.tx(), 1) -> _ => {}
1391 send(c.tx(), 0) -> _ => {}
1392 }
1393 }
1394 done.recv();
1395
1396 let mut n0 = 0;
1397 let mut n1 = 0;
1398 for &i in l.lock().unwrap().iter() {
1399 n0 += (i + 1) % 2;
1400 n1 += i;
1401 }
1402
1403 if n0 <= N as i32 / 10 || n1 <= N as i32 / 10 {
1404 panic!(
1405 "Want pseudorandom, got {} zeros and {} ones (chan cap {})",
1406 n0, n1, cap,
1407 );
1408 }
1409 }
1410 }
1411
1412 #[test]
test_multi_consumer()1413 fn test_multi_consumer() {
1414 const NWORK: usize = 23;
1415 const NITER: usize = 271828;
1416
1417 let pn = [2, 3, 7, 11, 13, 17, 19, 23, 27, 31];
1418
1419 let q = make::<i32>(NWORK * 3);
1420 let r = make::<i32>(NWORK * 3);
1421
1422 let wg = WaitGroup::new();
1423 for i in 0..NWORK {
1424 wg.add(1);
1425 let w = i;
1426 go!(q, r, wg, pn, {
1427 for v in &q {
1428 if pn[w % pn.len()] == v {
1429 thread::yield_now();
1430 }
1431 r.send(v);
1432 }
1433 wg.done();
1434 });
1435 }
1436
1437 let expect = Arc::new(Mutex::new(0));
1438 go!(q, r, expect, wg, pn, {
1439 for i in 0..NITER {
1440 let v = pn[i % pn.len()];
1441 *expect.lock().unwrap() += v;
1442 q.send(v);
1443 }
1444 q.close();
1445 wg.wait();
1446 r.close();
1447 });
1448
1449 let mut n = 0;
1450 let mut s = 0;
1451 for v in &r {
1452 n += 1;
1453 s += v;
1454 }
1455
1456 if n != NITER || s != *expect.lock().unwrap() {
1457 panic!();
1458 }
1459 }
1460
1461 #[test]
test_select_duplicate_channel()1462 fn test_select_duplicate_channel() {
1463 // This test makes sure we can queue a G on
1464 // the same channel multiple times.
1465 let c = make::<i32>(0);
1466 let d = make::<i32>(0);
1467 let e = make::<i32>(0);
1468
1469 go!(c, d, e, {
1470 select! {
1471 recv(c.rx()) -> _ => {}
1472 recv(d.rx()) -> _ => {}
1473 recv(e.rx()) -> _ => {}
1474 }
1475 e.send(9);
1476 });
1477 thread::sleep(ms(1));
1478
1479 go!(c, c.recv());
1480 thread::sleep(ms(1));
1481
1482 d.send(7);
1483 e.recv();
1484 c.send(8);
1485 }
1486 }
1487
1488 // https://github.com/golang/go/blob/master/test/closedchan.go
1489 mod closedchan {
1490 // TODO
1491 }
1492
1493 // https://github.com/golang/go/blob/master/src/runtime/chanbarrier_test.go
1494 mod chanbarrier_test {
1495 // TODO
1496 }
1497
1498 // https://github.com/golang/go/blob/master/src/runtime/race/testdata/chan_test.go
1499 mod race_chan_test {
1500 // TODO
1501 }
1502
1503 // https://github.com/golang/go/blob/master/test/ken/chan.go
1504 mod chan {
1505 // TODO
1506 }
1507
1508 // https://github.com/golang/go/blob/master/test/ken/chan1.go
1509 mod chan1 {
1510 use super::*;
1511
1512 // sent messages
1513 const N: usize = 1000;
1514 // receiving "goroutines"
1515 const M: usize = 10;
1516 // channel buffering
1517 const W: usize = 2;
1518
r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>)1519 fn r(c: Chan<usize>, m: usize, h: Arc<Mutex<[usize; N]>>) {
1520 loop {
1521 select! {
1522 recv(c.rx()) -> rr => {
1523 let r = rr.unwrap();
1524 let mut data = h.lock().unwrap();
1525 if data[r] != 1 {
1526 println!("r\nm={}\nr={}\nh={}\n", m, r, data[r]);
1527 panic!("fail")
1528 }
1529 data[r] = 2;
1530 }
1531 }
1532 }
1533 }
1534
s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>)1535 fn s(c: Chan<usize>, h: Arc<Mutex<[usize; N]>>) {
1536 for n in 0..N {
1537 let r = n;
1538 let mut data = h.lock().unwrap();
1539 if data[r] != 0 {
1540 println!("s");
1541 panic!("fail");
1542 }
1543 data[r] = 1;
1544 // https://github.com/crossbeam-rs/crossbeam/pull/615#discussion_r550281094
1545 drop(data);
1546 c.send(r);
1547 }
1548 }
1549
1550 #[test]
main()1551 fn main() {
1552 let h = Arc::new(Mutex::new([0usize; N]));
1553 let c = make::<usize>(W);
1554 for m in 0..M {
1555 go!(c, h, {
1556 r(c, m, h);
1557 });
1558 thread::yield_now();
1559 }
1560 thread::yield_now();
1561 thread::yield_now();
1562 s(c, h);
1563 }
1564 }
1565