1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::sink::SinkExt;
4 use futures::stream::StreamExt;
5 use std::sync::Arc;
6 use std::thread;
7 
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // Producer thread: keep sending until a send reports an error.
    let producer = thread::spawn(move || {
        loop {
            if block_on(tx.send(42)).is_err() {
                break;
            }
        }
    });

    // Consume exactly three items. The receiver is moved into this future, so
    // it is dropped when `block_on` returns; that has to happen before the
    // join below, otherwise the producer would never stop sending.
    let first_three = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(first_three);

    producer.join().unwrap()
}
21 
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut first, mut receiver) = mpsc::channel(1);
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four handles, each by a different route:
        // `disconnect`, a plain drop, and closing through the `Sink` API.
        // Each should only affect its own handle, not the channel as a whole.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving handle can still deliver a message.
        block_on(last.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the last sender is gone the stream terminates.
        drop(last);
        let after_close = block_on(receiver.next());
        assert_eq!(after_close, None);
    }

    // Unbounded channel: same scenario.
    {
        let (mut first, mut receiver) = mpsc::unbounded();
        let second = first.clone();
        let mut third = first.clone();
        let mut last = first.clone();

        // Retire three of the four handles, each by a different route:
        // `disconnect`, a plain drop, and closing through the `Sink` API.
        // Each should only affect its own handle, not the channel as a whole.
        first.disconnect();
        drop(second);
        block_on(third.close()).unwrap();

        assert!(first.is_closed());
        assert!(third.is_closed());
        assert!(!last.is_closed());

        // The surviving handle can still deliver a message.
        block_on(last.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Once the last sender is gone the stream terminates.
        drop(last);
        let after_close = block_on(receiver.next());
        assert_eq!(after_close, None);
    }
}
68 
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut sibling = closer.clone();

        // `close_channel` on one handle is expected to shut the channel down
        // for every handle, not just the one it was called on.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must now fail as disconnected.
        let send_result = block_on(sibling.send(5));
        assert!(send_result.unwrap_err().is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }

    // Unbounded channel: same scenario.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut sibling = closer.clone();

        // `close_channel` on one handle is expected to shut the channel down
        // for every handle, not just the one it was called on.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must now fail as disconnected.
        let send_result = block_on(sibling.send(5));
        assert!(send_result.unwrap_err().is_disconnected());

        let next_item = block_on(receiver.next());
        assert_eq!(next_item, None);
    }
}
103 
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Dropping the receiver must (a) mark the channel closed for the sender
    // and (b) drop any message still sitting in the queue. A `Weak` handle to
    // the queued `Arc` lets the test observe (b) without keeping the value
    // alive itself.

    // Unbounded channel.
    {
        let ref_count = Arc::new(0);
        let weak_ref = Arc::downgrade(&ref_count);

        let (sender, receiver) = mpsc::unbounded();
        sender.unbounded_send(ref_count).expect("failed to send");

        // Verify that the sent message is still live.
        assert!(weak_ref.upgrade().is_some());

        drop(receiver);

        // The sender should know the channel is closed.
        assert!(sender.is_closed());

        // Verify that the sent message has been dropped.
        assert!(weak_ref.upgrade().is_none());
    }

    // Bounded channel: same scenario, mirroring the unbounded half exactly.
    {
        let ref_count = Arc::new(0);
        let weak_ref = Arc::downgrade(&ref_count);

        let (mut sender, receiver) = mpsc::channel(1);
        sender.try_send(ref_count).expect("failed to send");

        // Verify that the sent message is still live.
        assert!(weak_ref.upgrade().is_some());

        drop(receiver);

        // The sender should know the channel is closed.
        assert!(sender.is_closed());

        // Verify that the sent message has been dropped.
        assert!(weak_ref.upgrade().is_none());
    }
}
145