1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::sink::SinkExt;
4 use futures::stream::StreamExt;
5 use std::thread;
6 
/// Smoke test: a spawned thread keeps sending `42` into a `channel(1)` until a send fails,
/// while this thread consumes three items and then joins the sending thread.
#[test]
fn smoke() {
    let (mut sender, receiver) = mpsc::channel(1);

    // The producer loops for as long as `send` succeeds.
    let t = thread::spawn(move || while let Ok(()) = block_on(sender.send(42)) {});

    // `receiver` needs to be dropped for `sender` to stop sending and therefore before the
    // join. `take`/`for_each` consume `receiver` by value, so it goes away together with the
    // future handed to `block_on`; the `()` result itself needs no explicit `drop`.
    block_on(receiver.take(3).for_each(|_| futures::future::ready(())));

    t.join().unwrap();
}
20 
/// Closing individual senders (via `disconnect`, `drop`, or `Sink::close`) must not close the
/// channel while another sender is still alive; only losing the last sender does. The same
/// scenario is run against `mpsc::channel(1)` and `mpsc::unbounded()`.
#[test]
fn multiple_senders_disconnect() {
    // Scenario 1: `mpsc::channel(1)`.
    {
        let (mut disconnected, mut receiver) = mpsc::channel(1);
        let dropped = disconnected.clone();
        let mut closed = disconnected.clone();
        let mut survivor = disconnected.clone();

        // Three different ways of shutting down a single sender; none of them may take the
        // channel down for the remaining senders.
        disconnected.disconnect();
        drop(dropped);
        block_on(closed.close()).unwrap();

        assert!(disconnected.is_closed());
        assert!(closed.is_closed());
        assert!(!survivor.is_closed());

        // The surviving sender still delivers.
        block_on(survivor.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // With the last sender gone the stream terminates.
        drop(survivor);
        let after_last_drop = block_on(receiver.next());
        assert_eq!(after_last_drop, None);
    }

    // Scenario 2: `mpsc::unbounded()`.
    {
        let (mut disconnected, mut receiver) = mpsc::unbounded();
        let dropped = disconnected.clone();
        let mut closed = disconnected.clone();
        let mut survivor = disconnected.clone();

        // Three different ways of shutting down a single sender; none of them may take the
        // channel down for the remaining senders.
        disconnected.disconnect();
        drop(dropped);
        block_on(closed.close()).unwrap();

        assert!(disconnected.is_closed());
        assert!(closed.is_closed());
        assert!(!survivor.is_closed());

        // The surviving sender still delivers.
        block_on(survivor.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // With the last sender gone the stream terminates.
        drop(survivor);
        let after_last_drop = block_on(receiver.next());
        assert_eq!(after_last_drop, None);
    }
}
67 
/// `close_channel` called on one sender must close the channel for every clone as well:
/// all senders report closed, further sends fail as disconnected, and the receiver ends.
/// The same scenario is run against `mpsc::channel(1)` and `mpsc::unbounded()`.
#[test]
fn multiple_senders_close_channel() {
    // Scenario 1: `mpsc::channel(1)`.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut sibling = closer.clone();

        // Shut the whole channel down through one of the senders.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other clone must now fail with a disconnect error.
        let send_result = block_on(sibling.send(5));
        let send_error = send_result.unwrap_err();
        assert!(send_error.is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }

    // Scenario 2: `mpsc::unbounded()`.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut sibling = closer.clone();

        // Shut the whole channel down through one of the senders.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other clone must now fail with a disconnect error.
        let send_result = block_on(sibling.send(5));
        let send_error = send_result.unwrap_err();
        assert!(send_error.is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }
}
102