1 use futures::channel::mpsc;
2 use futures::executor::block_on;
3 use futures::future::Future;
4 use futures::sink::SinkExt;
5 use futures::stream::StreamExt;
6 use futures::task::{Context, Poll};
7 use std::pin::Pin;
8 use std::sync::{Arc, Weak};
9 use std::thread;
10 use std::time::{Duration, Instant};
11 
// Smoke test: a background producer keeps pushing values into a bounded channel
// for as long as its sends succeed, while the test thread consumes a few items.
#[test]
fn smoke() {
    let (mut tx, rx) = mpsc::channel(1);

    // The producer loops until a `send` reports an error.
    let producer = thread::spawn(move || {
        while block_on(tx.send(42)).is_ok() {}
    });

    // Pull exactly three items. `rx` is moved into the stream combinators, so it is
    // gone once `block_on` returns; it has to be dropped for the producer to stop
    // sending, which is why this runs before the join below.
    let take_three = rx.take(3).for_each(|_| futures::future::ready(()));
    block_on(take_three);

    producer.join().unwrap()
}
23 
// Closing a single sender (through `disconnect`, `drop` or `Sink::poll_close`) must leave
// the channel usable for the remaining senders; only losing the last one closes it.
#[test]
fn multiple_senders_disconnect() {
    // The bounded and unbounded channels go through exactly the same steps, so the
    // scenario is written once and instantiated for each channel constructor.
    macro_rules! check_disconnect {
        ($channel:expr) => {{
            let (mut first, mut rx) = $channel;
            let second = first.clone();
            let mut third = first.clone();
            let mut fourth = first.clone();

            // Three different ways of closing one individual sender.
            first.disconnect();
            drop(second);
            block_on(third.close()).unwrap();

            // The closed handles report it; the untouched one does not.
            assert!(first.is_closed());
            assert!(third.is_closed());
            assert!(!fourth.is_closed());

            // The surviving sender can still deliver a message.
            block_on(fourth.send(5)).unwrap();
            assert_eq!(block_on(rx.next()), Some(5));

            // Once the last sender is gone the receiver sees the end of the stream.
            drop(fourth);
            assert_eq!(block_on(rx.next()), None);
        }};
    }

    check_disconnect!(mpsc::channel(1));
    check_disconnect!(mpsc::unbounded());
}
70 
// `close_channel` called on one sender must shut the channel down for every sender.
#[test]
fn multiple_senders_close_channel() {
    // Bounded channel.
    {
        let (mut closer, mut rx) = mpsc::channel(1);
        let mut other = closer.clone();

        closer.close_channel();

        // Both handles observe the closure, not only the one that requested it.
        assert!(closer.is_closed());
        assert!(other.is_closed());

        // Sending through the other handle is rejected as disconnected.
        let send_result = block_on(other.send(5));
        assert!(send_result.unwrap_err().is_disconnected());

        // The receiver sees the end of the stream.
        assert_eq!(block_on(rx.next()), None);
    }

    // Unbounded channel: same expectations.
    {
        let (closer, mut rx) = mpsc::unbounded();
        let mut other = closer.clone();

        closer.close_channel();

        assert!(closer.is_closed());
        assert!(other.is_closed());

        let send_result = block_on(other.send(5));
        assert!(send_result.unwrap_err().is_disconnected());

        assert_eq!(block_on(rx.next()), None);
    }
}
105 
// Dropping the only receiver must close the channel from the senders' point of view and
// drop every message that is still queued inside it.
#[test]
fn single_receiver_drop_closes_channel_and_drains() {
    // Unbounded channel.
    {
        // The `Weak` handle lets the test observe whether the queued `Arc` is still
        // alive without keeping it alive itself.
        let ref_count = Arc::new(0);
        let weak_ref = Arc::downgrade(&ref_count);

        let (sender, receiver) = mpsc::unbounded();
        sender.unbounded_send(ref_count).expect("failed to send");

        // Verify that the sent message is still live.
        assert!(weak_ref.upgrade().is_some());

        drop(receiver);

        // The sender should know the channel is closed.
        assert!(sender.is_closed());

        // Verify that the sent message has been dropped.
        assert!(weak_ref.upgrade().is_none());
    }

    // Bounded channel: same scenario, same expectations.
    {
        let ref_count = Arc::new(0);
        let weak_ref = Arc::downgrade(&ref_count);

        let (mut sender, receiver) = mpsc::channel(1);
        sender.try_send(ref_count).expect("failed to send");

        // Verify that the sent message is still live.
        assert!(weak_ref.upgrade().is_some());

        drop(receiver);

        // The sender should know the channel is closed.
        assert!(sender.is_closed());

        // Verify that the sent message has been dropped.
        assert!(weak_ref.upgrade().is_none());
    }
}
147 
// Stress test that `try_send()`s occurring concurrently with receiver
// close/drops don't appear as successful sends.
//
// A background thread runs `TestTask`, which receives fresh test receivers
// over a command channel, polls each one a configurable number of times and
// then drops it. The main thread hammers the matching sender with `try_send`
// until it reports a disconnect, and then checks that the last item it
// successfully sent has been dropped.
#[test]
fn stress_try_send_as_receiver_closes() {
    // Number of test channels created and torn down over the whole run.
    const AMT: usize = 10000;
    // To provide variable timing characteristics (in the hopes of
    // reproducing the collision that leads to a race), we busy-re-poll
    // the test MPSC receiver a variable number of times before actually
    // stopping.  The countdown is `i % MAX_COUNTDOWN`, so it varies
    // between 0 and one less than the following value.
    const MAX_COUNTDOWN: usize = 20;
    // When we detect that a successfully sent item is still in the
    // queue after a disconnect, we spin for up to `SPIN_TIMEOUT_S`
    // seconds (sleeping `SPIN_SLEEP_MS` milliseconds between checks) to
    // confirm that it is a persistent condition and not a concurrency
    // illusion.
    const SPIN_TIMEOUT_S: u64 = 10;
    const SPIN_SLEEP_MS: u64 = 10;
    // Command sent to the background task: a receiver to exercise.
    struct TestRx {
        rx: mpsc::Receiver<Arc<()>>,
        // The number of times to query `rx` before dropping it.
        poll_count: usize,
    }
    // Future driven on the background thread.
    struct TestTask {
        // Source of new receivers to test; the task finishes when it ends.
        command_rx: mpsc::Receiver<TestRx>,
        // Receiver currently being exercised, if any.
        test_rx: Option<mpsc::Receiver<Arc<()>>>,
        // Remaining polls of `test_rx` before it is dropped.
        countdown: usize,
    }
    impl TestTask {
        /// Create a new TestTask together with the sender used to submit
        /// receivers to it.
        fn new() -> (TestTask, mpsc::Sender<TestRx>) {
            let (command_tx, command_rx) = mpsc::channel::<TestRx>(0);
            (
                TestTask {
                    command_rx,
                    test_rx: None,
                    countdown: 0, // 0 means no countdown is in progress.
                },
                command_tx,
            )
        }
    }
    impl Future for TestTask {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // Poll the test channel, if one is present.
            if let Some(rx) = &mut self.test_rx {
                if let Poll::Ready(v) = rx.poll_next_unpin(cx) {
                    // A received item is simply discarded; the stream ending
                    // here (`None`) is treated as a test failure.
                    let _ = v.expect("test finished unexpectedly!");
                }
                // A receiver is only kept while `countdown` is non-zero (see
                // the check at the end of `poll`), so this does not underflow.
                self.countdown -= 1;
                // Busy-poll until the countdown is finished.
                cx.waker().wake_by_ref();
            }
            // Accept any newly submitted MPSC channels for testing.
            match self.command_rx.poll_next_unpin(cx) {
                Poll::Ready(Some(TestRx { rx, poll_count })) => {
                    self.test_rx = Some(rx);
                    self.countdown = poll_count;
                    // Ask to be polled again so the new receiver gets exercised.
                    cx.waker().wake_by_ref();
                }
                // The command channel has ended: the task is done.
                Poll::Ready(None) => return Poll::Ready(()),
                Poll::Pending => {}
            }
            if self.countdown == 0 {
                // Countdown complete -- drop the Receiver.
                self.test_rx = None;
            }
            Poll::Pending
        }
    }
    let (f, mut cmd_tx) = TestTask::new();
    let bg = thread::spawn(move || block_on(f));
    for i in 0..AMT {
        // Hand a fresh receiver to the background task and keep its sender.
        let (mut test_tx, rx) = mpsc::channel(0);
        let poll_count = i % MAX_COUNTDOWN;
        cmd_tx.try_send(TestRx { rx, poll_count }).unwrap();
        // Weak handle to the most recent item whose `try_send` succeeded.
        let mut prev_weak: Option<Weak<()>> = None;
        // Counters used only in the failure message below.
        let mut attempted_sends = 0;
        let mut successful_sends = 0;
        loop {
            // Create a test item.
            let item = Arc::new(());
            let weak = Arc::downgrade(&item);
            match test_tx.try_send(item) {
                Ok(_) => {
                    prev_weak = Some(weak);
                    successful_sends += 1;
                }
                // Channel is full: just try again with a new item.
                Err(ref e) if e.is_full() => {}
                Err(ref e) if e.is_disconnected() => {
                    // Test for evidence of the race condition.
                    if let Some(prev_weak) = prev_weak {
                        if prev_weak.upgrade().is_some() {
                            // The previously sent item is still allocated.
                            // However, there appears to be some aspect of the
                            // concurrency that can legitimately cause the Arc
                            // to be momentarily valid.  Spin for up to
                            // `SPIN_TIMEOUT_S` seconds waiting for the
                            // previously sent item to be dropped.
                            let t0 = Instant::now();
                            let mut spins = 0;
                            loop {
                                if prev_weak.upgrade().is_none() {
                                    break;
                                }
                                assert!(
                                    t0.elapsed() < Duration::from_secs(SPIN_TIMEOUT_S),
                                    "item not dropped on iteration {} after \
                                     {} sends ({} successful). spin=({})",
                                    i,
                                    attempted_sends,
                                    successful_sends,
                                    spins
                                );
                                spins += 1;
                                thread::sleep(Duration::from_millis(SPIN_SLEEP_MS));
                            }
                        }
                    }
                    // This receiver is gone; move on to the next iteration.
                    break;
                }
                Err(ref e) => panic!("unexpected error: {}", e),
            }
            // Not reached for the disconnected attempt, which breaks out of
            // the loop above.
            attempted_sends += 1;
        }
    }
    // Dropping the command sender lets the background task finish.
    drop(cmd_tx);
    bg.join().expect("background thread join");
}
277 
// Once an unbounded receiver has reported the end of the channel through `try_next`,
// calling `try_next` again must report the end again.
#[test]
fn unbounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::unbounded::<String>();
    // Dropping the sender closes the channel.
    drop(sender);

    // First call: the end of the channel is observed.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), first);

    // Second call, after `None` was already received: still the end of the channel.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), second);
}
288 
// Once a bounded receiver has reported the end of the channel through `try_next`,
// calling `try_next` again must report the end again.
#[test]
fn bounded_try_next_after_none() {
    let (sender, mut receiver) = mpsc::channel::<String>(17);
    // Dropping the sender closes the channel.
    drop(sender);

    // First call: the end of the channel is observed.
    let first = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), first);

    // Second call, after `None` was already received: still the end of the channel.
    let second = receiver.try_next().map_err(|_| ());
    assert_eq!(Ok(None), second);
}
299