1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::future::Future;
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use std::pin::Pin;
8 use std::sync::{Arc, Weak};
9 use std::thread;
10 use std::time::{Duration, Instant};
11
// Basic end-to-end check: a producer thread keeps sending into a bounded channel while the
// test thread consumes a few items and then lets go of the receiver.
#[test]
fn smoke() {
    let (mut sender, receiver) = mpsc::channel(1);

    // The producer keeps pushing values until a send reports an error.
    let producer = thread::spawn(move || {
        while block_on(sender.send(42)).is_ok() {}
    });

    // The stream owns `receiver` and is moved into `block_on`, so the receiver is dropped
    // before the join below. That drop is what makes the producer stop sending.
    let first_three = receiver.take(3).for_each(|_| futures::future::ready(()));
    block_on(first_three);

    producer.join().unwrap();
}
23
// Retiring a single sender -- through `disconnect`, by dropping it, or via `Sink::poll_close`
// -- closes only that sender; the channel stays usable for the remaining senders. Checked for
// both the bounded and the unbounded channel.
#[test]
fn multiple_senders_disconnect() {
    // Bounded channel.
    {
        let (mut tx_disconnect, mut receiver) = mpsc::channel(1);
        let tx_drop = tx_disconnect.clone();
        let mut tx_close = tx_disconnect.clone();
        let mut tx_live = tx_disconnect.clone();

        // Retire three of the four senders, each in a different way.
        tx_disconnect.disconnect();
        drop(tx_drop);
        block_on(tx_close.close()).unwrap();

        // The retired senders that still exist report closed; the untouched one does not.
        assert!(tx_disconnect.is_closed());
        assert!(tx_close.is_closed());
        assert!(!tx_live.is_closed());

        // The remaining sender can still deliver a value.
        block_on(tx_live.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Dropping the last sender closes the channel, which ends the stream.
        drop(tx_live);
        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel: same scenario.
    {
        let (mut tx_disconnect, mut receiver) = mpsc::unbounded();
        let tx_drop = tx_disconnect.clone();
        let mut tx_close = tx_disconnect.clone();
        let mut tx_live = tx_disconnect.clone();

        // Retire three of the four senders, each in a different way.
        tx_disconnect.disconnect();
        drop(tx_drop);
        block_on(tx_close.close()).unwrap();

        // The retired senders that still exist report closed; the untouched one does not.
        assert!(tx_disconnect.is_closed());
        assert!(tx_close.is_closed());
        assert!(!tx_live.is_closed());

        // The remaining sender can still deliver a value.
        block_on(tx_live.send(5)).unwrap();
        let delivered = block_on(receiver.next());
        assert_eq!(delivered, Some(5));

        // Dropping the last sender closes the channel, which ends the stream.
        drop(tx_live);
        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }
}
70
// `close_channel` on any one sender shuts the whole channel down: every clone reports closed,
// further sends fail as disconnected, and the receiver sees the end of the stream. Checked
// for both the bounded and the unbounded channel.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut receiver) = mpsc::channel(1);
        let mut bystander = closer.clone();

        // Closing through one handle affects every handle.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // A send through the other handle is rejected with a "disconnected" error.
        assert!(block_on(bystander.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }

    // Unbounded channel: same scenario.
    {
        let (closer, mut receiver) = mpsc::unbounded();
        let mut bystander = closer.clone();

        // Closing through one handle affects every handle.
        closer.close_channel();

        assert!(closer.is_closed());
        assert!(bystander.is_closed());

        // A send through the other handle is rejected with a "disconnected" error.
        assert!(block_on(bystander.send(5)).unwrap_err().is_disconnected());

        let end_of_stream = block_on(receiver.next());
        assert_eq!(end_of_stream, None);
    }
}
105
// Dropping the receiver closes the channel and drops any message still queued in it. An
// `Arc` is used as the message so a `Weak` handle can observe whether it is still alive.
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Unbounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (sender, receiver) = mpsc::unbounded();
        sender.unbounded_send(payload).expect("failed to send");

        // While queued, the message is still alive.
        assert!(probe.upgrade().is_some());

        drop(receiver);

        // The sender observes the closed channel.
        assert!(sender.is_closed());

        // The queued message was dropped together with the receiver.
        assert!(probe.upgrade().is_none());
    }

    // Bounded channel.
    {
        let payload = Arc::new(0);
        let probe = Arc::downgrade(&payload);

        let (mut sender, receiver) = mpsc::channel(1);
        sender.try_send(payload).expect("failed to send");

        // While queued, the message is still alive.
        assert!(probe.upgrade().is_some());

        drop(receiver);

        // The sender observes the closed channel.
        assert!(sender.is_closed());

        // The queued message was dropped together with the receiver, and the sender still
        // reports closed afterwards.
        assert!(probe.upgrade().is_none());
        assert!(sender.is_closed());
    }
}
147
// Stress test: a `try_send()` that races with the receiver being closed/dropped must not be
// reported as successful while its item is left behind in the channel.
#[test]
fn stress_try_send_as_receiver_closes() {
    // Number of test channels pushed through the race.
    const AMT: usize = 10000;
    // To vary the timing (in the hope of hitting the racy interleaving), the background task
    // busy-re-polls each test receiver a variable number of times before dropping it. The
    // poll budget for iteration `i` is `i % MAX_COUNTDOWN`.
    const MAX_COUNTDOWN: usize = 20;
    // When a successfully sent item still looks alive after the disconnect, keep re-checking
    // for up to `SPIN_TIMEOUT_S` seconds, sleeping `SPIN_SLEEP_MS` milliseconds between checks,
    // to tell a persistent leak apart from a momentary concurrency artefact.
    const SPIN_TIMEOUT_S: u64 = 10;
    const SPIN_SLEEP_MS: u64 = 10;

    // Command handed to the background task: a receiver to exercise and its poll budget.
    struct TestRx {
        rx: mpsc::Receiver<Arc<()>>,
        // The number of times to query `rx` before dropping it.
        poll_count: usize,
    }

    // Background future that polls the current test receiver and drops it once the countdown
    // has run out.
    struct TestTask {
        command_rx: mpsc::Receiver<TestRx>,
        test_rx: Option<mpsc::Receiver<Arc<()>>>,
        // Remaining polls for `test_rx`; 0 means no countdown is in progress.
        countdown: usize,
    }

    impl TestTask {
        /// Builds an idle task together with the sender used to submit receivers to it.
        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
            let task = TestTask { command_rx, test_rx: None, countdown: 0 };
            (task, command_tx)
        }
    }

    impl Future for TestTask {
        type Output = ();

        fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            let task = self.get_mut();

            // Service the receiver under test, if there is one.
            if let Some(active) = task.test_rx.as_mut() {
                if let Poll::Ready(received) = active.poll_next_unpin(cx) {
                    drop(received.expect("test finished unexpectedly!"));
                }
                task.countdown -= 1;
                // Request an immediate re-poll so the countdown keeps ticking.
                cx.waker().wake_by_ref();
            }

            // Pick up a newly submitted receiver, or finish once the command channel ends.
            match task.command_rx.poll_next_unpin(cx) {
                Poll::Pending => {}
                Poll::Ready(None) => return Poll::Ready(()),
                Poll::Ready(Some(TestRx { rx, poll_count })) => {
                    task.test_rx = Some(rx);
                    task.countdown = poll_count;
                    cx.waker().wake_by_ref();
                }
            }

            // Budget exhausted (or none in progress): drop the receiver under test.
            if task.countdown == 0 {
                task.test_rx = None;
            }
            Poll::Pending
        }
    }

    let (task, mut cmd_tx) = TestTask::new();
    let bg = thread::spawn(move || block_on(task));

    for i in 0..AMT {
        let (mut test_tx, rx) = mpsc::channel(0);
        cmd_tx.try_send(TestRx { rx, poll_count: i % MAX_COUNTDOWN }).unwrap();

        // Weak handle to the most recent item whose `try_send` reported success.
        let mut last_delivered: Option<Weak<()>> = None;
        let mut attempted_sends = 0;
        let mut successful_sends = 0;
        loop {
            // Fresh item for this attempt, plus a weak handle to watch its lifetime.
            let item = Arc::new(());
            let weak = Arc::downgrade(&item);
            match test_tx.try_send(item) {
                Ok(()) => {
                    last_delivered = Some(weak);
                    successful_sends += 1;
                }
                // Channel full: try again with a new item.
                Err(err) if err.is_full() => {}
                Err(err) if err.is_disconnected() => {
                    // Look for evidence of the race: the last item reported as sent should
                    // have been dropped along with the receiver.
                    let stranded = last_delivered.as_ref().filter(|w| w.upgrade().is_some());
                    if let Some(stranded) = stranded {
                        // The item can legitimately appear alive for a moment, so poll its
                        // liveness until it is gone or the timeout expires.
                        let started = Instant::now();
                        let mut spins = 0;
                        while stranded.upgrade().is_some() {
                            assert!(
                                started.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
                                "item not dropped on iteration {} after \
                                 {} sends ({} successful). spin=({})",
                                i,
                                attempted_sends,
                                successful_sends,
                                spins
                            );
                            spins += 1;
                            thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
                        }
                    }
                    break;
                }
                Err(err) => panic!("unexpected error: {}", err),
            }
            attempted_sends += 1;
        }
    }

    // With the command sender gone, the task sees the end of the command stream and completes.
    drop(cmd_tx);
    bg.join().expect("background thread join");
}
277
// After an unbounded channel has reported its end, `try_next` can be called again and keeps
// returning `Ok(None)`.
#[test]
fn unbounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::unbounded::<String>();
    // With the only sender dropped, the channel is closed.
    drop(sender);

    // First call observes the end of the channel.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), first);

    // A second call after `None` behaves the same way.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), second);
}
288
// After a bounded channel has reported its end, `try_next` can be called again and keeps
// returning `Ok(None)`.
#[test]
fn bounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::channel::<String>(17);
    // With the only sender dropped, the channel is closed.
    drop(sender);

    // First call observes the end of the channel.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), first);

    // A second call after `None` behaves the same way.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), second);
}
299