1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::sink::SinkExt;
4 use futures::stream::StreamExt;
5 use std::thread;
6
/// Smoke test: a producer thread pushes values into a channel until sending fails, while
/// this thread consumes three of them, releases the receiver, and joins the producer.
#[test]
fn smoke() {
    let (mut sender, receiver) = mpsc::channel(1);

    // The producer keeps sending `42` for as long as `send` reports success.
    let producer = thread::spawn(move || loop {
        if block_on(sender.send(42)).is_err() {
            break;
        }
    });

    // The stream future owns `receiver` and is moved into `block_on`, so the receiver is
    // dropped by the time `block_on` returns. That has to happen before the join below,
    // because the producer only stops sending once the receiver is gone.
    let consume_three = receiver.take(3).for_each(|_| futures::future::ready(()));
    block_on(consume_three);

    producer.join().unwrap()
}
20
/// Checks that shutting down individual senders (through `disconnect`, dropping, or
/// `close`) leaves the channel usable through a remaining sender, and that the receiver
/// only yields `None` once the last sender has been dropped.
///
/// The same scenario is run against `mpsc::channel` and `mpsc::unbounded`.
#[test]
fn multiple_senders_disconnect() {
    // Scenario on a channel created with `mpsc::channel(1)`.
    {
        let (mut first, mut receiver) = mpsc::channel(1);
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Three different ways of shutting down a single sender; each one is expected to
        // affect only that sender and keep the channel open for the others.
        first.disconnect();
        drop(second);
        let close_result = block_on(third.close());
        close_result.unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The one sender that is still open can deliver a value.
        let send_result = block_on(last.send(5));
        send_result.unwrap();
        let received = block_on(receiver.next());
        assert_eq!(received, Some(5));

        // With the final sender gone, the stream terminates.
        drop(last);
        let after_last_drop = block_on(receiver.next());
        assert_eq!(after_last_drop, None);
    }

    // Same scenario on a channel created with `mpsc::unbounded()`.
    {
        let (mut first, mut receiver) = mpsc::unbounded();
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Three different ways of shutting down a single sender; each one is expected to
        // affect only that sender and keep the channel open for the others.
        first.disconnect();
        drop(second);
        let close_result = block_on(third.close());
        close_result.unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The one sender that is still open can deliver a value.
        let send_result = block_on(last.send(5));
        send_result.unwrap();
        let received = block_on(receiver.next());
        assert_eq!(received, Some(5));

        // With the final sender gone, the stream terminates.
        drop(last);
        let after_last_drop = block_on(receiver.next());
        assert_eq!(after_last_drop, None);
    }
}
67
/// Checks that calling `close_channel` on one sender closes the channel for every sender:
/// all clones report closed, a later `send` fails with a disconnected error, and the
/// receiver yields `None`.
///
/// The same scenario is run against `mpsc::channel` and `mpsc::unbounded`.
#[test]
fn multiple_senders_close_channel() {
    // Scenario on a channel created with `mpsc::channel(1)`.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut bystander = closer.clone();

        // Unlike shutting down a single sender, this is expected to take the whole
        // channel down.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // Sending through the clone that never closed anything must now fail.
        let send_error = block_on(bystander.send(5)).unwrap_err();
        assert!(send_error.is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }

    // Same scenario on a channel created with `mpsc::unbounded()`.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut bystander = closer.clone();

        // Unlike shutting down a single sender, this is expected to take the whole
        // channel down.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // Sending through the clone that never closed anything must now fail.
        let send_error = block_on(bystander.send(5)).unwrap_err();
        assert!(send_error.is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }
}
102