1 //! Tests copied from `std::sync::mpsc`.
2 //!
3 //! This is a copy of tests for the `std::sync::mpsc` channels from the standard library, but
4 //! modified to work with `crossbeam-channel` instead.
5 //!
6 //! Minor tweaks were needed to make the tests compile:
7 //!
8 //! - Replace `box` syntax with `Box::new`.
9 //! - Replace all uses of `Select` with `select!`.
10 //! - Change the imports.
11 //! - Join all spawned threads.
12 //! - Removed assertion from oneshot_multi_thread_send_close_stress tests.
13 //!
14 //! Source:
15 //! - https://github.com/rust-lang/rust/tree/master/src/libstd/sync/mpsc
16 //!
17 //! Copyright & License:
18 //! - Copyright 2013-2014 The Rust Project Developers
19 //! - Apache License, Version 2.0 or MIT license, at your option
20 //! - https://github.com/rust-lang/rust/blob/master/COPYRIGHT
21 //! - https://www.rust-lang.org/en-US/legal.html
22
23 use std::sync::mpsc::{RecvError, RecvTimeoutError, TryRecvError};
24 use std::sync::mpsc::{SendError, TrySendError};
25 use std::thread::JoinHandle;
26 use std::time::Duration;
27
28 use crossbeam_channel as cc;
29
30 pub struct Sender<T> {
31 pub inner: cc::Sender<T>,
32 }
33
34 impl<T> Sender<T> {
send(&self, t: T) -> Result<(), SendError<T>>35 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
36 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
37 }
38 }
39
40 impl<T> Clone for Sender<T> {
clone(&self) -> Sender<T>41 fn clone(&self) -> Sender<T> {
42 Sender {
43 inner: self.inner.clone(),
44 }
45 }
46 }
47
48 pub struct SyncSender<T> {
49 pub inner: cc::Sender<T>,
50 }
51
52 impl<T> SyncSender<T> {
send(&self, t: T) -> Result<(), SendError<T>>53 pub fn send(&self, t: T) -> Result<(), SendError<T>> {
54 self.inner.send(t).map_err(|cc::SendError(m)| SendError(m))
55 }
56
try_send(&self, t: T) -> Result<(), TrySendError<T>>57 pub fn try_send(&self, t: T) -> Result<(), TrySendError<T>> {
58 self.inner.try_send(t).map_err(|err| match err {
59 cc::TrySendError::Full(m) => TrySendError::Full(m),
60 cc::TrySendError::Disconnected(m) => TrySendError::Disconnected(m),
61 })
62 }
63 }
64
65 impl<T> Clone for SyncSender<T> {
clone(&self) -> SyncSender<T>66 fn clone(&self) -> SyncSender<T> {
67 SyncSender {
68 inner: self.inner.clone(),
69 }
70 }
71 }
72
73 pub struct Receiver<T> {
74 pub inner: cc::Receiver<T>,
75 }
76
77 impl<T> Receiver<T> {
try_recv(&self) -> Result<T, TryRecvError>78 pub fn try_recv(&self) -> Result<T, TryRecvError> {
79 self.inner.try_recv().map_err(|err| match err {
80 cc::TryRecvError::Empty => TryRecvError::Empty,
81 cc::TryRecvError::Disconnected => TryRecvError::Disconnected,
82 })
83 }
84
recv(&self) -> Result<T, RecvError>85 pub fn recv(&self) -> Result<T, RecvError> {
86 self.inner.recv().map_err(|_| RecvError)
87 }
88
recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError>89 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
90 self.inner.recv_timeout(timeout).map_err(|err| match err {
91 cc::RecvTimeoutError::Timeout => RecvTimeoutError::Timeout,
92 cc::RecvTimeoutError::Disconnected => RecvTimeoutError::Disconnected,
93 })
94 }
95
iter(&self) -> Iter<T>96 pub fn iter(&self) -> Iter<T> {
97 Iter { inner: self }
98 }
99
try_iter(&self) -> TryIter<T>100 pub fn try_iter(&self) -> TryIter<T> {
101 TryIter { inner: self }
102 }
103 }
104
105 impl<'a, T> IntoIterator for &'a Receiver<T> {
106 type Item = T;
107 type IntoIter = Iter<'a, T>;
108
into_iter(self) -> Iter<'a, T>109 fn into_iter(self) -> Iter<'a, T> {
110 self.iter()
111 }
112 }
113
114 impl<T> IntoIterator for Receiver<T> {
115 type Item = T;
116 type IntoIter = IntoIter<T>;
117
into_iter(self) -> IntoIter<T>118 fn into_iter(self) -> IntoIter<T> {
119 IntoIter { inner: self }
120 }
121 }
122
123 pub struct TryIter<'a, T: 'a> {
124 inner: &'a Receiver<T>,
125 }
126
127 impl<'a, T> Iterator for TryIter<'a, T> {
128 type Item = T;
129
next(&mut self) -> Option<T>130 fn next(&mut self) -> Option<T> {
131 self.inner.try_recv().ok()
132 }
133 }
134
135 pub struct Iter<'a, T: 'a> {
136 inner: &'a Receiver<T>,
137 }
138
139 impl<'a, T> Iterator for Iter<'a, T> {
140 type Item = T;
141
next(&mut self) -> Option<T>142 fn next(&mut self) -> Option<T> {
143 self.inner.recv().ok()
144 }
145 }
146
147 pub struct IntoIter<T> {
148 inner: Receiver<T>,
149 }
150
151 impl<T> Iterator for IntoIter<T> {
152 type Item = T;
153
next(&mut self) -> Option<T>154 fn next(&mut self) -> Option<T> {
155 self.inner.recv().ok()
156 }
157 }
158
channel<T>() -> (Sender<T>, Receiver<T>)159 pub fn channel<T>() -> (Sender<T>, Receiver<T>) {
160 let (s, r) = cc::unbounded();
161 let s = Sender { inner: s };
162 let r = Receiver { inner: r };
163 (s, r)
164 }
165
sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>)166 pub fn sync_channel<T>(bound: usize) -> (SyncSender<T>, Receiver<T>) {
167 let (s, r) = cc::bounded(bound);
168 let s = SyncSender { inner: s };
169 let r = Receiver { inner: r };
170 (s, r)
171 }
172
/// Accepts one or more comma-separated arms of the form `pattern = rx.method() => expr`
/// and forwards them to `crossbeam_channel_internal!` as `recv` operations on each
/// receiver's `inner` field.
///
/// The method name in each arm is matched but not used: every arm expands to a `recv`.
/// The received result has its error mapped to `std::sync::mpsc::RecvError` before being
/// bound to the arm's pattern.
macro_rules! select {
    (
        $($binding:pat = $receiver:ident.$method:ident() => $body:expr),+
    ) => {{
        cc::crossbeam_channel_internal! {
            $(
                recv(($receiver).inner) -> outcome => {
                    let $binding = outcome.map_err(|_| ::std::sync::mpsc::RecvError);
                    $body
                }
            )+
        }
    }}
}
187
188 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
189 mod channel_tests {
190 use super::*;
191
192 use std::env;
193 use std::thread;
194 use std::time::{Duration, Instant};
195
/// Returns the multiplier used to size the stress tests.
///
/// The value comes from the `RUST_TEST_STRESS` environment variable. When `env::var`
/// reports an error for that variable, the factor is 1.
///
/// # Panics
///
/// Panics if the variable is set but does not parse as a `usize`. The message names the
/// variable and the offending value so a misconfigured run is easy to diagnose.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => val.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "RUST_TEST_STRESS must be a non-negative integer, got {:?}: {}",
                val, err
            )
        }),
        Err(..) => 1,
    }
}
202
/// Sends one value on an unbounded channel and receives it on the same thread.
#[test]
fn smoke() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
}
209
/// Sends one boxed value and lets both halves go out of scope with the value still queued.
#[test]
fn drop_full() {
    let (sender, _receiver) = channel::<Box<isize>>();
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
215
/// Clones and drops the sender twice, then sends a boxed value that is never received.
#[test]
fn drop_full_shared() {
    let (sender, _receiver) = channel::<Box<isize>>();
    for _ in 0..2 {
        let extra = sender.clone();
        drop(extra);
    }
    sender.send(Box::new(1)).unwrap();
}
223
/// Round-trips a value through the original sender and then through a clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = channel::<i32>();
    sender.send(1).unwrap();
    let first = receiver.recv().unwrap();
    assert_eq!(first, 1);

    // Shadow the original sender with its clone, keeping only the clone alive by name.
    let sender = sender.clone();
    sender.send(1).unwrap();
    let second = receiver.recv().unwrap();
    assert_eq!(second, 1);
}
233
/// Sends a value from a spawned thread and receives it on the test thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
    producer.join().unwrap();
}
243
/// Sending after the receiver has been dropped must return an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
250
/// Sending after the receiver has been dropped must return an error.
#[test]
fn smoke_shared_port_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err())
}
257
/// A clone made after the receiver was dropped, and outliving the original sender, must
/// still fail to send.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let survivor = sender.clone();
    drop(sender);
    let outcome = survivor.send(1);
    assert!(outcome.is_err());
}
266
/// Keeps sending until a send fails, while another thread receives a single value and
/// then drops the receiver.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
276
/// Alternates sends between a sender and its clone until one of them fails, while
/// another thread receives a single value and then drops the receiver.
#[test]
fn port_gone_concurrent_shared() {
    let (first, receiver) = channel::<i32>();
    let second = first.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits: the clone is only tried when the original succeeded.
        if first.send(1).is_err() || second.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
287
/// Receiving after the only sender has been dropped must return an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
294
/// Receiving after the sender and its clone have both been dropped must return an error.
#[test]
fn smoke_chan_gone_shared() {
    let (original, receiver) = channel::<()>();
    let copy = original.clone();
    drop(original);
    drop(copy);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
303
/// Receives until `recv` fails while another thread sends two values and then drops the
/// sender.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    while let Ok(_) = receiver.recv() {}
    producer.join().unwrap();
}
314
/// Pushes 10000 values from one thread and checks each of them on the test thread.
#[test]
fn stress() {
    let (sender, receiver) = channel::<i32>();
    let producer = thread::spawn(move || {
        (0..10000).for_each(|_| sender.send(1).unwrap());
    });
    (0..10000).for_each(|_| {
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);
    });
    producer.join().ok().unwrap();
}
328
/// Eight producer threads each send `AMT` values through clones of one sender; a single
/// consumer thread checks that it receives exactly `AMT * NTHREADS` of them and that
/// nothing is left afterwards.
#[test]
fn stress_shared() {
    const AMT: u32 = 10000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = channel::<i32>();

    let consumer = thread::spawn(move || {
        (0..AMT * NTHREADS).for_each(|_| {
            assert_eq!(receiver.recv().unwrap(), 1);
        });
        if receiver.try_recv().is_ok() {
            panic!();
        }
    });

    let producers: Vec<_> = (0..NTHREADS)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                (0..AMT).for_each(|_| sender.send(1).unwrap());
            })
        })
        .collect();
    drop(sender);
    consumer.join().ok().unwrap();
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
}
361
/// A consumer thread signals that it has started and then expects 40 values; a producer
/// thread, spawned only after that signal arrives, sends them.
#[test]
fn send_from_outside_runtime() {
    let (ready_tx, ready_rx) = channel::<()>();
    let (data_tx, data_rx) = channel::<i32>();
    let consumer = thread::spawn(move || {
        ready_tx.send(()).unwrap();
        (0..40).for_each(|_| assert_eq!(data_rx.recv().unwrap(), 1));
    });
    ready_rx.recv().unwrap();
    let producer = thread::spawn(move || {
        (0..40).for_each(|_| data_tx.send(1).unwrap());
    });
    consumer.join().ok().unwrap();
    producer.join().ok().unwrap();
}
381
/// The test thread sends 40 values which a spawned thread receives and checks.
#[test]
fn recv_from_outside_runtime() {
    let (sender, receiver) = channel::<i32>();
    let consumer = thread::spawn(move || {
        (0..40).for_each(|_| assert_eq!(receiver.recv().unwrap(), 1));
    });
    (0..40).for_each(|_| sender.send(1).unwrap());
    consumer.join().ok().unwrap();
}
395
/// Two spawned threads exchange a value in each direction over two separate channels.
#[test]
fn no_runtime() {
    let (ping_tx, ping_rx) = channel::<i32>();
    let (pong_tx, pong_rx) = channel::<i32>();
    let responder = thread::spawn(move || {
        let ping = ping_rx.recv().unwrap();
        assert_eq!(ping, 1);
        pong_tx.send(2).unwrap();
    });
    let initiator = thread::spawn(move || {
        ping_tx.send(1).unwrap();
        let pong = pong_rx.recv().unwrap();
        assert_eq!(pong, 2);
    });
    responder.join().ok().unwrap();
    initiator.join().ok().unwrap();
}
411
/// Closes a channel without sending anything: the receiver is dropped explicitly, the
/// sender at end of scope.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = channel::<i32>();
    drop(receiver);
}
418
/// Closes a channel without sending anything: the sender is dropped explicitly, the
/// receiver at end of scope.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = channel::<i32>();
    drop(sender);
}
425
/// Sends a boxed payload after the receiver is gone; the send must fail, and the payload
/// comes back inside the error, which is dropped here.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
433
/// `recv` on a channel whose sender was dropped returns exactly `Err(RecvError)`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
440
/// Sends a boxed 10 and checks that the received box holds 10.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = channel::<Box<i32>>();
    sender.send(Box::new(10)).unwrap();
    let received = receiver.recv().unwrap();
    assert!(*received == 10);
}
447
/// Sending on an open channel succeeds and the value can be received afterwards.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = channel::<i32>();
    let outcome = sender.send(10);
    assert!(outcome.is_ok());
    assert!(receiver.recv().unwrap() == 10);
}
454
/// Sending on a channel whose receiver was dropped fails.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(receiver);
    let outcome = sender.send(10);
    assert!(outcome.is_err());
}
461
/// A value sent on an open channel is returned by `recv` as `Ok(10)`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = channel::<i32>();
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
468
/// `recv` fails once the only sender has been dropped without sending.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
475
/// `try_recv` reports `Empty` before anything is sent and returns the value afterwards.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = channel::<i32>();
    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));
    sender.send(10).unwrap();
    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
483
/// `try_recv` reports `Disconnected` on every call once the sender is gone.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = channel::<i32>();
    drop(sender);
    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
491
/// `try_recv` on an open, empty channel reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = channel::<i32>();
    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
497
/// A spawned thread receives a boxed value that the test thread sends.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = channel::<Box<i32>>();
    let consumer = thread::spawn(move || {
        let received = receiver.recv().unwrap();
        assert!(*received == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
508
/// One thread drops the sender while another expects `recv` to yield `Err(RecvError)`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = channel::<Box<i32>>();
    let closer = thread::spawn(move || drop(sender));
    let checker = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });
    // Wait for the checking thread first, then for the one that closed the channel.
    checker.join().unwrap();
    closer.join().unwrap();
}
522
/// For each stress round, drops the receiver on a spawned thread and the sender on the
/// test thread.
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();
    let droppers: Vec<_> = (0..rounds)
        .map(|_| {
            let (sender, receiver) = channel::<i32>();
            let dropper = thread::spawn(move || drop(receiver));
            drop(sender);
            dropper
        })
        .collect();
    droppers
        .into_iter()
        .for_each(|dropper| dropper.join().unwrap());
}
539
/// For each stress round, one thread drops the receiver while another thread attempts a
/// send whose result is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        droppers.push(thread::spawn(move || drop(receiver)));
        // The sending thread is joined immediately; only the droppers are collected.
        let attempt = thread::spawn(move || {
            let _ = sender.send(1);
        });
        attempt.join().unwrap();
    }
    droppers
        .into_iter()
        .for_each(|dropper| dropper.join().unwrap());
}
560
/// For each stress round, a nested thread expects `recv` to yield `Err(RecvError)` while
/// another nested thread drops the sender. Each outer thread joins its nested thread.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let rounds = stress_factor();
    let mut handles = Vec::with_capacity(2 * rounds);
    for _ in 0..rounds {
        let (sender, receiver) = channel::<i32>();
        handles.push(thread::spawn(move || {
            let checker = thread::spawn(move || {
                assert_eq!(receiver.recv(), Err(RecvError));
            });
            checker.join().unwrap();
        }));
        handles.push(thread::spawn(move || {
            let closer = thread::spawn(move || drop(sender));
            closer.join().unwrap();
        }));
    }
    handles
        .into_iter()
        .for_each(|handle| handle.join().unwrap());
}
587
/// For each stress round, a spawned thread sends a boxed 10 that the test thread receives.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();
    let producers: Vec<_> = (0..rounds)
        .map(|_| {
            let (sender, receiver) = channel::<Box<isize>>();
            let producer = thread::spawn(move || {
                sender.send(Box::new(10)).unwrap();
            });
            let received = receiver.recv().unwrap();
            assert!(*received == 10);
            producer
        })
        .collect();
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
}
604
/// For each stress round, a chain of ten threads sends `0..10` in order, one value per
/// thread, while a second chain of ten threads receives and checks them in order.
///
/// Every thread in a chain joins the thread it spawns. Joining the two chain heads
/// therefore waits for the whole chain, and a failed `unwrap` or assertion anywhere in it
/// propagates up and fails the test instead of being lost in a detached thread.
#[test]
fn stream_send_recv_stress() {
    /// Spawns a thread that sends `i`, then continues the chain with `i + 1`.
    /// Returns `None` once `i` reaches 10.
    fn send(tx: Sender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            // Join the child so its panics surface through this thread's handle.
            if let Some(child) = send(tx, i + 1) {
                child.join().unwrap();
            }
        }))
    }

    /// Spawns a thread that expects to receive `i`, then continues the chain with `i + 1`.
    /// Returns `None` once `i` reaches 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            assert!(*rx.recv().unwrap() == i);
            // Join the child so its panics surface through this thread's handle.
            if let Some(child) = recv(rx, i + 1) {
                child.join().unwrap();
            }
        }))
    }

    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = channel();

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t2) = recv(rx, 0) {
            ts.push(t2);
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
645
/// `recv_timeout` returns a queued value, times out on an empty channel, and returns a
/// value again after another send.
#[test]
fn oneshot_single_thread_recv_timeout() {
    let (sender, receiver) = channel();
    let wait = Duration::from_millis(1);

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    assert_eq!(
        receiver.recv_timeout(wait),
        Err(RecvTimeoutError::Timeout)
    );

    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
}
658
/// A producer sends `stress` values, sleeping for twice the receive timeout before every
/// even-numbered one, while the test thread polls with `recv_timeout` until the channel
/// disconnects and then checks the number of values received.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;
    let timeout = Duration::from_millis(100);

    let producer = thread::spawn(move || {
        for round in 0..stress {
            let is_even = round % 2 == 0;
            if is_even {
                thread::sleep(timeout * 2);
            }
            sender.send(1usize).unwrap();
        }
    });

    let mut received = 0;
    loop {
        let outcome = receiver.recv_timeout(timeout);
        if let Err(RecvTimeoutError::Disconnected) = outcome {
            break;
        }
        // A timeout simply falls through to the next poll.
        if let Ok(value) = outcome {
            assert_eq!(value, 1usize);
            received += 1;
        }
    }

    assert_eq!(received, stress);
    producer.join().unwrap();
}
689
/// With a cloned sender alive and nothing sent, `recv_timeout` must report `Timeout` and
/// must not return before the timeout has elapsed.
#[test]
fn recv_timeout_upgrade() {
    let (sender, receiver) = channel::<()>();
    let timeout = Duration::from_millis(1);
    let _sender_clone = sender.clone();

    let start = Instant::now();
    let outcome = receiver.recv_timeout(timeout);
    assert_eq!(outcome, Err(RecvTimeoutError::Timeout));
    let finished = Instant::now();
    assert!(finished >= start + timeout);
}
700
/// Spawns `stress` threads that each send one value after sleeping `i * 10` ms, and polls
/// with a 10 ms `recv_timeout` until the channel disconnects, then checks the count.
#[test]
fn stress_recv_timeout_shared() {
    let (sender, receiver) = channel();
    let stress = stress_factor() + 100;

    let producers: Vec<_> = (0..stress)
        .map(|i| {
            let sender = sender.clone();
            thread::spawn(move || {
                let delay = Duration::from_millis(i as u64 * 10);
                thread::sleep(delay);
                sender.send(1usize).unwrap();
            })
        })
        .collect();

    drop(sender);

    let mut received = 0;
    loop {
        let outcome = receiver.recv_timeout(Duration::from_millis(10));
        if let Err(RecvTimeoutError::Disconnected) = outcome {
            break;
        }
        // A timeout simply falls through to the next poll.
        if let Ok(value) = outcome {
            assert_eq!(value, 1usize);
            received += 1;
        }
    }

    assert_eq!(received, stress);
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
}
735
/// Queues 10000 unit values and then receives all of them on the same thread
/// (upstream regression test for running out of stack in scheduler context).
#[test]
fn recv_a_lot() {
    let (sender, receiver) = channel();
    (0..10000).for_each(|_| sender.send(()).unwrap());
    (0..10000).for_each(|_| receiver.recv().unwrap());
}
747
/// After five cloned senders have each delivered one value, `recv_timeout` must time out
/// on the empty channel and then succeed once the original sender sends again.
#[test]
fn shared_recv_timeout() {
    let (sender, receiver) = channel();
    let total: usize = 5;
    let producers: Vec<_> = (0..total)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(()).unwrap();
            })
        })
        .collect();

    (0..total).for_each(|_| receiver.recv().unwrap());

    let wait = Duration::from_millis(1);
    assert_eq!(
        receiver.recv_timeout(wait),
        Err(RecvTimeoutError::Timeout)
    );
    sender.send(()).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(()));
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
}
775
/// Spawns `stress_factor() + 100` threads that each send one value through a cloned
/// sender, and receives that many values on the test thread.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = channel();
    let total = stress_factor() + 100;
    let producers: Vec<_> = (0..total)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(()).unwrap();
            })
        })
        .collect();

    (0..total).for_each(|_| receiver.recv().unwrap());
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
}
796
/// A worker sums everything yielded by `rx.iter()` and reports the total on a second
/// channel; the test sends 3, 1 and 2, drops the sender, and expects 6.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = channel::<i32>();
    let (total_sender, total_receiver) = channel::<i32>();

    let summer = thread::spawn(move || {
        let total: i32 = receiver.iter().sum();
        total_sender.send(total).unwrap();
    });

    for value in [3, 1, 2].iter() {
        sender.send(*value).unwrap();
    }
    // Dropping the sender ends the worker's iteration.
    drop(sender);
    assert_eq!(total_receiver.recv().unwrap(), 6);
    summer.join().unwrap();
}
817
/// A worker iterates over `rx.iter()`, adding items until its running total is at least 3
/// when the next item arrives, then reports the total; with 2s being sent it reports 4.
#[test]
fn test_recv_iter_break() {
    let (sender, receiver) = channel::<i32>();
    let (count_sender, count_receiver) = channel();

    let worker = thread::spawn(move || {
        let mut running = 0;
        for item in receiver.iter() {
            // The item is taken from the channel before the threshold is checked.
            if running >= 3 {
                break;
            }
            running += item;
        }
        count_sender.send(running).unwrap();
    });

    (0..3).for_each(|_| sender.send(2).unwrap());
    // The worker may already have dropped its receiver, so this send is allowed to fail.
    let _ = sender.send(2);
    drop(sender);
    assert_eq!(count_receiver.recv().unwrap(), 4);
    worker.join().unwrap();
}
843
/// A worker drains responses without blocking and sends a request whenever none are
/// pending, until its running total is 6. The test thread answers each request with a 2.
#[test]
fn test_recv_try_iter() {
    let (request_sender, request_receiver) = channel();
    let (response_sender, response_receiver) = channel();

    // Request `x`s until we have `6`.
    let worker = thread::spawn(move || {
        let mut total = 0;
        loop {
            while let Ok(x) = response_receiver.try_recv() {
                total += x;
                if total == 6 {
                    return total;
                }
            }
            request_sender.send(()).unwrap();
        }
    });

    while let Ok(_) = request_receiver.recv() {
        let answered = response_sender.send(2);
        if answered.is_err() {
            break;
        }
    }

    assert_eq!(worker.join().unwrap(), 6);
}
871
/// The owning iterator from `Receiver::into_iter` yields the queued values in order and
/// then ends.
#[test]
fn test_recv_into_iter_owned() {
    let mut iter = {
        let (tx, rx) = channel::<i32>();
        tx.send(1).unwrap();
        tx.send(2).unwrap();

        // `tx` is dropped at the end of this block, so the iterator ends after the two
        // queued values instead of blocking.
        rx.into_iter()
    };
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert!(iter.next().is_none());
}
885
/// The borrowing iterator from `(&Receiver).into_iter()` yields the queued values in order
/// and then ends.
#[test]
fn test_recv_into_iter_borrowed() {
    let (tx, rx) = channel::<i32>();
    tx.send(1).unwrap();
    tx.send(2).unwrap();
    // Drop the sender so the iterator ends after the queued values instead of blocking.
    drop(tx);
    let mut iter = (&rx).into_iter();
    assert_eq!(iter.next(), Some(1));
    assert_eq!(iter.next(), Some(2));
    assert!(iter.next().is_none());
}
897
/// Walks `try_recv` through `Empty`, `Ok`, `Empty` again and finally `Disconnected`, using
/// two auxiliary channels to step a helper thread in lockstep with the test thread.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = channel::<i32>();
    let (go_tx, go_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();
    let helper = thread::spawn(move || {
        // Step 1: send a value when told to.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();
        // Step 2: drop the data sender when told to.
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    helper.join().unwrap();
}
922
// Upstream regression test: this scenario used to livelock inside the `Receiver`
// destructor because the internal state of the shared packet was corrupted.
/// A child thread receives one value and drops its receiver while a cloned sender is
/// still alive on the test thread.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = channel();
    let (exit_sender, exit_receiver) = channel();
    let child = thread::spawn(move || {
        receiver.recv().unwrap(); // wait on a oneshot
        drop(receiver); // destroy a shared
        exit_sender.send(()).unwrap();
    });

    // Give the child a chance to block in `recv` before anything is sent.
    (0..5000).for_each(|_| thread::yield_now());

    // Replace the original sender with a clone, then send through the clone.
    let cloned_sender = sender.clone();
    drop(sender);
    cloned_sender.send(()).unwrap();

    // Wait for the child's exit signal, then for the child itself.
    exit_receiver.recv().unwrap();
    child.join().unwrap();
}
948
/// With the receiver discarded at creation, a second send must still return the value
/// inside `SendError` (upstream regression test for issue 32114).
#[test]
fn issue_32114() {
    // The `_` pattern drops the receiver immediately.
    let (sender, _) = channel();
    let _ = sender.send(123);
    let second = sender.send(123);
    assert_eq!(second, Err(SendError(123)));
}
955 }
956
957 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/mod.rs
958 mod sync_channel_tests {
959 use super::*;
960
961 use std::env;
962 use std::thread;
963 use std::time::Duration;
964
/// Returns the multiplier used to size the stress tests.
///
/// The value comes from the `RUST_TEST_STRESS` environment variable. When `env::var`
/// reports an error for that variable, the factor is 1.
///
/// # Panics
///
/// Panics if the variable is set but does not parse as a `usize`. The message names the
/// variable and the offending value so a misconfigured run is easy to diagnose.
pub fn stress_factor() -> usize {
    match env::var("RUST_TEST_STRESS") {
        Ok(val) => val.parse::<usize>().unwrap_or_else(|err| {
            panic!(
                "RUST_TEST_STRESS must be a non-negative integer, got {:?}: {}",
                val, err
            )
        }),
        Err(..) => 1,
    }
}
971
/// Sends one value on a channel of capacity 1 and receives it on the same thread.
#[test]
fn smoke() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
}
978
/// Fills a capacity-1 channel with a boxed value and lets both halves go out of scope.
#[test]
fn drop_full() {
    let (sender, _receiver) = sync_channel::<Box<isize>>(1);
    let payload = Box::new(1);
    sender.send(payload).unwrap();
}
984
/// Round-trips a value through the original sender and then through a clone of it.
#[test]
fn smoke_shared() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();
    let first = receiver.recv().unwrap();
    assert_eq!(first, 1);

    // Shadow the original sender with its clone.
    let sender = sender.clone();
    sender.send(1).unwrap();
    let second = receiver.recv().unwrap();
    assert_eq!(second, 1);
}
994
/// `recv_timeout` times out on an empty capacity-1 channel and returns the value once one
/// has been sent.
#[test]
fn recv_timeout() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let wait = Duration::from_millis(1);
    assert_eq!(
        receiver.recv_timeout(wait),
        Err(RecvTimeoutError::Timeout)
    );
    sender.send(1).unwrap();
    assert_eq!(receiver.recv_timeout(wait), Ok(1));
}
1005
/// Sends a value from a spawned thread over a capacity-0 channel and receives it on the
/// test thread.
#[test]
fn smoke_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || sender.send(1).unwrap());
    let received = receiver.recv().unwrap();
    assert_eq!(received, 1);
    producer.join().unwrap();
}
1015
/// Sending on a capacity-0 channel after the receiver has been dropped returns an error.
#[test]
fn smoke_port_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.send(1);
    assert!(outcome.is_err());
}
1022
/// A clone made after the receiver was dropped, and outliving the original sender, must
/// still fail to send.
#[test]
fn smoke_shared_port_gone2() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let survivor = sender.clone();
    drop(sender);
    let outcome = survivor.send(1);
    assert!(outcome.is_err());
}
1031
/// Keeps sending on a capacity-0 channel until a send fails, while another thread receives
/// a single value and then drops the receiver.
#[test]
fn port_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        if sender.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1041
/// Alternates sends between a sender and its clone until one of them fails, while another
/// thread receives a single value and then drops the receiver.
#[test]
fn port_gone_concurrent_shared() {
    let (first, receiver) = sync_channel::<i32>(0);
    let second = first.clone();
    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });
    loop {
        // Short-circuits: the clone is only tried when the original succeeded.
        if first.send(1).is_err() || second.send(1).is_err() {
            break;
        }
    }
    consumer.join().unwrap();
}
1052
/// Receiving after the only sender has been dropped returns an error.
#[test]
fn smoke_chan_gone() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1059
/// Receiving after the sender and its clone have both been dropped returns an error.
#[test]
fn smoke_chan_gone_shared() {
    let (original, receiver) = sync_channel::<()>(0);
    let copy = original.clone();
    drop(original);
    drop(copy);
    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1068
/// Receives until `recv` fails while another thread sends two values over a capacity-0
/// channel and then drops the sender.
#[test]
fn chan_gone_concurrent() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        for _ in 0..2 {
            sender.send(1).unwrap();
        }
    });
    while let Ok(_) = receiver.recv() {}
    producer.join().unwrap();
}
1079
/// Pushes 10000 values through a capacity-0 channel from one thread and checks each of
/// them on the test thread.
#[test]
fn stress() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let producer = thread::spawn(move || {
        (0..10000).for_each(|_| sender.send(1).unwrap());
    });
    (0..10000).for_each(|_| {
        let received = receiver.recv().unwrap();
        assert_eq!(received, 1);
    });
    producer.join().unwrap();
}
1093
/// A producer sends 10000 values over a capacity-0 channel while the test thread polls
/// with a 1 ms `recv_timeout` until the channel disconnects, then checks the count.
#[test]
fn stress_recv_timeout_two_threads() {
    let (sender, receiver) = sync_channel::<i32>(0);

    let producer = thread::spawn(move || {
        (0..10000).for_each(|_| sender.send(1).unwrap());
    });

    let mut received = 0;
    loop {
        let outcome = receiver.recv_timeout(Duration::from_millis(1));
        if let Err(RecvTimeoutError::Disconnected) = outcome {
            break;
        }
        // A timeout simply falls through to the next poll.
        if let Ok(value) = outcome {
            assert_eq!(value, 1);
            received += 1;
        }
    }

    assert_eq!(received, 10000);
    producer.join().unwrap();
}
1119
/// Eight producers each send `AMT` values over a capacity-0 channel; a consumer polls with
/// a 10 ms `recv_timeout` until disconnection, checks the total, and signals completion on
/// a second capacity-0 channel.
#[test]
fn stress_recv_timeout_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_sender, done_receiver) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        let mut received = 0;
        loop {
            let outcome = receiver.recv_timeout(Duration::from_millis(10));
            if let Err(RecvTimeoutError::Disconnected) = outcome {
                break;
            }
            // A timeout simply falls through to the next poll.
            if let Ok(value) = outcome {
                assert_eq!(value, 1);
                received += 1;
            }
        }

        assert_eq!(received, AMT * NTHREADS);
        assert!(receiver.try_recv().is_err());

        done_sender.send(()).unwrap();
    });

    let producers: Vec<_> = (0..NTHREADS)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                (0..AMT).for_each(|_| sender.send(1).unwrap());
            })
        })
        .collect();

    drop(sender);

    done_receiver.recv().unwrap();
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
    consumer.join().unwrap();
}
1165
/// Eight producers each send `AMT` values over a capacity-0 channel; a consumer receives
/// exactly `AMT * NTHREADS` of them, checks nothing is left, and signals completion on a
/// second capacity-0 channel.
#[test]
fn stress_shared() {
    const AMT: u32 = 1000;
    const NTHREADS: u32 = 8;
    let (sender, receiver) = sync_channel::<i32>(0);
    let (done_sender, done_receiver) = sync_channel::<()>(0);

    let consumer = thread::spawn(move || {
        (0..AMT * NTHREADS).for_each(|_| {
            assert_eq!(receiver.recv().unwrap(), 1);
        });
        if receiver.try_recv().is_ok() {
            panic!();
        }
        done_sender.send(()).unwrap();
    });

    let producers: Vec<_> = (0..NTHREADS)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                (0..AMT).for_each(|_| sender.send(1).unwrap());
            })
        })
        .collect();
    drop(sender);
    done_receiver.recv().unwrap();
    producers
        .into_iter()
        .for_each(|producer| producer.join().unwrap());
    consumer.join().unwrap();
}
1201
/// Closes a capacity-0 channel without sending: the receiver is dropped explicitly, the
/// sender at end of scope.
#[test]
fn oneshot_single_thread_close_port_first() {
    let (_sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
}
1208
/// Closes a capacity-0 channel without sending: the sender is dropped explicitly, the
/// receiver at end of scope.
#[test]
fn oneshot_single_thread_close_chan_first() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    drop(sender);
}
1215
/// Sends a boxed payload after the receiver is gone; the send must fail, and the payload
/// comes back inside the error, which is dropped here.
#[test]
fn oneshot_single_thread_send_port_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);
    drop(receiver);
    let outcome = sender.send(Box::new(0));
    assert!(outcome.is_err());
}
1223
/// `recv` on a channel whose sender was dropped returns exactly `Err(RecvError)`.
#[test]
fn oneshot_single_thread_recv_chan_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);
    let outcome = receiver.recv();
    assert_eq!(outcome, Err(RecvError));
}
1230
/// Sends a boxed 10 on a capacity-1 channel and checks that the received box holds 10.
#[test]
fn oneshot_single_thread_send_then_recv() {
    let (sender, receiver) = sync_channel::<Box<i32>>(1);
    sender.send(Box::new(10)).unwrap();
    let received = receiver.recv().unwrap();
    assert!(*received == 10);
}
1237
/// `try_send` succeeds on an empty capacity-1 channel and the value can then be received.
#[test]
fn oneshot_single_thread_try_send_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Ok(()));
    assert!(receiver.recv().unwrap() == 10);
}
1244
/// `try_send` after the receiver was dropped returns `Disconnected` carrying the value.
#[test]
fn oneshot_single_thread_try_send_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(receiver);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Disconnected(10)));
}
1251
/// `try_send` on a capacity-0 channel with no receiver waiting returns `Full` carrying the
/// value.
#[test]
fn oneshot_single_thread_try_send_closed2() {
    let (sender, _receiver) = sync_channel::<i32>(0);
    let outcome = sender.try_send(10);
    assert_eq!(outcome, Err(TrySendError::Full(10)));
}
1257
/// A value sent on a capacity-1 channel is returned by `recv` as `Ok(10)`.
#[test]
fn oneshot_single_thread_try_recv_open() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    let outcome = receiver.recv();
    assert!(outcome == Ok(10));
}
1264
// Receiving from a channel whose sender has been dropped is an error.
#[test]
fn oneshot_single_thread_try_recv_closed() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);

    let outcome = receiver.recv();
    assert!(outcome.is_err());
}
1271
// A message buffered before the sender was dropped is still delivered by
// `try_recv`; only the following call reports the disconnection.
#[test]
fn oneshot_single_thread_try_recv_closed_with_data() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(10).unwrap();
    drop(sender);

    let first = receiver.try_recv();
    assert_eq!(first, Ok(10));

    let second = receiver.try_recv();
    assert_eq!(second, Err(TryRecvError::Disconnected));
}
1280
// `try_recv` reports `Empty` before anything was sent and yields the value
// once one has been buffered.
#[test]
fn oneshot_single_thread_peek_data() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let before = receiver.try_recv();
    assert_eq!(before, Err(TryRecvError::Empty));

    sender.send(10).unwrap();

    let after = receiver.try_recv();
    assert_eq!(after, Ok(10));
}
1288
// After the sender is dropped, `try_recv` reports `Disconnected`, and keeps
// doing so when asked a second time.
#[test]
fn oneshot_single_thread_peek_close() {
    let (sender, receiver) = sync_channel::<i32>(0);
    drop(sender);

    for _ in 0..2 {
        assert_eq!(receiver.try_recv(), Err(TryRecvError::Disconnected));
    }
}
1296
// With the sender still alive and nothing sent, `try_recv` reports `Empty`.
#[test]
fn oneshot_single_thread_peek_open() {
    let (_sender, receiver) = sync_channel::<i32>(0);

    let outcome = receiver.try_recv();
    assert_eq!(outcome, Err(TryRecvError::Empty));
}
1302
// A consumer thread receives the boxed value that the test thread sends over
// a capacity-0 channel.
#[test]
fn oneshot_multi_task_recv_then_send() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);

    let consumer = thread::spawn(move || {
        let received = receiver.recv().unwrap();
        assert!(*received == 10);
    });

    sender.send(Box::new(10)).unwrap();
    consumer.join().unwrap();
}
1313
// One thread drops the sender while another blocks in `recv`; the receiver
// must observe `RecvError`.
#[test]
fn oneshot_multi_task_recv_then_close() {
    let (sender, receiver) = sync_channel::<Box<i32>>(0);

    let closer = thread::spawn(move || {
        drop(sender);
    });
    let waiter = thread::spawn(move || {
        assert_eq!(receiver.recv(), Err(RecvError));
    });

    // Same join order as before: the receiving thread first, then the closer.
    waiter.join().unwrap();
    closer.join().unwrap();
}
1327
// Repeatedly races dropping the receiver (on a spawned thread) against
// dropping the sender (on the test thread).
#[test]
fn oneshot_multi_thread_close_stress() {
    let rounds = stress_factor();

    let droppers: Vec<_> = (0..rounds)
        .map(|_| {
            let (sender, receiver) = sync_channel::<i32>(0);
            let handle = thread::spawn(move || {
                drop(receiver);
            });
            drop(sender);
            handle
        })
        .collect();

    for handle in droppers {
        handle.join().unwrap();
    }
}
1344
// Repeatedly races a send on one thread against dropping the receiver on
// another. The outcome of the send is deliberately ignored.
#[test]
fn oneshot_multi_thread_send_close_stress() {
    let rounds = stress_factor();
    let mut droppers = Vec::with_capacity(rounds);

    for _ in 0..rounds {
        let (sender, receiver) = sync_channel::<i32>(0);

        droppers.push(thread::spawn(move || {
            drop(receiver);
        }));

        // The sending thread is joined right away, before the next round.
        let producer = thread::spawn(move || {
            let _ = sender.send(1);
        });
        producer.join().unwrap();
    }

    for handle in droppers {
        handle.join().unwrap();
    }
}
1365
// Repeatedly races a blocking `recv` against dropping the sender, with both
// operations running on nested threads. The receiver must always observe
// `RecvError`.
#[test]
fn oneshot_multi_thread_recv_close_stress() {
    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<i32>(0);
        let t = thread::spawn(move || {
            thread::spawn(move || {
                assert_eq!(rx.recv(), Err(RecvError));
            })
            .join()
            .unwrap();
        });
        ts.push(t);
        let t2 = thread::spawn(move || {
            // Join the nested thread as well, so that no spawned thread is
            // left detached when the test returns (mirrors the receiving
            // side above).
            thread::spawn(move || {
                drop(tx);
            })
            .join()
            .unwrap();
        });
        ts.push(t2);
    }
    for t in ts {
        t.join().unwrap();
    }
}
1391
// Repeatedly sends one boxed value from a spawned thread and receives it on
// the test thread over a fresh capacity-0 channel.
#[test]
fn oneshot_multi_thread_send_recv_stress() {
    let rounds = stress_factor();

    let producers: Vec<_> = (0..rounds)
        .map(|_| {
            let (sender, receiver) = sync_channel::<Box<i32>>(0);
            let handle = thread::spawn(move || {
                sender.send(Box::new(10)).unwrap();
            });
            let received = receiver.recv().unwrap();
            assert!(*received == 10);
            handle
        })
        .collect();

    for handle in producers {
        handle.join().unwrap();
    }
}
1408
// Passes each half of a capacity-0 channel down a chain of ten threads: the
// i-th sending thread sends `i`, the i-th receiving thread expects `i`.
//
// Every thread in a chain joins its successor, so joining the handle of the
// first thread waits for the whole chain and propagates a panic from any
// link. Previously the handles of all but the first thread were discarded,
// leaving those threads detached and their assertions unable to fail the test.
#[test]
fn stream_send_recv_stress() {
    // Sends `i`, then hands the sender to a new thread responsible for
    // `i + 1`. Returns `None` once `i` reaches 10.
    fn send(tx: SyncSender<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            tx.send(Box::new(i)).unwrap();
            if let Some(next) = send(tx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    // Receives and checks `i`, then hands the receiver to a new thread
    // responsible for `i + 1`. Returns `None` once `i` reaches 10.
    fn recv(rx: Receiver<Box<i32>>, i: i32) -> Option<JoinHandle<()>> {
        if i == 10 {
            return None;
        }

        Some(thread::spawn(move || {
            assert!(*rx.recv().unwrap() == i);
            if let Some(next) = recv(rx, i + 1) {
                next.join().unwrap();
            }
        }))
    }

    let stress_factor = stress_factor();
    let mut ts = Vec::with_capacity(2 * stress_factor);
    for _ in 0..stress_factor {
        let (tx, rx) = sync_channel::<Box<i32>>(0);

        if let Some(t) = send(tx, 0) {
            ts.push(t);
        }
        if let Some(t) = recv(rx, 0) {
            ts.push(t);
        }
    }
    for t in ts {
        t.join().unwrap();
    }
}
1449
// Regression test that we don't run out of stack in scheduler context:
// fill a large buffered channel, then drain it, all on one thread.
#[test]
fn recv_a_lot() {
    const MESSAGES: usize = 10000;

    let (sender, receiver) = sync_channel::<()>(MESSAGES);

    let mut sent = 0;
    while sent < MESSAGES {
        sender.send(()).unwrap();
        sent += 1;
    }

    let mut received = 0;
    while received < MESSAGES {
        receiver.recv().unwrap();
        received += 1;
    }
}
1461
// Many threads, each holding its own clone of the sender, send one unit
// message; the test thread receives exactly that many messages.
#[test]
fn shared_chan_stress() {
    let (sender, receiver) = sync_channel::<()>(0);
    let total = stress_factor() + 100;

    let producers: Vec<_> = (0..total)
        .map(|_| {
            let sender = sender.clone();
            thread::spawn(move || {
                sender.send(()).unwrap();
            })
        })
        .collect();

    for _ in 0..total {
        receiver.recv().unwrap();
    }

    for handle in producers {
        handle.join().unwrap();
    }
}
1482
// A worker sums everything yielded by the receiver's iterator until the
// channel closes, then reports the total over a second channel.
#[test]
fn test_nested_recv_iter() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let (sum_tx, sum_rx) = sync_channel::<i32>(0);

    let worker = thread::spawn(move || {
        let mut running_total = 0;
        let mut values = receiver.iter();
        while let Some(value) = values.next() {
            running_total += value;
        }
        sum_tx.send(running_total).unwrap();
    });

    for &value in &[3, 1, 2] {
        sender.send(value).unwrap();
    }
    // Dropping the sender ends the worker's iteration.
    drop(sender);

    let total = sum_rx.recv().unwrap();
    assert_eq!(total, 6);
    worker.join().unwrap();
}
1503
// A worker iterates over the receiver but breaks out once its running count
// has reached 3, then reports the count over a second channel.
#[test]
fn test_recv_iter_break() {
    let (sender, receiver) = sync_channel::<i32>(0);
    let (report_tx, report_rx) = sync_channel(0);

    let worker = thread::spawn(move || {
        let mut tally = 0;
        for value in receiver.iter() {
            if tally >= 3 {
                break;
            }
            tally += value;
        }
        report_tx.send(tally).unwrap();
    });

    for _ in 0..3 {
        sender.send(2).unwrap();
    }
    // One extra, non-blocking attempt; whether it is accepted is irrelevant.
    let _ = sender.try_send(2);
    drop(sender);

    let reported = report_rx.recv().unwrap();
    assert_eq!(reported, 4);
    worker.join().unwrap();
}
1529
// Walks `try_recv` through Empty -> Ok -> Empty -> Disconnected. A helper
// thread performs each step only when told to via `go`, and acknowledges it
// via `done`, so the test thread checks the state in lock-step.
#[test]
fn try_recv_states() {
    let (data_tx, data_rx) = sync_channel::<i32>(1);
    let (go_tx, go_rx) = sync_channel::<()>(1);
    let (done_tx, done_rx) = sync_channel::<()>(1);

    let helper = thread::spawn(move || {
        // Step 1: publish a value.
        go_rx.recv().unwrap();
        data_tx.send(1).unwrap();
        done_tx.send(()).unwrap();

        // Step 2: disconnect the data channel.
        go_rx.recv().unwrap();
        drop(data_tx);
        done_tx.send(()).unwrap();
    });

    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Ok(1));
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Empty));

    go_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    assert_eq!(data_rx.try_recv(), Err(TryRecvError::Disconnected));

    helper.join().unwrap();
}
1554
// Regression test: this scenario used to livelock inside the Receiver
// destructor because the internal state of the Shared packet was corrupted.
#[test]
fn destroy_upgraded_shared_port_when_sender_still_active() {
    let (sender, receiver) = sync_channel::<()>(0);
    let (exit_tx, exit_rx) = sync_channel::<()>(0);

    let child = thread::spawn(move || {
        // Wait on a oneshot.
        receiver.recv().unwrap();
        // Destroy a shared.
        drop(receiver);
        exit_tx.send(()).unwrap();
    });

    // Make sure the other thread has gone to sleep.
    (0..5000).for_each(|_| thread::yield_now());

    // Upgrade to a shared chan and send a message.
    let cloned_sender = sender.clone();
    drop(sender);
    cloned_sender.send(()).unwrap();

    // Wait for the child thread to exit before we exit.
    exit_rx.recv().unwrap();
    child.join().unwrap();
}
1580
// A send on a capacity-0 channel succeeds when another thread receives.
#[test]
fn send1() {
    let (sender, receiver) = sync_channel::<i32>(0);

    let consumer = thread::spawn(move || {
        receiver.recv().unwrap();
    });

    let outcome = sender.send(1);
    assert_eq!(outcome, Ok(()));
    consumer.join().unwrap();
}
1590
// A send on a capacity-0 channel fails when the other thread drops the
// receiver instead of receiving.
#[test]
fn send2() {
    let (sender, receiver) = sync_channel::<i32>(0);

    let dropper = thread::spawn(move || {
        drop(receiver);
    });

    let outcome = sender.send(1);
    assert!(outcome.is_err());
    dropper.join().unwrap();
}
1600
// The first send fits into the single buffer slot; the second fails once
// another thread drops the receiver.
#[test]
fn send3() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let first = sender.send(1);
    assert_eq!(first, Ok(()));

    let dropper = thread::spawn(move || {
        drop(receiver);
    });

    let second = sender.send(1);
    assert!(second.is_err());
    dropper.join().unwrap();
}
1611
// Two threads send through clones of the same sender on a capacity-0
// channel; after the receiver is dropped both sends must fail. Each thread
// reports completion over a separate channel.
#[test]
fn send4() {
    let (sender_a, receiver) = sync_channel::<i32>(0);
    let sender_b = sender_a.clone();
    let (finished_a, finished_rx) = channel();
    let finished_b = finished_a.clone();

    let first = thread::spawn(move || {
        let outcome = sender_a.send(1);
        assert!(outcome.is_err());
        finished_a.send(()).unwrap();
    });
    let second = thread::spawn(move || {
        let outcome = sender_b.send(2);
        assert!(outcome.is_err());
        finished_b.send(()).unwrap();
    });

    drop(receiver);

    // One completion notice per sending thread.
    for _ in 0..2 {
        finished_rx.recv().unwrap();
    }
    first.join().unwrap();
    second.join().unwrap();
}
1632
// `try_send` on a capacity-0 channel with an idle receiver reports `Full`.
#[test]
fn try_send1() {
    let (sender, _receiver) = sync_channel::<i32>(0);

    let attempt = sender.try_send(1);
    assert_eq!(attempt, Err(TrySendError::Full(1)));
}
1638
// With capacity 1, the first `try_send` succeeds and the second reports
// `Full`.
#[test]
fn try_send2() {
    let (sender, _receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Full(1)));
}
1645
// After the receiver is dropped, `try_send` reports `Disconnected` even
// though an earlier message is still buffered.
#[test]
fn try_send3() {
    let (sender, receiver) = sync_channel::<i32>(1);

    let first = sender.try_send(1);
    assert_eq!(first, Ok(()));

    drop(receiver);

    let second = sender.try_send(1);
    assert_eq!(second, Err(TrySendError::Disconnected(1)));
}
1653
// Regression test for issue 15761: a ping-pong over two buffered channels
// using `try_send`, repeated 100 times.
#[test]
fn issue_15761() {
    // One round: the helper thread waits for a message on the first channel
    // and answers on the second; the calling thread waits for that answer.
    fn repro() {
        let (tx1, rx1) = sync_channel::<()>(3);
        let (tx2, rx2) = sync_channel::<()>(3);

        let t = thread::spawn(move || {
            rx1.recv().unwrap();
            tx2.try_send(()).unwrap();
        });

        tx1.try_send(()).unwrap();
        rx2.recv().unwrap();

        // Join the helper so that no thread is left detached after a round
        // and a panic inside it is reported.
        t.join().unwrap();
    }

    for _ in 0..100 {
        repro()
    }
}
1673 }
1674
1675 // Source: https://github.com/rust-lang/rust/blob/master/src/libstd/sync/mpsc/select.rs
1676 mod select_tests {
1677 use super::*;
1678
1679 use std::thread;
1680
// Basic `select!` behaviour over two channels: picks whichever receiver has
// a message, and reports an error for a receiver whose sender was dropped.
#[test]
fn smoke() {
    let (first_tx, first_rx) = channel::<i32>();
    let (second_tx, second_rx) = channel::<i32>();

    // Only the first channel has data.
    first_tx.send(1).unwrap();
    select! {
        got = first_rx.recv() => assert_eq!(got.unwrap(), 1),
        _other = second_rx.recv() => panic!()
    }

    // Only the second channel has data.
    second_tx.send(2).unwrap();
    select! {
        _other = first_rx.recv() => panic!(),
        got = second_rx.recv() => assert_eq!(got.unwrap(), 2)
    }

    // The first channel is disconnected; the second is still open and empty.
    drop(first_tx);
    select! {
        got = first_rx.recv() => assert!(got.is_err()),
        _other = second_rx.recv() => panic!()
    }

    // Now the second channel is disconnected as well.
    drop(second_tx);
    select! {
        got = second_rx.recv() => assert!(got.is_err())
    }
}
1705
// Of five open channels only the last one holds a message; `select!` must
// take that arm.
#[test]
fn smoke2() {
    let (_idle_tx1, idle_rx1) = channel::<i32>();
    let (_idle_tx2, idle_rx2) = channel::<i32>();
    let (_idle_tx3, idle_rx3) = channel::<i32>();
    let (_idle_tx4, idle_rx4) = channel::<i32>();
    let (ready_tx, ready_rx) = channel::<i32>();

    ready_tx.send(4).unwrap();

    select! {
        _unexpected = idle_rx1.recv() => panic!("1"),
        _unexpected = idle_rx2.recv() => panic!("2"),
        _unexpected = idle_rx3.recv() => panic!("3"),
        _unexpected = idle_rx4.recv() => panic!("4"),
        got = ready_rx.recv() => assert_eq!(got.unwrap(), 4)
    }
}
1722
// Between an open, empty channel and a disconnected one, `select!` takes the
// disconnected arm and yields an error there.
#[test]
fn closed() {
    let (_open_tx, open_rx) = channel::<i32>();
    let (closed_tx, closed_rx) = channel::<i32>();
    drop(closed_tx);

    select! {
        _unexpected = open_rx.recv() => panic!(),
        outcome = closed_rx.recv() => assert!(outcome.is_err())
    }
}
1734
// A blocked `select!` is woken first by a message and later by the sender
// being dropped when the helper thread finishes.
#[test]
fn unblocks() {
    let (data_tx, data_rx) = channel::<i32>();
    let (_idle_tx, idle_rx) = channel::<i32>();
    let (ack_tx, ack_rx) = channel::<i32>();

    let helper = thread::spawn(move || {
        (0..20).for_each(|_| thread::yield_now());
        data_tx.send(1).unwrap();
        ack_rx.recv().unwrap();
        (0..20).for_each(|_| thread::yield_now());
        // `data_tx` is dropped when this closure returns.
    });

    select! {
        got = data_rx.recv() => assert_eq!(got.unwrap(), 1),
        _unexpected = idle_rx.recv() => panic!()
    }

    ack_tx.send(1).unwrap();

    select! {
        got = data_rx.recv() => assert!(got.is_err()),
        _unexpected = idle_rx.recv() => panic!()
    }

    helper.join().unwrap();
}
1763
// A helper sends one message on each of two channels. Two consecutive
// `select!` calls must consume both, in either order, leaving both channels
// empty.
#[test]
fn both_ready() {
    let (tx1, rx1) = channel::<i32>();
    let (tx2, rx2) = channel::<i32>();
    let (release_tx, release_rx) = channel::<()>();

    let helper = thread::spawn(move || {
        (0..20).for_each(|_| thread::yield_now());
        tx1.send(1).unwrap();
        tx2.send(2).unwrap();
        // Keep both senders alive until the test thread has checked them.
        release_rx.recv().unwrap();
    });

    for _ in 0..2 {
        select! {
            got = rx1.recv() => { assert_eq!(got.unwrap(), 1); },
            got = rx2.recv() => { assert_eq!(got.unwrap(), 2); }
        }
    }

    assert_eq!(rx1.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(rx2.try_recv(), Err(TryRecvError::Empty));

    release_tx.send(()).unwrap();
    helper.join().unwrap();
}
1792
// A helper alternates between two channels (even numbers on the first, odd
// numbers on the second), waiting for an acknowledgement after every send.
// The test thread selects over both and checks value and parity.
#[test]
fn stress() {
    const AMT: i32 = 10000;

    let (even_tx, even_rx) = channel::<i32>();
    let (odd_tx, odd_rx) = channel::<i32>();
    let (ack_tx, ack_rx) = channel::<()>();

    let producer = thread::spawn(move || {
        for i in 0..AMT {
            let target = if i % 2 == 0 { &even_tx } else { &odd_tx };
            target.send(i).unwrap();
            ack_rx.recv().unwrap();
        }
    });

    for i in 0..AMT {
        select! {
            even = even_rx.recv() => { assert!(i % 2 == 0 && i == even.unwrap()); },
            odd = odd_rx.recv() => { assert!(i % 2 == 1 && i == odd.unwrap()); }
        }
        ack_tx.send(()).unwrap();
    }

    producer.join().unwrap();
}
1820
// A helper clones (and immediately discards) a sender before sending; the
// `select!` on the test thread must still receive on the first channel.
#[allow(unused_must_use)]
#[test]
fn cloning() {
    let (data_tx, data_rx) = channel::<i32>();
    let (_idle_tx, idle_rx) = channel::<i32>();
    let (signal_tx, signal_rx) = channel::<()>();

    let helper = thread::spawn(move || {
        signal_rx.recv().unwrap();
        // Create a throw-away clone of the sender.
        drop(data_tx.clone());
        assert_eq!(signal_rx.try_recv(), Err(TryRecvError::Empty));
        data_tx.send(2).unwrap();
        signal_rx.recv().unwrap();
    });

    signal_tx.send(()).unwrap();
    select! {
        _got = data_rx.recv() => {},
        _unexpected = idle_rx.recv() => panic!()
    }
    signal_tx.send(()).unwrap();

    helper.join().unwrap();
}
1844
// Same scenario as `cloning`: a sender is cloned and the clone discarded
// before the actual send; `select!` must take the first arm.
#[allow(unused_must_use)]
#[test]
fn cloning2() {
    let (data_tx, data_rx) = channel::<i32>();
    let (_idle_tx, idle_rx) = channel::<i32>();
    let (signal_tx, signal_rx) = channel::<()>();

    let helper = thread::spawn(move || {
        signal_rx.recv().unwrap();
        // Create a throw-away clone of the sender.
        drop(data_tx.clone());
        assert_eq!(signal_rx.try_recv(), Err(TryRecvError::Empty));
        data_tx.send(2).unwrap();
        signal_rx.recv().unwrap();
    });

    signal_tx.send(()).unwrap();
    select! {
        _got = data_rx.recv() => {},
        _unexpected = idle_rx.recv() => panic!()
    }
    signal_tx.send(()).unwrap();

    helper.join().unwrap();
}
1868
// While a helper is selecting over two channels, cloning and dropping a
// sender of the first must not wake the first arm; the message on the second
// channel is what completes the select.
#[test]
fn cloning3() {
    let (quiet_tx, quiet_rx) = channel::<()>();
    let (wake_tx, wake_rx) = channel::<()>();
    let (done_tx, done_rx) = channel::<()>();

    let helper = thread::spawn(move || {
        select! {
            _ = quiet_rx.recv() => panic!(),
            _ = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    (0..1000).for_each(|_| thread::yield_now());

    drop(quiet_tx.clone());
    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    helper.join().unwrap();
}
1890
// `select!` returns when one message is already queued.
#[test]
fn preflight1() {
    let (sender, receiver) = channel();
    sender.send(()).unwrap();

    select! {
        _queued = receiver.recv() => {}
    }
}
1899
// `select!` returns when two messages are already queued.
#[test]
fn preflight2() {
    let (sender, receiver) = channel();
    for _ in 0..2 {
        sender.send(()).unwrap();
    }

    select! {
        _queued = receiver.recv() => {}
    }
}
1909
// `select!` returns on a queued message after the sender has been cloned and
// the clone dropped.
#[test]
fn preflight3() {
    let (sender, receiver) = channel();
    let throwaway = sender.clone();
    drop(throwaway);
    sender.send(()).unwrap();

    select! {
        _queued = receiver.recv() => {}
    }
}
1919
// `select!` with a wildcard binding returns when one message is queued.
#[test]
fn preflight4() {
    let (sender, receiver) = channel();
    sender.send(()).unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
1928
// `select!` with a wildcard binding returns when two messages are queued.
#[test]
fn preflight5() {
    let (sender, receiver) = channel();
    for _ in 0..2 {
        sender.send(()).unwrap();
    }

    select! {
        _ = receiver.recv() => {}
    }
}
1938
// `select!` with a wildcard binding returns on a queued message after the
// sender has been cloned and the clone dropped.
#[test]
fn preflight6() {
    let (sender, receiver) = channel();
    let throwaway = sender.clone();
    drop(throwaway);
    sender.send(()).unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
1948
// `select!` returns (rather than blocking) when the only sender was dropped
// without ever sending.
#[test]
fn preflight7() {
    let (sender, receiver) = channel::<()>();
    drop(sender);

    select! {
        _ = receiver.recv() => {}
    }
}
1957
// After the single queued message has been consumed and the sender dropped,
// `select!` still returns instead of blocking.
#[test]
fn preflight8() {
    let (sender, receiver) = channel();
    sender.send(()).unwrap();
    drop(sender);

    // Drain the one buffered message.
    receiver.recv().unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
1968
// Like `preflight8`, but the sender is cloned (and the clone dropped) before
// sending.
#[test]
fn preflight9() {
    let (sender, receiver) = channel();
    let throwaway = sender.clone();
    drop(throwaway);
    sender.send(()).unwrap();
    drop(sender);

    // Drain the one buffered message.
    receiver.recv().unwrap();

    select! {
        _ = receiver.recv() => {}
    }
}
1980
// A helper blocks in `select!` on a fresh channel; a message sent after a
// short delay must wake it, after which it reports back.
#[test]
fn oneshot_data_waiting() {
    let (wake_tx, wake_rx) = channel();
    let (done_tx, done_rx) = channel();

    let helper = thread::spawn(move || {
        select! {
            _woken = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    // Give the helper a chance to start waiting.
    (0..100).for_each(|_| thread::yield_now());

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    helper.join().unwrap();
}
1999
// Like `oneshot_data_waiting`, but the channel has already carried two
// messages before the helper starts selecting on it.
#[test]
fn stream_data_waiting() {
    let (wake_tx, wake_rx) = channel();
    let (done_tx, done_rx) = channel();

    // Push two messages through the channel and drain them again.
    for _ in 0..2 {
        wake_tx.send(()).unwrap();
    }
    for _ in 0..2 {
        wake_rx.recv().unwrap();
    }

    let helper = thread::spawn(move || {
        select! {
            _woken = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    // Give the helper a chance to start waiting.
    (0..100).for_each(|_| thread::yield_now());

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    helper.join().unwrap();
}
2022
// Like `oneshot_data_waiting`, but the sender has been cloned once and the
// channel has carried one message before the helper starts selecting on it.
#[test]
fn shared_data_waiting() {
    let (wake_tx, wake_rx) = channel();
    let (done_tx, done_rx) = channel();

    let throwaway = wake_tx.clone();
    drop(throwaway);
    wake_tx.send(()).unwrap();
    wake_rx.recv().unwrap();

    let helper = thread::spawn(move || {
        select! {
            _woken = wake_rx.recv() => {}
        }
        done_tx.send(()).unwrap();
    });

    // Give the helper a chance to start waiting.
    (0..100).for_each(|_| thread::yield_now());

    wake_tx.send(()).unwrap();
    done_rx.recv().unwrap();
    helper.join().unwrap();
}
2044
// `select!` over a `sync_channel` receiver yields a value that was buffered
// beforehand.
#[test]
fn sync1() {
    let (sender, receiver) = sync_channel::<i32>(1);
    sender.send(1).unwrap();

    select! {
        got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
    }
}
2053
// `select!` over a capacity-0 `sync_channel` receiver yields the value sent
// by another thread after a short delay.
#[test]
fn sync2() {
    let (sender, receiver) = sync_channel::<i32>(0);

    let producer = thread::spawn(move || {
        (0..100).for_each(|_| thread::yield_now());
        sender.send(1).unwrap();
    });

    select! {
        got = receiver.recv() => { assert_eq!(got.unwrap(), 1); }
    }

    producer.join().unwrap();
}
2068
// Selects over one `sync_channel` receiver and one `channel` receiver, each
// fed by its own thread. Whichever arm fires first, the other channel's value
// is then received directly.
#[test]
fn sync3() {
    let (sync_tx, sync_rx) = sync_channel::<i32>(0);
    let (plain_tx, plain_rx) = channel::<i32>();

    let sync_producer = thread::spawn(move || {
        sync_tx.send(1).unwrap();
    });
    let plain_producer = thread::spawn(move || {
        plain_tx.send(2).unwrap();
    });

    select! {
        first = sync_rx.recv() => {
            assert_eq!(first.unwrap(), 1);
            assert_eq!(plain_rx.recv().unwrap(), 2);
        },
        first = plain_rx.recv() => {
            assert_eq!(first.unwrap(), 2);
            assert_eq!(sync_rx.recv().unwrap(), 1);
        }
    }

    sync_producer.join().unwrap();
    plain_producer.join().unwrap();
}
2094 }
2095