1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
//! - Remove the assertion from the `oneshot_multi_thread_send_close_stress` tests.
13 //!
14 //! Source:
15 //!   - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //!   - Copyright 2013-2014 The Rust Project Developers
19 //!   - Apache License, Version 2.0 or MIT license, at your option
20 //!   - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //!   - https://www.rust-lang.org/en-US/legal.html
22 
23 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
24 use std::sync::mpsc::{SendError, TrySendError};
25 use std::thread::JoinHandle;
26 use std::time::Duration;
27 
28 use crossbeam_channel as cc;
29 
30 pub struct Sender<T> {
31     pub inner: cc::Sender<T>,
32 }
33 
34 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>35     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
36         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
37     }
38 }
39 
40 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>41     fn clone(&self) -> Sender<T> {
42         Sender {
43             inner: self.inner.clone(),
44         }
45     }
46 }
47 
48 pub struct SyncSender<T> {
49     pub inner: cc::Sender<T>,
50 }
51 
52 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>53     pub fn send(&self, t: T) -> Result<(), SendError<T>> {
54         self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
55     }
56 
try_send(&self, t: T) -> Result<(), TrySendError<T>>57     pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
58         self.inner.try_send(t).map_err(|err| match err {
59             cc::TrySendError::Full(m) => TrySendError::Full(m),
60             cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
61         })
62     }
63 }
64 
65 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>66     fn clone(&self) -> SyncSender<T> {
67         SyncSender {
68             inner: self.inner.clone(),
69         }
70     }
71 }
72 
73 pub struct Receiver<T> {
74     pub inner: cc::Receiver<T>,
75 }
76 
77 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>78     pub fn try_recv(&self) -> Result<T, TryRecvError> {
79         self.inner.try_recv().map_err(|err| match err {
80             cc::TryRecvError::Empty => TryRecvError::Empty,
81             cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
82         })
83     }
84 
recv(&self) -> Result<T, RecvError>85     pub fn recv(&self) -> Result<T, RecvError> {
86         self.inner.recv().map_err(|_| RecvError)
87     }
88 
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>89     pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
90         self.inner.recv_timeout(timeout).map_err(|err| match err {
91             cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
92             cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
93         })
94     }
95 
iter(&self) -> Iter<T>96     pub fn iter(&self) -> Iter<T> {
97         Iter { inner: self }
98     }
99 
try_iter(&self) -> TryIter<T>100     pub fn try_iter(&self) -> TryIter<T> {
101         TryIter { inner: self }
102     }
103 }
104 
105 impl<'a, T> IntoIterator for &'a Receiver<T> {
106     type Item = T;
107     type IntoIter = Iter<'a, T>;
108 
into_iter(self) -> Iter<'a, T>109     fn into_iter(self) -> Iter<'a, T> {
110         self.iter()
111     }
112 }
113 
114 impl<T> IntoIterator for Receiver<T> {
115     type Item = T;
116     type IntoIter = IntoIter<T>;
117 
into_iter(self) -> IntoIter<T>118     fn into_iter(self) -> IntoIter<T> {
119         IntoIter { inner: self }
120     }
121 }
122 
123 pub struct TryIter<'a, T: 'a> {
124     inner: &'a Receiver<T>,
125 }
126 
127 impl<'a, T> Iterator for TryIter<'a, T> {
128     type Item = T;
129 
next(&mut self) -> Option<T>130     fn next(&mut self) -> Option<T> {
131         self.inner.try_recv().ok()
132     }
133 }
134 
135 pub struct Iter<'a, T: 'a> {
136     inner: &'a Receiver<T>,
137 }
138 
139 impl<'a, T> Iterator for Iter<'a, T> {
140     type Item = T;
141 
next(&mut self) -> Option<T>142     fn next(&mut self) -> Option<T> {
143         self.inner.recv().ok()
144     }
145 }
146 
147 pub struct IntoIter<T> {
148     inner: Receiver<T>,
149 }
150 
151 impl<T> Iterator for IntoIter<T> {
152     type Item = T;
153 
next(&mut self) -> Option<T>154     fn next(&mut self) -> Option<T> {
155         self.inner.recv().ok()
156     }
157 }
158 
channel<T>() -> (Sender<T>, Receiver<T>)159 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
160     let (s, r) = cc::unbounded();
161     let s = Sender { inner: s };
162     let r = Receiver { inner: r };
163     (s, r)
164 }
165 
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)166 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
167     let (s, r) = cc::bounded(bound);
168     let s = SyncSender { inner: s };
169     let r = Receiver { inner: r };
170     (s, r)
171 }
172 
// Local `select!` that accepts one or more comma-separated arms of the form
// `pattern = receiver.method() => expression` and expands them into `recv(...)` arms of
// `crossbeam_channel_internal!`.
//
// For every arm the receive is performed on the wrapper's `inner` receiver, the result has its
// error replaced by `std::sync::mpsc::RecvError`, and that result is bound to `pattern` before
// `expression` is evaluated.
macro_rules! select {
    (
        // `$meth` is matched so the arm parses, but it is not used in the expansion below.
        $($name:pat = $rx:ident.$meth:ident() => $code:expr),+
    ) => ({
        cc::crossbeam_channel_internal! {
            $(
                recv(($rx).inner) -> res => {
                    let $name = res.map_err(|_| ::std::sync::mpsc::RecvError);
                    $code
                }
            )+
        }
    })
}
187 
188 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
189 mod channel_tests {
190     use super::*;
191 
192     use std::env;
193     use std::thread;
194     use std::time::{Duration, Instant};
195 
stress_factor() -> usize196     pub fn stress_factor() -> usize {
197         match env::var("RUST_TEST_STRESS") {
198             Ok(val) => val.parse().unwrap(),
199             Err(..) => 1,
200         }
201     }
202 
203     #[test]
smoke()204     fn smoke() {
205         let (tx, rx) = channel::<i32>();
206         tx.send(1).unwrap();
207         assert_eq!(rx.recv().unwrap(), 1);
208     }
209 
210     #[test]
drop_full()211     fn drop_full() {
212         let (tx, _rx) = channel::<Box<isize>>();
213         tx.send(Box::new(1)).unwrap();
214     }
215 
216     #[test]
drop_full_shared()217     fn drop_full_shared() {
218         let (tx, _rx) = channel::<Box<isize>>();
219         drop(tx.clone());
220         drop(tx.clone());
221         tx.send(Box::new(1)).unwrap();
222     }
223 
224     #[test]
smoke_shared()225     fn smoke_shared() {
226         let (tx, rx) = channel::<i32>();
227         tx.send(1).unwrap();
228         assert_eq!(rx.recv().unwrap(), 1);
229         let tx = tx.clone();
230         tx.send(1).unwrap();
231         assert_eq!(rx.recv().unwrap(), 1);
232     }
233 
234     #[test]
smoke_threads()235     fn smoke_threads() {
236         let (tx, rx) = channel::<i32>();
237         let t = thread::spawn(move || {
238             tx.send(1).unwrap();
239         });
240         assert_eq!(rx.recv().unwrap(), 1);
241         t.join().unwrap();
242     }
243 
244     #[test]
smoke_port_gone()245     fn smoke_port_gone() {
246         let (tx, rx) = channel::<i32>();
247         drop(rx);
248         assert!(tx.send(1).is_err());
249     }
250 
251     #[test]
smoke_shared_port_gone()252     fn smoke_shared_port_gone() {
253         let (tx, rx) = channel::<i32>();
254         drop(rx);
255         assert!(tx.send(1).is_err())
256     }
257 
258     #[test]
smoke_shared_port_gone2()259     fn smoke_shared_port_gone2() {
260         let (tx, rx) = channel::<i32>();
261         drop(rx);
262         let tx2 = tx.clone();
263         drop(tx);
264         assert!(tx2.send(1).is_err());
265     }
266 
267     #[test]
port_gone_concurrent()268     fn port_gone_concurrent() {
269         let (tx, rx) = channel::<i32>();
270         let t = thread::spawn(move || {
271             rx.recv().unwrap();
272         });
273         while tx.send(1).is_ok() {}
274         t.join().unwrap();
275     }
276 
277     #[test]
port_gone_concurrent_shared()278     fn port_gone_concurrent_shared() {
279         let (tx, rx) = channel::<i32>();
280         let tx2 = tx.clone();
281         let t = thread::spawn(move || {
282             rx.recv().unwrap();
283         });
284         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
285         t.join().unwrap();
286     }
287 
288     #[test]
smoke_chan_gone()289     fn smoke_chan_gone() {
290         let (tx, rx) = channel::<i32>();
291         drop(tx);
292         assert!(rx.recv().is_err());
293     }
294 
295     #[test]
smoke_chan_gone_shared()296     fn smoke_chan_gone_shared() {
297         let (tx, rx) = channel::<()>();
298         let tx2 = tx.clone();
299         drop(tx);
300         drop(tx2);
301         assert!(rx.recv().is_err());
302     }
303 
304     #[test]
chan_gone_concurrent()305     fn chan_gone_concurrent() {
306         let (tx, rx) = channel::<i32>();
307         let t = thread::spawn(move || {
308             tx.send(1).unwrap();
309             tx.send(1).unwrap();
310         });
311         while rx.recv().is_ok() {}
312         t.join().unwrap();
313     }
314 
315     #[test]
stress()316     fn stress() {
317         let (tx, rx) = channel::<i32>();
318         let t = thread::spawn(move || {
319             for _ in 0..10000 {
320                 tx.send(1).unwrap();
321             }
322         });
323         for _ in 0..10000 {
324             assert_eq!(rx.recv().unwrap(), 1);
325         }
326         t.join().ok().unwrap();
327     }
328 
329     #[test]
stress_shared()330     fn stress_shared() {
331         const AMT: u32 = 10000;
332         const NTHREADS: u32 = 8;
333         let (tx, rx) = channel::<i32>();
334 
335         let t = thread::spawn(move || {
336             for _ in 0..AMT * NTHREADS {
337                 assert_eq!(rx.recv().unwrap(), 1);
338             }
339             match rx.try_recv() {
340                 Ok(..) => panic!(),
341                 _ => {}
342             }
343         });
344 
345         let mut ts = Vec::with_capacity(NTHREADS as usize);
346         for _ in 0..NTHREADS {
347             let tx = tx.clone();
348             let t = thread::spawn(move || {
349                 for _ in 0..AMT {
350                     tx.send(1).unwrap();
351                 }
352             });
353             ts.push(t);
354         }
355         drop(tx);
356         t.join().ok().unwrap();
357         for t in ts {
358             t.join().unwrap();
359         }
360     }
361 
362     #[test]
send_from_outside_runtime()363     fn send_from_outside_runtime() {
364         let (tx1, rx1) = channel::<()>();
365         let (tx2, rx2) = channel::<i32>();
366         let t1 = thread::spawn(move || {
367             tx1.send(()).unwrap();
368             for _ in 0..40 {
369                 assert_eq!(rx2.recv().unwrap(), 1);
370             }
371         });
372         rx1.recv().unwrap();
373         let t2 = thread::spawn(move || {
374             for _ in 0..40 {
375                 tx2.send(1).unwrap();
376             }
377         });
378         t1.join().ok().unwrap();
379         t2.join().ok().unwrap();
380     }
381 
382     #[test]
recv_from_outside_runtime()383     fn recv_from_outside_runtime() {
384         let (tx, rx) = channel::<i32>();
385         let t = thread::spawn(move || {
386             for _ in 0..40 {
387                 assert_eq!(rx.recv().unwrap(), 1);
388             }
389         });
390         for _ in 0..40 {
391             tx.send(1).unwrap();
392         }
393         t.join().ok().unwrap();
394     }
395 
396     #[test]
no_runtime()397     fn no_runtime() {
398         let (tx1, rx1) = channel::<i32>();
399         let (tx2, rx2) = channel::<i32>();
400         let t1 = thread::spawn(move || {
401             assert_eq!(rx1.recv().unwrap(), 1);
402             tx2.send(2).unwrap();
403         });
404         let t2 = thread::spawn(move || {
405             tx1.send(1).unwrap();
406             assert_eq!(rx2.recv().unwrap(), 2);
407         });
408         t1.join().ok().unwrap();
409         t2.join().ok().unwrap();
410     }
411 
412     #[test]
oneshot_single_thread_close_port_first()413     fn oneshot_single_thread_close_port_first() {
414         // Simple test of closing without sending
415         let (_tx, rx) = channel::<i32>();
416         drop(rx);
417     }
418 
419     #[test]
oneshot_single_thread_close_chan_first()420     fn oneshot_single_thread_close_chan_first() {
421         // Simple test of closing without sending
422         let (tx, _rx) = channel::<i32>();
423         drop(tx);
424     }
425 
426     #[test]
oneshot_single_thread_send_port_close()427     fn oneshot_single_thread_send_port_close() {
428         // Testing that the sender cleans up the payload if receiver is closed
429         let (tx, rx) = channel::<Box<i32>>();
430         drop(rx);
431         assert!(tx.send(Box::new(0)).is_err());
432     }
433 
434     #[test]
oneshot_single_thread_recv_chan_close()435     fn oneshot_single_thread_recv_chan_close() {
436         let (tx, rx) = channel::<i32>();
437         drop(tx);
438         assert_eq!(rx.recv(), Err(RecvError));
439     }
440 
441     #[test]
oneshot_single_thread_send_then_recv()442     fn oneshot_single_thread_send_then_recv() {
443         let (tx, rx) = channel::<Box<i32>>();
444         tx.send(Box::new(10)).unwrap();
445         assert!(*rx.recv().unwrap() == 10);
446     }
447 
448     #[test]
oneshot_single_thread_try_send_open()449     fn oneshot_single_thread_try_send_open() {
450         let (tx, rx) = channel::<i32>();
451         assert!(tx.send(10).is_ok());
452         assert!(rx.recv().unwrap() == 10);
453     }
454 
455     #[test]
oneshot_single_thread_try_send_closed()456     fn oneshot_single_thread_try_send_closed() {
457         let (tx, rx) = channel::<i32>();
458         drop(rx);
459         assert!(tx.send(10).is_err());
460     }
461 
462     #[test]
oneshot_single_thread_try_recv_open()463     fn oneshot_single_thread_try_recv_open() {
464         let (tx, rx) = channel::<i32>();
465         tx.send(10).unwrap();
466         assert!(rx.recv() == Ok(10));
467     }
468 
469     #[test]
oneshot_single_thread_try_recv_closed()470     fn oneshot_single_thread_try_recv_closed() {
471         let (tx, rx) = channel::<i32>();
472         drop(tx);
473         assert!(rx.recv().is_err());
474     }
475 
476     #[test]
oneshot_single_thread_peek_data()477     fn oneshot_single_thread_peek_data() {
478         let (tx, rx) = channel::<i32>();
479         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
480         tx.send(10).unwrap();
481         assert_eq!(rx.try_recv(), Ok(10));
482     }
483 
484     #[test]
oneshot_single_thread_peek_close()485     fn oneshot_single_thread_peek_close() {
486         let (tx, rx) = channel::<i32>();
487         drop(tx);
488         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
489         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
490     }
491 
492     #[test]
oneshot_single_thread_peek_open()493     fn oneshot_single_thread_peek_open() {
494         let (_tx, rx) = channel::<i32>();
495         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
496     }
497 
498     #[test]
oneshot_multi_task_recv_then_send()499     fn oneshot_multi_task_recv_then_send() {
500         let (tx, rx) = channel::<Box<i32>>();
501         let t = thread::spawn(move || {
502             assert!(*rx.recv().unwrap() == 10);
503         });
504 
505         tx.send(Box::new(10)).unwrap();
506         t.join().unwrap();
507     }
508 
509     #[test]
oneshot_multi_task_recv_then_close()510     fn oneshot_multi_task_recv_then_close() {
511         let (tx, rx) = channel::<Box<i32>>();
512         let t = thread::spawn(move || {
513             drop(tx);
514         });
515         thread::spawn(move || {
516             assert_eq!(rx.recv(), Err(RecvError));
517         })
518         .join()
519         .unwrap();
520         t.join().unwrap();
521     }
522 
523     #[test]
oneshot_multi_thread_close_stress()524     fn oneshot_multi_thread_close_stress() {
525         let stress_factor = stress_factor();
526         let mut ts = Vec::with_capacity(stress_factor);
527         for _ in 0..stress_factor {
528             let (tx, rx) = channel::<i32>();
529             let t = thread::spawn(move || {
530                 drop(rx);
531             });
532             ts.push(t);
533             drop(tx);
534         }
535         for t in ts {
536             t.join().unwrap();
537         }
538     }
539 
540     #[test]
oneshot_multi_thread_send_close_stress()541     fn oneshot_multi_thread_send_close_stress() {
542         let stress_factor = stress_factor();
543         let mut ts = Vec::with_capacity(2 * stress_factor);
544         for _ in 0..stress_factor {
545             let (tx, rx) = channel::<i32>();
546             let t = thread::spawn(move || {
547                 drop(rx);
548             });
549             ts.push(t);
550             thread::spawn(move || {
551                 let _ = tx.send(1);
552             })
553             .join()
554             .unwrap();
555         }
556         for t in ts {
557             t.join().unwrap();
558         }
559     }
560 
561     #[test]
oneshot_multi_thread_recv_close_stress()562     fn oneshot_multi_thread_recv_close_stress() {
563         let stress_factor = stress_factor();
564         let mut ts = Vec::with_capacity(2 * stress_factor);
565         for _ in 0..stress_factor {
566             let (tx, rx) = channel::<i32>();
567             let t = thread::spawn(move || {
568                 thread::spawn(move || {
569                     assert_eq!(rx.recv(), Err(RecvError));
570                 })
571                 .join()
572                 .unwrap();
573             });
574             ts.push(t);
575             let t2 = thread::spawn(move || {
576                 let t = thread::spawn(move || {
577                     drop(tx);
578                 });
579                 t.join().unwrap();
580             });
581             ts.push(t2);
582         }
583         for t in ts {
584             t.join().unwrap();
585         }
586     }
587 
588     #[test]
oneshot_multi_thread_send_recv_stress()589     fn oneshot_multi_thread_send_recv_stress() {
590         let stress_factor = stress_factor();
591         let mut ts = Vec::with_capacity(stress_factor);
592         for _ in 0..stress_factor {
593             let (tx, rx) = channel::<Box<isize>>();
594             let t = thread::spawn(move || {
595                 tx.send(Box::new(10)).unwrap();
596             });
597             ts.push(t);
598             assert!(*rx.recv().unwrap() == 10);
599         }
600         for t in ts {
601             t.join().unwrap();
602         }
603     }
604 
605     #[test]
stream_send_recv_stress()606     fn stream_send_recv_stress() {
607         let stress_factor = stress_factor();
608         let mut ts = Vec::with_capacity(2 * stress_factor);
609         for _ in 0..stress_factor {
610             let (tx, rx) = channel();
611 
612             if let Some(t) = send(tx, 0) {
613                 ts.push(t);
614             }
615             if let Some(t2) = recv(rx, 0) {
616                 ts.push(t2);
617             }
618 
619             fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
620                 if i == 10 {
621                     return None;
622                 }
623 
624                 Some(thread::spawn(move || {
625                     tx.send(Box::new(i)).unwrap();
626                     send(tx, i + 1);
627                 }))
628             }
629 
630             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
631                 if i == 10 {
632                     return None;
633                 }
634 
635                 Some(thread::spawn(move || {
636                     assert!(*rx.recv().unwrap() == i);
637                     recv(rx, i + 1);
638                 }))
639             }
640         }
641         for t in ts {
642             t.join().unwrap();
643         }
644     }
645 
646     #[test]
oneshot_single_thread_recv_timeout()647     fn oneshot_single_thread_recv_timeout() {
648         let (tx, rx) = channel();
649         tx.send(()).unwrap();
650         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
651         assert_eq!(
652             rx.recv_timeout(Duration::from_millis(1)),
653             Err(RecvTimeoutError::Timeout)
654         );
655         tx.send(()).unwrap();
656         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
657     }
658 
659     #[test]
stress_recv_timeout_two_threads()660     fn stress_recv_timeout_two_threads() {
661         let (tx, rx) = channel();
662         let stress = stress_factor() + 100;
663         let timeout = Duration::from_millis(100);
664 
665         let t = thread::spawn(move || {
666             for i in 0..stress {
667                 if i % 2 == 0 {
668                     thread::sleep(timeout * 2);
669                 }
670                 tx.send(1usize).unwrap();
671             }
672         });
673 
674         let mut recv_count = 0;
675         loop {
676             match rx.recv_timeout(timeout) {
677                 Ok(n) => {
678                     assert_eq!(n, 1usize);
679                     recv_count += 1;
680                 }
681                 Err(RecvTimeoutError::Timeout) => continue,
682                 Err(RecvTimeoutError::Disconnected) => break,
683             }
684         }
685 
686         assert_eq!(recv_count, stress);
687         t.join().unwrap()
688     }
689 
690     #[test]
recv_timeout_upgrade()691     fn recv_timeout_upgrade() {
692         let (tx, rx) = channel::<()>();
693         let timeout = Duration::from_millis(1);
694         let _tx_clone = tx.clone();
695 
696         let start = Instant::now();
697         assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
698         assert!(Instant::now() >= start + timeout);
699     }
700 
701     #[test]
stress_recv_timeout_shared()702     fn stress_recv_timeout_shared() {
703         let (tx, rx) = channel();
704         let stress = stress_factor() + 100;
705 
706         let mut ts = Vec::with_capacity(stress);
707         for i in 0..stress {
708             let tx = tx.clone();
709             let t = thread::spawn(move || {
710                 thread::sleep(Duration::from_millis(i as u64 * 10));
711                 tx.send(1usize).unwrap();
712             });
713             ts.push(t);
714         }
715 
716         drop(tx);
717 
718         let mut recv_count = 0;
719         loop {
720             match rx.recv_timeout(Duration::from_millis(10)) {
721                 Ok(n) => {
722                     assert_eq!(n, 1usize);
723                     recv_count += 1;
724                 }
725                 Err(RecvTimeoutError::Timeout) => continue,
726                 Err(RecvTimeoutError::Disconnected) => break,
727             }
728         }
729 
730         assert_eq!(recv_count, stress);
731         for t in ts {
732             t.join().unwrap();
733         }
734     }
735 
736     #[test]
recv_a_lot()737     fn recv_a_lot() {
738         // Regression test that we don't run out of stack in scheduler context
739         let (tx, rx) = channel();
740         for _ in 0..10000 {
741             tx.send(()).unwrap();
742         }
743         for _ in 0..10000 {
744             rx.recv().unwrap();
745         }
746     }
747 
748     #[test]
shared_recv_timeout()749     fn shared_recv_timeout() {
750         let (tx, rx) = channel();
751         let total = 5;
752         let mut ts = Vec::with_capacity(total);
753         for _ in 0..total {
754             let tx = tx.clone();
755             let t = thread::spawn(move || {
756                 tx.send(()).unwrap();
757             });
758             ts.push(t);
759         }
760 
761         for _ in 0..total {
762             rx.recv().unwrap();
763         }
764 
765         assert_eq!(
766             rx.recv_timeout(Duration::from_millis(1)),
767             Err(RecvTimeoutError::Timeout)
768         );
769         tx.send(()).unwrap();
770         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(()));
771         for t in ts {
772             t.join().unwrap();
773         }
774     }
775 
776     #[test]
shared_chan_stress()777     fn shared_chan_stress() {
778         let (tx, rx) = channel();
779         let total = stress_factor() + 100;
780         let mut ts = Vec::with_capacity(total);
781         for _ in 0..total {
782             let tx = tx.clone();
783             let t = thread::spawn(move || {
784                 tx.send(()).unwrap();
785             });
786             ts.push(t);
787         }
788 
789         for _ in 0..total {
790             rx.recv().unwrap();
791         }
792         for t in ts {
793             t.join().unwrap();
794         }
795     }
796 
797     #[test]
test_nested_recv_iter()798     fn test_nested_recv_iter() {
799         let (tx, rx) = channel::<i32>();
800         let (total_tx, total_rx) = channel::<i32>();
801 
802         let t = thread::spawn(move || {
803             let mut acc = 0;
804             for x in rx.iter() {
805                 acc += x;
806             }
807             total_tx.send(acc).unwrap();
808         });
809 
810         tx.send(3).unwrap();
811         tx.send(1).unwrap();
812         tx.send(2).unwrap();
813         drop(tx);
814         assert_eq!(total_rx.recv().unwrap(), 6);
815         t.join().unwrap();
816     }
817 
818     #[test]
test_recv_iter_break()819     fn test_recv_iter_break() {
820         let (tx, rx) = channel::<i32>();
821         let (count_tx, count_rx) = channel();
822 
823         let t = thread::spawn(move || {
824             let mut count = 0;
825             for x in rx.iter() {
826                 if count >= 3 {
827                     break;
828                 } else {
829                     count += x;
830                 }
831             }
832             count_tx.send(count).unwrap();
833         });
834 
835         tx.send(2).unwrap();
836         tx.send(2).unwrap();
837         tx.send(2).unwrap();
838         let _ = tx.send(2);
839         drop(tx);
840         assert_eq!(count_rx.recv().unwrap(), 4);
841         t.join().unwrap();
842     }
843 
844     #[test]
test_recv_try_iter()845     fn test_recv_try_iter() {
846         let (request_tx, request_rx) = channel();
847         let (response_tx, response_rx) = channel();
848 
849         // Request `x`s until we have `6`.
850         let t = thread::spawn(move || {
851             let mut count = 0;
852             loop {
853                 for x in response_rx.try_iter() {
854                     count += x;
855                     if count == 6 {
856                         return count;
857                     }
858                 }
859                 request_tx.send(()).unwrap();
860             }
861         });
862 
863         for _ in request_rx.iter() {
864             if response_tx.send(2).is_err() {
865                 break;
866             }
867         }
868 
869         assert_eq!(t.join().unwrap(), 6);
870     }
871 
872     #[test]
test_recv_into_iter_owned()873     fn test_recv_into_iter_owned() {
874         let mut iter = {
875             let (tx, rx) = channel::<i32>();
876             tx.send(1).unwrap();
877             tx.send(2).unwrap();
878 
879             rx.into_iter()
880         };
881         assert_eq!(iter.next().unwrap(), 1);
882         assert_eq!(iter.next().unwrap(), 2);
883         assert_eq!(iter.next().is_none(), true);
884     }
885 
886     #[test]
test_recv_into_iter_borrowed()887     fn test_recv_into_iter_borrowed() {
888         let (tx, rx) = channel::<i32>();
889         tx.send(1).unwrap();
890         tx.send(2).unwrap();
891         drop(tx);
892         let mut iter = (&rx).into_iter();
893         assert_eq!(iter.next().unwrap(), 1);
894         assert_eq!(iter.next().unwrap(), 2);
895         assert_eq!(iter.next().is_none(), true);
896     }
897 
898     #[test]
try_recv_states()899     fn try_recv_states() {
900         let (tx1, rx1) = channel::<i32>();
901         let (tx2, rx2) = channel::<()>();
902         let (tx3, rx3) = channel::<()>();
903         let t = thread::spawn(move || {
904             rx2.recv().unwrap();
905             tx1.send(1).unwrap();
906             tx3.send(()).unwrap();
907             rx2.recv().unwrap();
908             drop(tx1);
909             tx3.send(()).unwrap();
910         });
911 
912         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
913         tx2.send(()).unwrap();
914         rx3.recv().unwrap();
915         assert_eq!(rx1.try_recv(), Ok(1));
916         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
917         tx2.send(()).unwrap();
918         rx3.recv().unwrap();
919         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
920         t.join().unwrap();
921     }
922 
923     // This bug used to end up in a livelock inside of the Receiver destructor
924     // because the internal state of the Shared packet was corrupted
925     #[test]
destroy_upgraded_shared_port_when_sender_still_active()926     fn destroy_upgraded_shared_port_when_sender_still_active() {
927         let (tx, rx) = channel();
928         let (tx2, rx2) = channel();
929         let t = thread::spawn(move || {
930             rx.recv().unwrap(); // wait on a oneshot
931             drop(rx); // destroy a shared
932             tx2.send(()).unwrap();
933         });
934         // make sure the other thread has gone to sleep
935         for _ in 0..5000 {
936             thread::yield_now();
937         }
938 
939         // upgrade to a shared chan and send a message
940         let tx2 = tx.clone();
941         drop(tx);
942         tx2.send(()).unwrap();
943 
944         // wait for the child thread to exit before we exit
945         rx2.recv().unwrap();
946         t.join().unwrap();
947     }
948 
949     #[test]
issue_32114()950     fn issue_32114() {
951         let (tx, _) = channel();
952         let _ = tx.send(123);
953         assert_eq!(tx.send(123), Err(SendError(123)));
954     }
955 }
956 
957 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
958 mod sync_channel_tests {
959     use super::*;
960 
961     use std::env;
962     use std::thread;
963     use std::time::Duration;
964 
stress_factor() -> usize965     pub fn stress_factor() -> usize {
966         match env::var("RUST_TEST_STRESS") {
967             Ok(val) => val.parse().unwrap(),
968             Err(..) => 1,
969         }
970     }
971 
972     #[test]
smoke()973     fn smoke() {
974         let (tx, rx) = sync_channel::<i32>(1);
975         tx.send(1).unwrap();
976         assert_eq!(rx.recv().unwrap(), 1);
977     }
978 
979     #[test]
drop_full()980     fn drop_full() {
981         let (tx, _rx) = sync_channel::<Box<isize>>(1);
982         tx.send(Box::new(1)).unwrap();
983     }
984 
985     #[test]
smoke_shared()986     fn smoke_shared() {
987         let (tx, rx) = sync_channel::<i32>(1);
988         tx.send(1).unwrap();
989         assert_eq!(rx.recv().unwrap(), 1);
990         let tx = tx.clone();
991         tx.send(1).unwrap();
992         assert_eq!(rx.recv().unwrap(), 1);
993     }
994 
995     #[test]
recv_timeout()996     fn recv_timeout() {
997         let (tx, rx) = sync_channel::<i32>(1);
998         assert_eq!(
999             rx.recv_timeout(Duration::from_millis(1)),
1000             Err(RecvTimeoutError::Timeout)
1001         );
1002         tx.send(1).unwrap();
1003         assert_eq!(rx.recv_timeout(Duration::from_millis(1)), Ok(1));
1004     }
1005 
1006     #[test]
smoke_threads()1007     fn smoke_threads() {
1008         let (tx, rx) = sync_channel::<i32>(0);
1009         let t = thread::spawn(move || {
1010             tx.send(1).unwrap();
1011         });
1012         assert_eq!(rx.recv().unwrap(), 1);
1013         t.join().unwrap();
1014     }
1015 
1016     #[test]
smoke_port_gone()1017     fn smoke_port_gone() {
1018         let (tx, rx) = sync_channel::<i32>(0);
1019         drop(rx);
1020         assert!(tx.send(1).is_err());
1021     }
1022 
1023     #[test]
smoke_shared_port_gone2()1024     fn smoke_shared_port_gone2() {
1025         let (tx, rx) = sync_channel::<i32>(0);
1026         drop(rx);
1027         let tx2 = tx.clone();
1028         drop(tx);
1029         assert!(tx2.send(1).is_err());
1030     }
1031 
1032     #[test]
port_gone_concurrent()1033     fn port_gone_concurrent() {
1034         let (tx, rx) = sync_channel::<i32>(0);
1035         let t = thread::spawn(move || {
1036             rx.recv().unwrap();
1037         });
1038         while tx.send(1).is_ok() {}
1039         t.join().unwrap();
1040     }
1041 
1042     #[test]
port_gone_concurrent_shared()1043     fn port_gone_concurrent_shared() {
1044         let (tx, rx) = sync_channel::<i32>(0);
1045         let tx2 = tx.clone();
1046         let t = thread::spawn(move || {
1047             rx.recv().unwrap();
1048         });
1049         while tx.send(1).is_ok() && tx2.send(1).is_ok() {}
1050         t.join().unwrap();
1051     }
1052 
1053     #[test]
smoke_chan_gone()1054     fn smoke_chan_gone() {
1055         let (tx, rx) = sync_channel::<i32>(0);
1056         drop(tx);
1057         assert!(rx.recv().is_err());
1058     }
1059 
1060     #[test]
smoke_chan_gone_shared()1061     fn smoke_chan_gone_shared() {
1062         let (tx, rx) = sync_channel::<()>(0);
1063         let tx2 = tx.clone();
1064         drop(tx);
1065         drop(tx2);
1066         assert!(rx.recv().is_err());
1067     }
1068 
1069     #[test]
chan_gone_concurrent()1070     fn chan_gone_concurrent() {
1071         let (tx, rx) = sync_channel::<i32>(0);
1072         let t = thread::spawn(move || {
1073             tx.send(1).unwrap();
1074             tx.send(1).unwrap();
1075         });
1076         while rx.recv().is_ok() {}
1077         t.join().unwrap();
1078     }
1079 
1080     #[test]
stress()1081     fn stress() {
1082         let (tx, rx) = sync_channel::<i32>(0);
1083         let t = thread::spawn(move || {
1084             for _ in 0..10000 {
1085                 tx.send(1).unwrap();
1086             }
1087         });
1088         for _ in 0..10000 {
1089             assert_eq!(rx.recv().unwrap(), 1);
1090         }
1091         t.join().unwrap();
1092     }
1093 
1094     #[test]
stress_recv_timeout_two_threads()1095     fn stress_recv_timeout_two_threads() {
1096         let (tx, rx) = sync_channel::<i32>(0);
1097 
1098         let t = thread::spawn(move || {
1099             for _ in 0..10000 {
1100                 tx.send(1).unwrap();
1101             }
1102         });
1103 
1104         let mut recv_count = 0;
1105         loop {
1106             match rx.recv_timeout(Duration::from_millis(1)) {
1107                 Ok(v) => {
1108                     assert_eq!(v, 1);
1109                     recv_count += 1;
1110                 }
1111                 Err(RecvTimeoutError::Timeout) => continue,
1112                 Err(RecvTimeoutError::Disconnected) => break,
1113             }
1114         }
1115 
1116         assert_eq!(recv_count, 10000);
1117         t.join().unwrap();
1118     }
1119 
1120     #[test]
stress_recv_timeout_shared()1121     fn stress_recv_timeout_shared() {
1122         const AMT: u32 = 1000;
1123         const NTHREADS: u32 = 8;
1124         let (tx, rx) = sync_channel::<i32>(0);
1125         let (dtx, drx) = sync_channel::<()>(0);
1126 
1127         let t = thread::spawn(move || {
1128             let mut recv_count = 0;
1129             loop {
1130                 match rx.recv_timeout(Duration::from_millis(10)) {
1131                     Ok(v) => {
1132                         assert_eq!(v, 1);
1133                         recv_count += 1;
1134                     }
1135                     Err(RecvTimeoutError::Timeout) => continue,
1136                     Err(RecvTimeoutError::Disconnected) => break,
1137                 }
1138             }
1139 
1140             assert_eq!(recv_count, AMT * NTHREADS);
1141             assert!(rx.try_recv().is_err());
1142 
1143             dtx.send(()).unwrap();
1144         });
1145 
1146         let mut ts = Vec::with_capacity(NTHREADS as usize);
1147         for _ in 0..NTHREADS {
1148             let tx = tx.clone();
1149             let t = thread::spawn(move || {
1150                 for _ in 0..AMT {
1151                     tx.send(1).unwrap();
1152                 }
1153             });
1154             ts.push(t);
1155         }
1156 
1157         drop(tx);
1158 
1159         drx.recv().unwrap();
1160         for t in ts {
1161             t.join().unwrap();
1162         }
1163         t.join().unwrap();
1164     }
1165 
1166     #[test]
stress_shared()1167     fn stress_shared() {
1168         const AMT: u32 = 1000;
1169         const NTHREADS: u32 = 8;
1170         let (tx, rx) = sync_channel::<i32>(0);
1171         let (dtx, drx) = sync_channel::<()>(0);
1172 
1173         let t = thread::spawn(move || {
1174             for _ in 0..AMT * NTHREADS {
1175                 assert_eq!(rx.recv().unwrap(), 1);
1176             }
1177             match rx.try_recv() {
1178                 Ok(..) => panic!(),
1179                 _ => {}
1180             }
1181             dtx.send(()).unwrap();
1182         });
1183 
1184         let mut ts = Vec::with_capacity(NTHREADS as usize);
1185         for _ in 0..NTHREADS {
1186             let tx = tx.clone();
1187             let t = thread::spawn(move || {
1188                 for _ in 0..AMT {
1189                     tx.send(1).unwrap();
1190                 }
1191             });
1192             ts.push(t);
1193         }
1194         drop(tx);
1195         drx.recv().unwrap();
1196         for t in ts {
1197             t.join().unwrap();
1198         }
1199         t.join().unwrap();
1200     }
1201 
1202     #[test]
oneshot_single_thread_close_port_first()1203     fn oneshot_single_thread_close_port_first() {
1204         // Simple test of closing without sending
1205         let (_tx, rx) = sync_channel::<i32>(0);
1206         drop(rx);
1207     }
1208 
1209     #[test]
oneshot_single_thread_close_chan_first()1210     fn oneshot_single_thread_close_chan_first() {
1211         // Simple test of closing without sending
1212         let (tx, _rx) = sync_channel::<i32>(0);
1213         drop(tx);
1214     }
1215 
1216     #[test]
oneshot_single_thread_send_port_close()1217     fn oneshot_single_thread_send_port_close() {
1218         // Testing that the sender cleans up the payload if receiver is closed
1219         let (tx, rx) = sync_channel::<Box<i32>>(0);
1220         drop(rx);
1221         assert!(tx.send(Box::new(0)).is_err());
1222     }
1223 
1224     #[test]
oneshot_single_thread_recv_chan_close()1225     fn oneshot_single_thread_recv_chan_close() {
1226         let (tx, rx) = sync_channel::<i32>(0);
1227         drop(tx);
1228         assert_eq!(rx.recv(), Err(RecvError));
1229     }
1230 
1231     #[test]
oneshot_single_thread_send_then_recv()1232     fn oneshot_single_thread_send_then_recv() {
1233         let (tx, rx) = sync_channel::<Box<i32>>(1);
1234         tx.send(Box::new(10)).unwrap();
1235         assert!(*rx.recv().unwrap() == 10);
1236     }
1237 
1238     #[test]
oneshot_single_thread_try_send_open()1239     fn oneshot_single_thread_try_send_open() {
1240         let (tx, rx) = sync_channel::<i32>(1);
1241         assert_eq!(tx.try_send(10), Ok(()));
1242         assert!(rx.recv().unwrap() == 10);
1243     }
1244 
1245     #[test]
oneshot_single_thread_try_send_closed()1246     fn oneshot_single_thread_try_send_closed() {
1247         let (tx, rx) = sync_channel::<i32>(0);
1248         drop(rx);
1249         assert_eq!(tx.try_send(10), Err(TrySendError::Disconnected(10)));
1250     }
1251 
1252     #[test]
oneshot_single_thread_try_send_closed2()1253     fn oneshot_single_thread_try_send_closed2() {
1254         let (tx, _rx) = sync_channel::<i32>(0);
1255         assert_eq!(tx.try_send(10), Err(TrySendError::Full(10)));
1256     }
1257 
1258     #[test]
oneshot_single_thread_try_recv_open()1259     fn oneshot_single_thread_try_recv_open() {
1260         let (tx, rx) = sync_channel::<i32>(1);
1261         tx.send(10).unwrap();
1262         assert!(rx.recv() == Ok(10));
1263     }
1264 
1265     #[test]
oneshot_single_thread_try_recv_closed()1266     fn oneshot_single_thread_try_recv_closed() {
1267         let (tx, rx) = sync_channel::<i32>(0);
1268         drop(tx);
1269         assert!(rx.recv().is_err());
1270     }
1271 
1272     #[test]
oneshot_single_thread_try_recv_closed_with_data()1273     fn oneshot_single_thread_try_recv_closed_with_data() {
1274         let (tx, rx) = sync_channel::<i32>(1);
1275         tx.send(10).unwrap();
1276         drop(tx);
1277         assert_eq!(rx.try_recv(), Ok(10));
1278         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1279     }
1280 
1281     #[test]
oneshot_single_thread_peek_data()1282     fn oneshot_single_thread_peek_data() {
1283         let (tx, rx) = sync_channel::<i32>(1);
1284         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1285         tx.send(10).unwrap();
1286         assert_eq!(rx.try_recv(), Ok(10));
1287     }
1288 
1289     #[test]
oneshot_single_thread_peek_close()1290     fn oneshot_single_thread_peek_close() {
1291         let (tx, rx) = sync_channel::<i32>(0);
1292         drop(tx);
1293         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1294         assert_eq!(rx.try_recv(), Err(TryRecvError::Disconnected));
1295     }
1296 
1297     #[test]
oneshot_single_thread_peek_open()1298     fn oneshot_single_thread_peek_open() {
1299         let (_tx, rx) = sync_channel::<i32>(0);
1300         assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1301     }
1302 
1303     #[test]
oneshot_multi_task_recv_then_send()1304     fn oneshot_multi_task_recv_then_send() {
1305         let (tx, rx) = sync_channel::<Box<i32>>(0);
1306         let t = thread::spawn(move || {
1307             assert!(*rx.recv().unwrap() == 10);
1308         });
1309 
1310         tx.send(Box::new(10)).unwrap();
1311         t.join().unwrap();
1312     }
1313 
1314     #[test]
oneshot_multi_task_recv_then_close()1315     fn oneshot_multi_task_recv_then_close() {
1316         let (tx, rx) = sync_channel::<Box<i32>>(0);
1317         let t = thread::spawn(move || {
1318             drop(tx);
1319         });
1320         thread::spawn(move || {
1321             assert_eq!(rx.recv(), Err(RecvError));
1322         })
1323         .join()
1324         .unwrap();
1325         t.join().unwrap();
1326     }
1327 
1328     #[test]
oneshot_multi_thread_close_stress()1329     fn oneshot_multi_thread_close_stress() {
1330         let stress_factor = stress_factor();
1331         let mut ts = Vec::with_capacity(stress_factor);
1332         for _ in 0..stress_factor {
1333             let (tx, rx) = sync_channel::<i32>(0);
1334             let t = thread::spawn(move || {
1335                 drop(rx);
1336             });
1337             ts.push(t);
1338             drop(tx);
1339         }
1340         for t in ts {
1341             t.join().unwrap();
1342         }
1343     }
1344 
1345     #[test]
oneshot_multi_thread_send_close_stress()1346     fn oneshot_multi_thread_send_close_stress() {
1347         let stress_factor = stress_factor();
1348         let mut ts = Vec::with_capacity(stress_factor);
1349         for _ in 0..stress_factor {
1350             let (tx, rx) = sync_channel::<i32>(0);
1351             let t = thread::spawn(move || {
1352                 drop(rx);
1353             });
1354             ts.push(t);
1355             thread::spawn(move || {
1356                 let _ = tx.send(1);
1357             })
1358             .join()
1359             .unwrap();
1360         }
1361         for t in ts {
1362             t.join().unwrap();
1363         }
1364     }
1365 
1366     #[test]
oneshot_multi_thread_recv_close_stress()1367     fn oneshot_multi_thread_recv_close_stress() {
1368         let stress_factor = stress_factor();
1369         let mut ts = Vec::with_capacity(2 * stress_factor);
1370         for _ in 0..stress_factor {
1371             let (tx, rx) = sync_channel::<i32>(0);
1372             let t = thread::spawn(move || {
1373                 thread::spawn(move || {
1374                     assert_eq!(rx.recv(), Err(RecvError));
1375                 })
1376                 .join()
1377                 .unwrap();
1378             });
1379             ts.push(t);
1380             let t2 = thread::spawn(move || {
1381                 thread::spawn(move || {
1382                     drop(tx);
1383                 });
1384             });
1385             ts.push(t2);
1386         }
1387         for t in ts {
1388             t.join().unwrap();
1389         }
1390     }
1391 
1392     #[test]
oneshot_multi_thread_send_recv_stress()1393     fn oneshot_multi_thread_send_recv_stress() {
1394         let stress_factor = stress_factor();
1395         let mut ts = Vec::with_capacity(stress_factor);
1396         for _ in 0..stress_factor {
1397             let (tx, rx) = sync_channel::<Box<i32>>(0);
1398             let t = thread::spawn(move || {
1399                 tx.send(Box::new(10)).unwrap();
1400             });
1401             ts.push(t);
1402             assert!(*rx.recv().unwrap() == 10);
1403         }
1404         for t in ts {
1405             t.join().unwrap();
1406         }
1407     }
1408 
1409     #[test]
stream_send_recv_stress()1410     fn stream_send_recv_stress() {
1411         let stress_factor = stress_factor();
1412         let mut ts = Vec::with_capacity(2 * stress_factor);
1413         for _ in 0..stress_factor {
1414             let (tx, rx) = sync_channel::<Box<i32>>(0);
1415 
1416             if let Some(t) = send(tx, 0) {
1417                 ts.push(t);
1418             }
1419             if let Some(t) = recv(rx, 0) {
1420                 ts.push(t);
1421             }
1422 
1423             fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1424                 if i == 10 {
1425                     return None;
1426                 }
1427 
1428                 Some(thread::spawn(move || {
1429                     tx.send(Box::new(i)).unwrap();
1430                     send(tx, i + 1);
1431                 }))
1432             }
1433 
1434             fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
1435                 if i == 10 {
1436                     return None;
1437                 }
1438 
1439                 Some(thread::spawn(move || {
1440                     assert!(*rx.recv().unwrap() == i);
1441                     recv(rx, i + 1);
1442                 }))
1443             }
1444         }
1445         for t in ts {
1446             t.join().unwrap();
1447         }
1448     }
1449 
1450     #[test]
recv_a_lot()1451     fn recv_a_lot() {
1452         // Regression test that we don't run out of stack in scheduler context
1453         let (tx, rx) = sync_channel(10000);
1454         for _ in 0..10000 {
1455             tx.send(()).unwrap();
1456         }
1457         for _ in 0..10000 {
1458             rx.recv().unwrap();
1459         }
1460     }
1461 
1462     #[test]
shared_chan_stress()1463     fn shared_chan_stress() {
1464         let (tx, rx) = sync_channel(0);
1465         let total = stress_factor() + 100;
1466         let mut ts = Vec::with_capacity(total);
1467         for _ in 0..total {
1468             let tx = tx.clone();
1469             let t = thread::spawn(move || {
1470                 tx.send(()).unwrap();
1471             });
1472             ts.push(t);
1473         }
1474 
1475         for _ in 0..total {
1476             rx.recv().unwrap();
1477         }
1478         for t in ts {
1479             t.join().unwrap();
1480         }
1481     }
1482 
1483     #[test]
test_nested_recv_iter()1484     fn test_nested_recv_iter() {
1485         let (tx, rx) = sync_channel::<i32>(0);
1486         let (total_tx, total_rx) = sync_channel::<i32>(0);
1487 
1488         let t = thread::spawn(move || {
1489             let mut acc = 0;
1490             for x in rx.iter() {
1491                 acc += x;
1492             }
1493             total_tx.send(acc).unwrap();
1494         });
1495 
1496         tx.send(3).unwrap();
1497         tx.send(1).unwrap();
1498         tx.send(2).unwrap();
1499         drop(tx);
1500         assert_eq!(total_rx.recv().unwrap(), 6);
1501         t.join().unwrap();
1502     }
1503 
1504     #[test]
test_recv_iter_break()1505     fn test_recv_iter_break() {
1506         let (tx, rx) = sync_channel::<i32>(0);
1507         let (count_tx, count_rx) = sync_channel(0);
1508 
1509         let t = thread::spawn(move || {
1510             let mut count = 0;
1511             for x in rx.iter() {
1512                 if count >= 3 {
1513                     break;
1514                 } else {
1515                     count += x;
1516                 }
1517             }
1518             count_tx.send(count).unwrap();
1519         });
1520 
1521         tx.send(2).unwrap();
1522         tx.send(2).unwrap();
1523         tx.send(2).unwrap();
1524         let _ = tx.try_send(2);
1525         drop(tx);
1526         assert_eq!(count_rx.recv().unwrap(), 4);
1527         t.join().unwrap();
1528     }
1529 
1530     #[test]
try_recv_states()1531     fn try_recv_states() {
1532         let (tx1, rx1) = sync_channel::<i32>(1);
1533         let (tx2, rx2) = sync_channel::<()>(1);
1534         let (tx3, rx3) = sync_channel::<()>(1);
1535         let t = thread::spawn(move || {
1536             rx2.recv().unwrap();
1537             tx1.send(1).unwrap();
1538             tx3.send(()).unwrap();
1539             rx2.recv().unwrap();
1540             drop(tx1);
1541             tx3.send(()).unwrap();
1542         });
1543 
1544         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1545         tx2.send(()).unwrap();
1546         rx3.recv().unwrap();
1547         assert_eq!(rx1.try_recv(), Ok(1));
1548         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1549         tx2.send(()).unwrap();
1550         rx3.recv().unwrap();
1551         assert_eq!(rx1.try_recv(), Err(TryRecvError::Disconnected));
1552         t.join().unwrap();
1553     }
1554 
1555     // This bug used to end up in a livelock inside of the Receiver destructor
1556     // because the internal state of the Shared packet was corrupted
1557     #[test]
destroy_upgraded_shared_port_when_sender_still_active()1558     fn destroy_upgraded_shared_port_when_sender_still_active() {
1559         let (tx, rx) = sync_channel::<()>(0);
1560         let (tx2, rx2) = sync_channel::<()>(0);
1561         let t = thread::spawn(move || {
1562             rx.recv().unwrap(); // wait on a oneshot
1563             drop(rx); // destroy a shared
1564             tx2.send(()).unwrap();
1565         });
1566         // make sure the other thread has gone to sleep
1567         for _ in 0..5000 {
1568             thread::yield_now();
1569         }
1570 
1571         // upgrade to a shared chan and send a message
1572         let tx2 = tx.clone();
1573         drop(tx);
1574         tx2.send(()).unwrap();
1575 
1576         // wait for the child thread to exit before we exit
1577         rx2.recv().unwrap();
1578         t.join().unwrap();
1579     }
1580 
1581     #[test]
send1()1582     fn send1() {
1583         let (tx, rx) = sync_channel::<i32>(0);
1584         let t = thread::spawn(move || {
1585             rx.recv().unwrap();
1586         });
1587         assert_eq!(tx.send(1), Ok(()));
1588         t.join().unwrap();
1589     }
1590 
1591     #[test]
send2()1592     fn send2() {
1593         let (tx, rx) = sync_channel::<i32>(0);
1594         let t = thread::spawn(move || {
1595             drop(rx);
1596         });
1597         assert!(tx.send(1).is_err());
1598         t.join().unwrap();
1599     }
1600 
1601     #[test]
send3()1602     fn send3() {
1603         let (tx, rx) = sync_channel::<i32>(1);
1604         assert_eq!(tx.send(1), Ok(()));
1605         let t = thread::spawn(move || {
1606             drop(rx);
1607         });
1608         assert!(tx.send(1).is_err());
1609         t.join().unwrap();
1610     }
1611 
1612     #[test]
send4()1613     fn send4() {
1614         let (tx, rx) = sync_channel::<i32>(0);
1615         let tx2 = tx.clone();
1616         let (done, donerx) = channel();
1617         let done2 = done.clone();
1618         let t = thread::spawn(move || {
1619             assert!(tx.send(1).is_err());
1620             done.send(()).unwrap();
1621         });
1622         let t2 = thread::spawn(move || {
1623             assert!(tx2.send(2).is_err());
1624             done2.send(()).unwrap();
1625         });
1626         drop(rx);
1627         donerx.recv().unwrap();
1628         donerx.recv().unwrap();
1629         t.join().unwrap();
1630         t2.join().unwrap();
1631     }
1632 
1633     #[test]
try_send1()1634     fn try_send1() {
1635         let (tx, _rx) = sync_channel::<i32>(0);
1636         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1637     }
1638 
1639     #[test]
try_send2()1640     fn try_send2() {
1641         let (tx, _rx) = sync_channel::<i32>(1);
1642         assert_eq!(tx.try_send(1), Ok(()));
1643         assert_eq!(tx.try_send(1), Err(TrySendError::Full(1)));
1644     }
1645 
1646     #[test]
try_send3()1647     fn try_send3() {
1648         let (tx, rx) = sync_channel::<i32>(1);
1649         assert_eq!(tx.try_send(1), Ok(()));
1650         drop(rx);
1651         assert_eq!(tx.try_send(1), Err(TrySendError::Disconnected(1)));
1652     }
1653 
1654     #[test]
issue_15761()1655     fn issue_15761() {
1656         fn repro() {
1657             let (tx1, rx1) = sync_channel::<()>(3);
1658             let (tx2, rx2) = sync_channel::<()>(3);
1659 
1660             let _t = thread::spawn(move || {
1661                 rx1.recv().unwrap();
1662                 tx2.try_send(()).unwrap();
1663             });
1664 
1665             tx1.try_send(()).unwrap();
1666             rx2.recv().unwrap();
1667         }
1668 
1669         for _ in 0..100 {
1670             repro()
1671         }
1672     }
1673 }
1674 
1675 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1676 mod select_tests {
1677     use super::*;
1678 
1679     use std::thread;
1680 
1681     #[test]
smoke()1682     fn smoke() {
1683         let (tx1, rx1) = channel::<i32>();
1684         let (tx2, rx2) = channel::<i32>();
1685         tx1.send(1).unwrap();
1686         select! {
1687             foo = rx1.recv() => assert_eq!(foo.unwrap(), 1),
1688             _bar = rx2.recv() => panic!()
1689         }
1690         tx2.send(2).unwrap();
1691         select! {
1692             _foo = rx1.recv() => panic!(),
1693             bar = rx2.recv() => assert_eq!(bar.unwrap(), 2)
1694         }
1695         drop(tx1);
1696         select! {
1697             foo = rx1.recv() => assert!(foo.is_err()),
1698             _bar = rx2.recv() => panic!()
1699         }
1700         drop(tx2);
1701         select! {
1702             bar = rx2.recv() => assert!(bar.is_err())
1703         }
1704     }
1705 
1706     #[test]
smoke2()1707     fn smoke2() {
1708         let (_tx1, rx1) = channel::<i32>();
1709         let (_tx2, rx2) = channel::<i32>();
1710         let (_tx3, rx3) = channel::<i32>();
1711         let (_tx4, rx4) = channel::<i32>();
1712         let (tx5, rx5) = channel::<i32>();
1713         tx5.send(4).unwrap();
1714         select! {
1715             _foo = rx1.recv() => panic!("1"),
1716             _foo = rx2.recv() => panic!("2"),
1717             _foo = rx3.recv() => panic!("3"),
1718             _foo = rx4.recv() => panic!("4"),
1719             foo = rx5.recv() => assert_eq!(foo.unwrap(), 4)
1720         }
1721     }
1722 
1723     #[test]
closed()1724     fn closed() {
1725         let (_tx1, rx1) = channel::<i32>();
1726         let (tx2, rx2) = channel::<i32>();
1727         drop(tx2);
1728 
1729         select! {
1730             _a1 = rx1.recv() => panic!(),
1731             a2 = rx2.recv() => assert!(a2.is_err())
1732         }
1733     }
1734 
1735     #[test]
unblocks()1736     fn unblocks() {
1737         let (tx1, rx1) = channel::<i32>();
1738         let (_tx2, rx2) = channel::<i32>();
1739         let (tx3, rx3) = channel::<i32>();
1740 
1741         let t = thread::spawn(move || {
1742             for _ in 0..20 {
1743                 thread::yield_now();
1744             }
1745             tx1.send(1).unwrap();
1746             rx3.recv().unwrap();
1747             for _ in 0..20 {
1748                 thread::yield_now();
1749             }
1750         });
1751 
1752         select! {
1753             a = rx1.recv() => assert_eq!(a.unwrap(), 1),
1754             _b = rx2.recv() => panic!()
1755         }
1756         tx3.send(1).unwrap();
1757         select! {
1758             a = rx1.recv() => assert!(a.is_err()),
1759             _b = rx2.recv() => panic!()
1760         }
1761         t.join().unwrap();
1762     }
1763 
1764     #[test]
both_ready()1765     fn both_ready() {
1766         let (tx1, rx1) = channel::<i32>();
1767         let (tx2, rx2) = channel::<i32>();
1768         let (tx3, rx3) = channel::<()>();
1769 
1770         let t = thread::spawn(move || {
1771             for _ in 0..20 {
1772                 thread::yield_now();
1773             }
1774             tx1.send(1).unwrap();
1775             tx2.send(2).unwrap();
1776             rx3.recv().unwrap();
1777         });
1778 
1779         select! {
1780             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1781             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1782         }
1783         select! {
1784             a = rx1.recv() => { assert_eq!(a.unwrap(), 1); },
1785             a = rx2.recv() => { assert_eq!(a.unwrap(), 2); }
1786         }
1787         assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
1788         assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));
1789         tx3.send(()).unwrap();
1790         t.join().unwrap();
1791     }
1792 
1793     #[test]
stress()1794     fn stress() {
1795         const AMT: i32 = 10000;
1796         let (tx1, rx1) = channel::<i32>();
1797         let (tx2, rx2) = channel::<i32>();
1798         let (tx3, rx3) = channel::<()>();
1799 
1800         let t = thread::spawn(move || {
1801             for i in 0..AMT {
1802                 if i % 2 == 0 {
1803                     tx1.send(i).unwrap();
1804                 } else {
1805                     tx2.send(i).unwrap();
1806                 }
1807                 rx3.recv().unwrap();
1808             }
1809         });
1810 
1811         for i in 0..AMT {
1812             select! {
1813                 i1 = rx1.recv() => { assert!(i % 2 == 0 && i == i1.unwrap()); },
1814                 i2 = rx2.recv() => { assert!(i % 2 == 1 && i == i2.unwrap()); }
1815             }
1816             tx3.send(()).unwrap();
1817         }
1818         t.join().unwrap();
1819     }
1820 
1821     #[allow(unused_must_use)]
1822     #[test]
cloning()1823     fn cloning() {
1824         let (tx1, rx1) = channel::<i32>();
1825         let (_tx2, rx2) = channel::<i32>();
1826         let (tx3, rx3) = channel::<()>();
1827 
1828         let t = thread::spawn(move || {
1829             rx3.recv().unwrap();
1830             tx1.clone();
1831             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1832             tx1.send(2).unwrap();
1833             rx3.recv().unwrap();
1834         });
1835 
1836         tx3.send(()).unwrap();
1837         select! {
1838             _i1 = rx1.recv() => {},
1839             _i2 = rx2.recv() => panic!()
1840         }
1841         tx3.send(()).unwrap();
1842         t.join().unwrap();
1843     }
1844 
1845     #[allow(unused_must_use)]
1846     #[test]
cloning2()1847     fn cloning2() {
1848         let (tx1, rx1) = channel::<i32>();
1849         let (_tx2, rx2) = channel::<i32>();
1850         let (tx3, rx3) = channel::<()>();
1851 
1852         let t = thread::spawn(move || {
1853             rx3.recv().unwrap();
1854             tx1.clone();
1855             assert_eq!(rx3.try_recv(), Err(TryRecvError::Empty));
1856             tx1.send(2).unwrap();
1857             rx3.recv().unwrap();
1858         });
1859 
1860         tx3.send(()).unwrap();
1861         select! {
1862             _i1 = rx1.recv() => {},
1863             _i2 = rx2.recv() => panic!()
1864         }
1865         tx3.send(()).unwrap();
1866         t.join().unwrap();
1867     }
1868 
1869     #[test]
cloning3()1870     fn cloning3() {
1871         let (tx1, rx1) = channel::<()>();
1872         let (tx2, rx2) = channel::<()>();
1873         let (tx3, rx3) = channel::<()>();
1874         let t = thread::spawn(move || {
1875             select! {
1876                 _ = rx1.recv() => panic!(),
1877                 _ = rx2.recv() => {}
1878             }
1879             tx3.send(()).unwrap();
1880         });
1881 
1882         for _ in 0..1000 {
1883             thread::yield_now();
1884         }
1885         drop(tx1.clone());
1886         tx2.send(()).unwrap();
1887         rx3.recv().unwrap();
1888         t.join().unwrap();
1889     }
1890 
1891     #[test]
preflight1()1892     fn preflight1() {
1893         let (tx, rx) = channel();
1894         tx.send(()).unwrap();
1895         select! {
1896             _n = rx.recv() => {}
1897         }
1898     }
1899 
1900     #[test]
preflight2()1901     fn preflight2() {
1902         let (tx, rx) = channel();
1903         tx.send(()).unwrap();
1904         tx.send(()).unwrap();
1905         select! {
1906             _n = rx.recv() => {}
1907         }
1908     }
1909 
1910     #[test]
preflight3()1911     fn preflight3() {
1912         let (tx, rx) = channel();
1913         drop(tx.clone());
1914         tx.send(()).unwrap();
1915         select! {
1916             _n = rx.recv() => {}
1917         }
1918     }
1919 
1920     #[test]
preflight4()1921     fn preflight4() {
1922         let (tx, rx) = channel();
1923         tx.send(()).unwrap();
1924         select! {
1925             _ = rx.recv() => {}
1926         }
1927     }
1928 
1929     #[test]
preflight5()1930     fn preflight5() {
1931         let (tx, rx) = channel();
1932         tx.send(()).unwrap();
1933         tx.send(()).unwrap();
1934         select! {
1935             _ = rx.recv() => {}
1936         }
1937     }
1938 
1939     #[test]
preflight6()1940     fn preflight6() {
1941         let (tx, rx) = channel();
1942         drop(tx.clone());
1943         tx.send(()).unwrap();
1944         select! {
1945             _ = rx.recv() => {}
1946         }
1947     }
1948 
1949     #[test]
preflight7()1950     fn preflight7() {
1951         let (tx, rx) = channel::<()>();
1952         drop(tx);
1953         select! {
1954             _ = rx.recv() => {}
1955         }
1956     }
1957 
1958     #[test]
preflight8()1959     fn preflight8() {
1960         let (tx, rx) = channel();
1961         tx.send(()).unwrap();
1962         drop(tx);
1963         rx.recv().unwrap();
1964         select! {
1965             _ = rx.recv() => {}
1966         }
1967     }
1968 
1969     #[test]
preflight9()1970     fn preflight9() {
1971         let (tx, rx) = channel();
1972         drop(tx.clone());
1973         tx.send(()).unwrap();
1974         drop(tx);
1975         rx.recv().unwrap();
1976         select! {
1977             _ = rx.recv() => {}
1978         }
1979     }
1980 
1981     #[test]
oneshot_data_waiting()1982     fn oneshot_data_waiting() {
1983         let (tx1, rx1) = channel();
1984         let (tx2, rx2) = channel();
1985         let t = thread::spawn(move || {
1986             select! {
1987                 _n = rx1.recv() => {}
1988             }
1989             tx2.send(()).unwrap();
1990         });
1991 
1992         for _ in 0..100 {
1993             thread::yield_now()
1994         }
1995         tx1.send(()).unwrap();
1996         rx2.recv().unwrap();
1997         t.join().unwrap();
1998     }
1999 
2000     #[test]
stream_data_waiting()2001     fn stream_data_waiting() {
2002         let (tx1, rx1) = channel();
2003         let (tx2, rx2) = channel();
2004         tx1.send(()).unwrap();
2005         tx1.send(()).unwrap();
2006         rx1.recv().unwrap();
2007         rx1.recv().unwrap();
2008         let t = thread::spawn(move || {
2009             select! {
2010                 _n = rx1.recv() => {}
2011             }
2012             tx2.send(()).unwrap();
2013         });
2014 
2015         for _ in 0..100 {
2016             thread::yield_now()
2017         }
2018         tx1.send(()).unwrap();
2019         rx2.recv().unwrap();
2020         t.join().unwrap();
2021     }
2022 
2023     #[test]
shared_data_waiting()2024     fn shared_data_waiting() {
2025         let (tx1, rx1) = channel();
2026         let (tx2, rx2) = channel();
2027         drop(tx1.clone());
2028         tx1.send(()).unwrap();
2029         rx1.recv().unwrap();
2030         let t = thread::spawn(move || {
2031             select! {
2032                 _n = rx1.recv() => {}
2033             }
2034             tx2.send(()).unwrap();
2035         });
2036 
2037         for _ in 0..100 {
2038             thread::yield_now()
2039         }
2040         tx1.send(()).unwrap();
2041         rx2.recv().unwrap();
2042         t.join().unwrap();
2043     }
2044 
2045     #[test]
sync1()2046     fn sync1() {
2047         let (tx, rx) = sync_channel::<i32>(1);
2048         tx.send(1).unwrap();
2049         select! {
2050             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2051         }
2052     }
2053 
2054     #[test]
sync2()2055     fn sync2() {
2056         let (tx, rx) = sync_channel::<i32>(0);
2057         let t = thread::spawn(move || {
2058             for _ in 0..100 {
2059                 thread::yield_now()
2060             }
2061             tx.send(1).unwrap();
2062         });
2063         select! {
2064             n = rx.recv() => { assert_eq!(n.unwrap(), 1); }
2065         }
2066         t.join().unwrap();
2067     }
2068 
2069     #[test]
sync3()2070     fn sync3() {
2071         let (tx1, rx1) = sync_channel::<i32>(0);
2072         let (tx2, rx2): (Sender<i32>, Receiver<i32>) = channel();
2073         let t = thread::spawn(move || {
2074             tx1.send(1).unwrap();
2075         });
2076         let t2 = thread::spawn(move || {
2077             tx2.send(2).unwrap();
2078         });
2079         select! {
2080             n = rx1.recv() => {
2081                 let n = n.unwrap();
2082                 assert_eq!(n, 1);
2083                 assert_eq!(rx2.recv().unwrap(), 2);
2084             },
2085             n = rx2.recv() => {
2086                 let n = n.unwrap();
2087                 assert_eq!(n, 2);
2088                 assert_eq!(rx1.recv().unwrap(), 1);
2089             }
2090         }
2091         t.join().unwrap();
2092         t2.join().unwrap();
2093     }
2094 }
2095