1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::sink::SinkExt;
4 use futures::stream::StreamExt;
5 use std::sync::Arc;
6 use std::thread;
7
/// Basic end-to-end check: a background thread keeps sending into a bounded channel while the
/// test thread consumes three items, after which the producer is expected to stop and be joined.
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // Producer: push values until a send reports an error.
    let producer = thread::spawn(move || {
        loop {
            if block_on(tx.send(42)).is_err() {
                break;
            }
        }
    });

    // The receiving half is moved into this future, so it is dropped as soon as `block_on`
    // returns. That has to happen before the join below, otherwise the producer would never
    // see a failed send and would keep looping.
    let drain = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(drain);

    producer.join().unwrap();
}
21
/// Closing an individual sender handle (by `disconnect`, by dropping it, or by closing it through
/// the `Sink` API) must not close the channel while another handle is still alive. The same
/// scenario is run against a bounded and an unbounded channel.
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut disconnected, mut receiver) = mpsc::channel(1);
        let dropped = disconnected.clone();
        let mut sink_closed = disconnected.clone();
        let mut survivor = disconnected.clone();

        // Retire three of the four handles, each by a different route.
        disconnected.disconnect();
        drop(dropped);
        block_on(sink_closed.close()).unwrap();

        // The retired handles report closed; the untouched one does not.
        assert!(disconnected.is_closed());
        assert!(sink_closed.is_closed());
        assert!(!survivor.is_closed());

        // The surviving handle can still deliver a message.
        block_on(survivor.send(5)).unwrap();
        let received = block_on(receiver.next());
        assert_eq!(received, Some(5));

        // Once the last live handle is dropped the stream terminates.
        drop(survivor);
        let received = block_on(receiver.next());
        assert_eq!(received, None);
    }

    // Unbounded channel.
    {
        let (mut disconnected, mut receiver) = mpsc::unbounded();
        let dropped = disconnected.clone();
        let mut sink_closed = disconnected.clone();
        let mut survivor = disconnected.clone();

        // Retire three of the four handles, each by a different route.
        disconnected.disconnect();
        drop(dropped);
        block_on(sink_closed.close()).unwrap();

        // The retired handles report closed; the untouched one does not.
        assert!(disconnected.is_closed());
        assert!(sink_closed.is_closed());
        assert!(!survivor.is_closed());

        // The surviving handle can still deliver a message.
        block_on(survivor.send(5)).unwrap();
        let received = block_on(receiver.next());
        assert_eq!(received, Some(5));

        // Once the last live handle is dropped the stream terminates.
        drop(survivor);
        let received = block_on(receiver.next());
        assert_eq!(received, None);
    }
}
68
/// `close_channel` on one sender handle is expected to close the channel for every handle:
/// all clones report closed, further sends fail as disconnected, and the receiver sees the end
/// of the stream. Checked for both the bounded and the unbounded channel.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut sibling = closer.clone();

        // Close the whole channel through just one of the handles.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must now fail with a "disconnected" error.
        let send_result = block_on(sibling.send(5));
        let err = send_result.unwrap_err();
        assert!(err.is_disconnected());

        let received = block_on(receiver.next());
        assert_eq!(received, None);
    }

    // Unbounded channel.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut sibling = closer.clone();

        // Close the whole channel through just one of the handles.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(sibling.is_closed());

        // Sending through the other handle must now fail with a "disconnected" error.
        let send_result = block_on(sibling.send(5));
        let err = send_result.unwrap_err();
        assert!(err.is_disconnected());

        let received = block_on(receiver.next());
        assert_eq!(received, None);
    }
}
103
/// Dropping the receiver must both mark the channel as closed for the sender and release any
/// message still buffered in it. A `Weak` pointer to an `Arc` payload is used to observe whether
/// the buffered message has been dropped. Runs against an unbounded and a bounded channel.
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Unbounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (tx, rx) = mpsc::unbounded();
        tx.unbounded_send(payload).expect("failed to send");

        // The `Arc` was moved into the channel; while the message is buffered the weak
        // handle must still be upgradable.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sending side observes the closure.
        assert!(tx.is_closed());

        // No strong reference is left, i.e. the buffered message was dropped.
        assert!(probe.upgrade().is_none());
    }

    // Bounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (mut tx, rx) = mpsc::channel(1);
        tx.try_send(payload).expect("failed to send");

        // The `Arc` was moved into the channel; while the message is buffered the weak
        // handle must still be upgradable.
        assert!(probe.upgrade().is_some());

        drop(rx);

        // The sending side observes the closure.
        assert!(tx.is_closed());

        // No strong reference is left, i.e. the buffered message was dropped.
        assert!(probe.upgrade().is_none());
        // The closed state is checked once more after the payload check.
        assert!(tx.is_closed());
    }
}
145