1 use futures::channel::{mpsc, oneshot};
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{poll_fn, FutureExt};
4 use futures::pin_mut;
5 use futures::sink::{Sink, SinkExt};
6 use futures::stream::{Stream, StreamExt};
7 use futures::task::{Context, Poll};
8 use futures_test::task::{new_count_waker, noop_context};
9 use std::sync::atomic::{AtomicUsize, Ordering};
10 use std::sync::{Arc, Mutex};
11 use std::thread;
12 
13 trait AssertSend: Send {}
14 impl AssertSend for mpsc::Sender<i32> {}
15 impl AssertSend for mpsc::Receiver<i32> {}
16 
17 #[test]
send_recv()18 fn send_recv() {
19     let (mut tx, rx) = mpsc::channel::<i32>(16);
20 
21     block_on(tx.send(1)).unwrap();
22     drop(tx);
23     let v: Vec<_> = block_on(rx.collect());
24     assert_eq!(v, vec![1]);
25 }
26 
27 #[test]
send_recv_no_buffer()28 fn send_recv_no_buffer() {
29     // Run on a task context
30     block_on(poll_fn(move |cx| {
31         let (tx, rx) = mpsc::channel::<i32>(0);
32         pin_mut!(tx, rx);
33 
34         assert!(tx.as_mut().poll_flush(cx).is_ready());
35         assert!(tx.as_mut().poll_ready(cx).is_ready());
36 
37         // Send first message
38         assert!(tx.as_mut().start_send(1).is_ok());
39         assert!(tx.as_mut().poll_ready(cx).is_pending());
40 
41         // poll_ready said Pending, so no room in buffer, therefore new sends
42         // should get rejected with is_full.
43         assert!(tx.as_mut().start_send(0).unwrap_err().is_full());
44         assert!(tx.as_mut().poll_ready(cx).is_pending());
45 
46         // Take the value
47         assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(1)));
48         assert!(tx.as_mut().poll_ready(cx).is_ready());
49 
50         // Send second message
51         assert!(tx.as_mut().poll_ready(cx).is_ready());
52         assert!(tx.as_mut().start_send(2).is_ok());
53         assert!(tx.as_mut().poll_ready(cx).is_pending());
54 
55         // Take the value
56         assert_eq!(rx.as_mut().poll_next(cx), Poll::Ready(Some(2)));
57         assert!(tx.as_mut().poll_ready(cx).is_ready());
58 
59         Poll::Ready(())
60     }));
61 }
62 
63 #[test]
send_shared_recv()64 fn send_shared_recv() {
65     let (mut tx1, rx) = mpsc::channel::<i32>(16);
66     let mut rx = block_on_stream(rx);
67     let mut tx2 = tx1.clone();
68 
69     block_on(tx1.send(1)).unwrap();
70     assert_eq!(rx.next(), Some(1));
71 
72     block_on(tx2.send(2)).unwrap();
73     assert_eq!(rx.next(), Some(2));
74 }
75 
76 #[test]
send_recv_threads()77 fn send_recv_threads() {
78     let (mut tx, rx) = mpsc::channel::<i32>(16);
79 
80     let t = thread::spawn(move || {
81         block_on(tx.send(1)).unwrap();
82     });
83 
84     let v: Vec<_> = block_on(rx.take(1).collect());
85     assert_eq!(v, vec![1]);
86 
87     t.join().unwrap();
88 }
89 
90 #[test]
send_recv_threads_no_capacity()91 fn send_recv_threads_no_capacity() {
92     let (mut tx, rx) = mpsc::channel::<i32>(0);
93 
94     let t = thread::spawn(move || {
95         block_on(tx.send(1)).unwrap();
96         block_on(tx.send(2)).unwrap();
97     });
98 
99     let v: Vec<_> = block_on(rx.collect());
100     assert_eq!(v, vec![1, 2]);
101 
102     t.join().unwrap();
103 }
104 
105 #[test]
recv_close_gets_none()106 fn recv_close_gets_none() {
107     let (mut tx, mut rx) = mpsc::channel::<i32>(10);
108 
109     // Run on a task context
110     block_on(poll_fn(move |cx| {
111         rx.close();
112 
113         assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
114         match tx.poll_ready(cx) {
115             Poll::Pending | Poll::Ready(Ok(_)) => panic!(),
116             Poll::Ready(Err(e)) => assert!(e.is_disconnected()),
117         };
118 
119         Poll::Ready(())
120     }));
121 }
122 
123 #[test]
tx_close_gets_none()124 fn tx_close_gets_none() {
125     let (_, mut rx) = mpsc::channel::<i32>(10);
126 
127     // Run on a task context
128     block_on(poll_fn(move |cx| {
129         assert_eq!(rx.poll_next_unpin(cx), Poll::Ready(None));
130         Poll::Ready(())
131     }));
132 }
133 
134 // #[test]
135 // fn spawn_sends_items() {
136 //     let core = local_executor::Core::new();
137 //     let stream = unfold(0, |i| Some(ok::<_,u8>((i, i + 1))));
138 //     let rx = mpsc::spawn(stream, &core, 1);
139 //     assert_eq!(core.run(rx.take(4).collect()).unwrap(),
140 //                [0, 1, 2, 3]);
141 // }
142 
143 // #[test]
144 // fn spawn_kill_dead_stream() {
145 //     use std::thread;
146 //     use std::time::Duration;
147 //     use futures::future::Either;
148 //     use futures::sync::oneshot;
149 //
150 //     // a stream which never returns anything (maybe a remote end isn't
151 //     // responding), but dropping it leads to observable side effects
152 //     // (like closing connections, releasing limited resources, ...)
153 //     #[derive(Debug)]
154 //     struct Dead {
155 //         // when dropped you should get Err(oneshot::Canceled) on the
156 //         // receiving end
157 //         done: oneshot::Sender<()>,
158 //     }
159 //     impl Stream for Dead {
160 //         type Item = ();
161 //         type Error = ();
162 //
163 //         fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
164 //             Ok(Poll::Pending)
165 //         }
166 //     }
167 //
168 //     // need to implement a timeout for the test, as it would hang
169 //     // forever right now
170 //     let (timeout_tx, timeout_rx) = oneshot::channel();
171 //     thread::spawn(move || {
172 //         thread::sleep(Duration::from_millis(1000));
173 //         let _ = timeout_tx.send(());
174 //     });
175 //
176 //     let core = local_executor::Core::new();
177 //     let (done_tx, done_rx) = oneshot::channel();
178 //     let stream = Dead{done: done_tx};
179 //     let rx = mpsc::spawn(stream, &core, 1);
180 //     let res = core.run(
181 //         Ok::<_, ()>(())
182 //         .into_future()
183 //         .then(move |_| {
184 //             // now drop the spawned stream: maybe some timeout exceeded,
185 //             // or some connection on this end was closed by the remote
186 //             // end.
187 //             drop(rx);
188 //             // and wait for the spawned stream to release its resources
189 //             done_rx
190 //         })
191 //         .select2(timeout_rx)
192 //     );
193 //     match res {
194 //         Err(Either::A((oneshot::Canceled, _))) => (),
195 //         _ => {
196 //             panic!("dead stream wasn't canceled");
197 //         },
198 //     }
199 // }
200 
201 #[test]
stress_shared_unbounded()202 fn stress_shared_unbounded() {
203     #[cfg(miri)]
204     const AMT: u32 = 100;
205     #[cfg(not(miri))]
206     const AMT: u32 = 10000;
207     const NTHREADS: u32 = 8;
208     let (tx, rx) = mpsc::unbounded::<i32>();
209 
210     let t = thread::spawn(move || {
211         let result: Vec<_> = block_on(rx.collect());
212         assert_eq!(result.len(), (AMT * NTHREADS) as usize);
213         for item in result {
214             assert_eq!(item, 1);
215         }
216     });
217 
218     for _ in 0..NTHREADS {
219         let tx = tx.clone();
220 
221         thread::spawn(move || {
222             for _ in 0..AMT {
223                 tx.unbounded_send(1).unwrap();
224             }
225         });
226     }
227 
228     drop(tx);
229 
230     t.join().ok().unwrap();
231 }
232 
233 #[test]
stress_shared_bounded_hard()234 fn stress_shared_bounded_hard() {
235     #[cfg(miri)]
236     const AMT: u32 = 100;
237     #[cfg(not(miri))]
238     const AMT: u32 = 10000;
239     const NTHREADS: u32 = 8;
240     let (tx, rx) = mpsc::channel::<i32>(0);
241 
242     let t = thread::spawn(move || {
243         let result: Vec<_> = block_on(rx.collect());
244         assert_eq!(result.len(), (AMT * NTHREADS) as usize);
245         for item in result {
246             assert_eq!(item, 1);
247         }
248     });
249 
250     for _ in 0..NTHREADS {
251         let mut tx = tx.clone();
252 
253         thread::spawn(move || {
254             for _ in 0..AMT {
255                 block_on(tx.send(1)).unwrap();
256             }
257         });
258     }
259 
260     drop(tx);
261 
262     t.join().unwrap();
263 }
264 
265 #[allow(clippy::same_item_push)]
266 #[test]
stress_receiver_multi_task_bounded_hard()267 fn stress_receiver_multi_task_bounded_hard() {
268     #[cfg(miri)]
269     const AMT: usize = 100;
270     #[cfg(not(miri))]
271     const AMT: usize = 10_000;
272     const NTHREADS: u32 = 2;
273 
274     let (mut tx, rx) = mpsc::channel::<usize>(0);
275     let rx = Arc::new(Mutex::new(Some(rx)));
276     let n = Arc::new(AtomicUsize::new(0));
277 
278     let mut th = vec![];
279 
280     for _ in 0..NTHREADS {
281         let rx = rx.clone();
282         let n = n.clone();
283 
284         let t = thread::spawn(move || {
285             let mut i = 0;
286 
287             loop {
288                 i += 1;
289                 let mut rx_opt = rx.lock().unwrap();
290                 if let Some(rx) = &mut *rx_opt {
291                     if i % 5 == 0 {
292                         let item = block_on(rx.next());
293 
294                         if item.is_none() {
295                             *rx_opt = None;
296                             break;
297                         }
298 
299                         n.fetch_add(1, Ordering::Relaxed);
300                     } else {
301                         // Just poll
302                         let n = n.clone();
303                         match rx.poll_next_unpin(&mut noop_context()) {
304                             Poll::Ready(Some(_)) => {
305                                 n.fetch_add(1, Ordering::Relaxed);
306                             }
307                             Poll::Ready(None) => {
308                                 *rx_opt = None;
309                                 break;
310                             }
311                             Poll::Pending => {}
312                         }
313                     }
314                 } else {
315                     break;
316                 }
317             }
318         });
319 
320         th.push(t);
321     }
322 
323     for i in 0..AMT {
324         block_on(tx.send(i)).unwrap();
325     }
326     drop(tx);
327 
328     for t in th {
329         t.join().unwrap();
330     }
331 
332     assert_eq!(AMT, n.load(Ordering::Relaxed));
333 }
334 
335 /// Stress test that receiver properly receives all the messages
336 /// after sender dropped.
337 #[test]
stress_drop_sender()338 fn stress_drop_sender() {
339     #[cfg(miri)]
340     const ITER: usize = 100;
341     #[cfg(not(miri))]
342     const ITER: usize = 10000;
343 
344     fn list() -> impl Stream<Item = i32> {
345         let (tx, rx) = mpsc::channel(1);
346         thread::spawn(move || {
347             block_on(send_one_two_three(tx));
348         });
349         rx
350     }
351 
352     for _ in 0..ITER {
353         let v: Vec<_> = block_on(list().collect());
354         assert_eq!(v, vec![1, 2, 3]);
355     }
356 }
357 
send_one_two_three(mut tx: mpsc::Sender<i32>)358 async fn send_one_two_three(mut tx: mpsc::Sender<i32>) {
359     for i in 1..=3 {
360         tx.send(i).await.unwrap();
361     }
362 }
363 
364 /// Stress test that after receiver dropped,
365 /// no messages are lost.
stress_close_receiver_iter()366 fn stress_close_receiver_iter() {
367     let (tx, rx) = mpsc::unbounded();
368     let mut rx = block_on_stream(rx);
369     let (unwritten_tx, unwritten_rx) = std::sync::mpsc::channel();
370     let th = thread::spawn(move || {
371         for i in 1.. {
372             if tx.unbounded_send(i).is_err() {
373                 unwritten_tx.send(i).expect("unwritten_tx");
374                 return;
375             }
376         }
377     });
378 
379     // Read one message to make sure thread effectively started
380     assert_eq!(Some(1), rx.next());
381 
382     rx.close();
383 
384     for i in 2.. {
385         match rx.next() {
386             Some(r) => assert!(i == r),
387             None => {
388                 let unwritten = unwritten_rx.recv().expect("unwritten_rx");
389                 assert_eq!(unwritten, i);
390                 th.join().unwrap();
391                 return;
392             }
393         }
394     }
395 }
396 
397 #[cfg_attr(miri, ignore)] // Miri is too slow
398 #[test]
stress_close_receiver()399 fn stress_close_receiver() {
400     const ITER: usize = 10000;
401 
402     for _ in 0..ITER {
403         stress_close_receiver_iter();
404     }
405 }
406 
stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32)407 async fn stress_poll_ready_sender(mut sender: mpsc::Sender<u32>, count: u32) {
408     for i in (1..=count).rev() {
409         sender.send(i).await.unwrap();
410     }
411 }
412 
413 /// Tests that after `poll_ready` indicates capacity a channel can always send without waiting.
414 #[allow(clippy::same_item_push)]
415 #[test]
stress_poll_ready()416 fn stress_poll_ready() {
417     #[cfg(miri)]
418     const AMT: u32 = 100;
419     #[cfg(not(miri))]
420     const AMT: u32 = 1000;
421     const NTHREADS: u32 = 8;
422 
423     /// Run a stress test using the specified channel capacity.
424     fn stress(capacity: usize) {
425         let (tx, rx) = mpsc::channel(capacity);
426         let mut threads = Vec::new();
427         for _ in 0..NTHREADS {
428             let sender = tx.clone();
429             threads.push(thread::spawn(move || block_on(stress_poll_ready_sender(sender, AMT))));
430         }
431         drop(tx);
432 
433         let result: Vec<_> = block_on(rx.collect());
434         assert_eq!(result.len() as u32, AMT * NTHREADS);
435 
436         for thread in threads {
437             thread.join().unwrap();
438         }
439     }
440 
441     stress(0);
442     stress(1);
443     stress(8);
444     stress(16);
445 }
446 
447 #[cfg_attr(miri, ignore)] // Miri is too slow
448 #[test]
try_send_1()449 fn try_send_1() {
450     const N: usize = 3000;
451     let (mut tx, rx) = mpsc::channel(0);
452 
453     let t = thread::spawn(move || {
454         for i in 0..N {
455             loop {
456                 if tx.try_send(i).is_ok() {
457                     break;
458                 }
459             }
460         }
461     });
462 
463     let result: Vec<_> = block_on(rx.collect());
464     for (i, j) in result.into_iter().enumerate() {
465         assert_eq!(i, j);
466     }
467 
468     t.join().unwrap();
469 }
470 
471 #[test]
try_send_2()472 fn try_send_2() {
473     let (mut tx, rx) = mpsc::channel(0);
474     let mut rx = block_on_stream(rx);
475 
476     tx.try_send("hello").unwrap();
477 
478     let (readytx, readyrx) = oneshot::channel::<()>();
479 
480     let th = thread::spawn(move || {
481         block_on(poll_fn(|cx| {
482             assert!(tx.poll_ready(cx).is_pending());
483             Poll::Ready(())
484         }));
485 
486         drop(readytx);
487         block_on(tx.send("goodbye")).unwrap();
488     });
489 
490     let _ = block_on(readyrx);
491     assert_eq!(rx.next(), Some("hello"));
492     assert_eq!(rx.next(), Some("goodbye"));
493     assert_eq!(rx.next(), None);
494 
495     th.join().unwrap();
496 }
497 
498 #[test]
try_send_fail()499 fn try_send_fail() {
500     let (mut tx, rx) = mpsc::channel(0);
501     let mut rx = block_on_stream(rx);
502 
503     tx.try_send("hello").unwrap();
504 
505     // This should fail
506     assert!(tx.try_send("fail").is_err());
507 
508     assert_eq!(rx.next(), Some("hello"));
509 
510     tx.try_send("goodbye").unwrap();
511     drop(tx);
512 
513     assert_eq!(rx.next(), Some("goodbye"));
514     assert_eq!(rx.next(), None);
515 }
516 
517 #[test]
try_send_recv()518 fn try_send_recv() {
519     let (mut tx, mut rx) = mpsc::channel(1);
520     tx.try_send("hello").unwrap();
521     tx.try_send("hello").unwrap();
522     tx.try_send("hello").unwrap_err(); // should be full
523     rx.try_next().unwrap();
524     rx.try_next().unwrap();
525     rx.try_next().unwrap_err(); // should be empty
526     tx.try_send("hello").unwrap();
527     rx.try_next().unwrap();
528     rx.try_next().unwrap_err(); // should be empty
529 }
530 
531 #[test]
same_receiver()532 fn same_receiver() {
533     let (mut txa1, _) = mpsc::channel::<i32>(1);
534     let txa2 = txa1.clone();
535 
536     let (mut txb1, _) = mpsc::channel::<i32>(1);
537     let txb2 = txb1.clone();
538 
539     assert!(txa1.same_receiver(&txa2));
540     assert!(txb1.same_receiver(&txb2));
541     assert!(!txa1.same_receiver(&txb1));
542 
543     txa1.disconnect();
544     txb1.close_channel();
545 
546     assert!(!txa1.same_receiver(&txa2));
547     assert!(txb1.same_receiver(&txb2));
548 }
549 
550 #[test]
is_connected_to()551 fn is_connected_to() {
552     let (txa, rxa) = mpsc::channel::<i32>(1);
553     let (txb, rxb) = mpsc::channel::<i32>(1);
554 
555     assert!(txa.is_connected_to(&rxa));
556     assert!(txb.is_connected_to(&rxb));
557     assert!(!txa.is_connected_to(&rxb));
558     assert!(!txb.is_connected_to(&rxa));
559 }
560 
561 #[test]
hash_receiver()562 fn hash_receiver() {
563     use std::collections::hash_map::DefaultHasher;
564     use std::hash::Hasher;
565 
566     let mut hasher_a1 = DefaultHasher::new();
567     let mut hasher_a2 = DefaultHasher::new();
568     let mut hasher_b1 = DefaultHasher::new();
569     let mut hasher_b2 = DefaultHasher::new();
570     let (mut txa1, _) = mpsc::channel::<i32>(1);
571     let txa2 = txa1.clone();
572 
573     let (mut txb1, _) = mpsc::channel::<i32>(1);
574     let txb2 = txb1.clone();
575 
576     txa1.hash_receiver(&mut hasher_a1);
577     let hash_a1 = hasher_a1.finish();
578     txa2.hash_receiver(&mut hasher_a2);
579     let hash_a2 = hasher_a2.finish();
580     txb1.hash_receiver(&mut hasher_b1);
581     let hash_b1 = hasher_b1.finish();
582     txb2.hash_receiver(&mut hasher_b2);
583     let hash_b2 = hasher_b2.finish();
584 
585     assert_eq!(hash_a1, hash_a2);
586     assert_eq!(hash_b1, hash_b2);
587     assert!(hash_a1 != hash_b1);
588 
589     txa1.disconnect();
590     txb1.close_channel();
591 
592     let mut hasher_a1 = DefaultHasher::new();
593     let mut hasher_a2 = DefaultHasher::new();
594     let mut hasher_b1 = DefaultHasher::new();
595     let mut hasher_b2 = DefaultHasher::new();
596 
597     txa1.hash_receiver(&mut hasher_a1);
598     let hash_a1 = hasher_a1.finish();
599     txa2.hash_receiver(&mut hasher_a2);
600     let hash_a2 = hasher_a2.finish();
601     txb1.hash_receiver(&mut hasher_b1);
602     let hash_b1 = hasher_b1.finish();
603     txb2.hash_receiver(&mut hasher_b2);
604     let hash_b2 = hasher_b2.finish();
605 
606     assert!(hash_a1 != hash_a2);
607     assert_eq!(hash_b1, hash_b2);
608 }
609 
610 #[test]
send_backpressure()611 fn send_backpressure() {
612     let (waker, counter) = new_count_waker();
613     let mut cx = Context::from_waker(&waker);
614 
615     let (mut tx, mut rx) = mpsc::channel(1);
616     block_on(tx.send(1)).unwrap();
617 
618     let mut task = tx.send(2);
619     assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
620     assert_eq!(counter, 0);
621 
622     let item = block_on(rx.next()).unwrap();
623     assert_eq!(item, 1);
624     assert_eq!(counter, 1);
625     assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
626 
627     let item = block_on(rx.next()).unwrap();
628     assert_eq!(item, 2);
629 }
630 
631 #[test]
send_backpressure_multi_senders()632 fn send_backpressure_multi_senders() {
633     let (waker, counter) = new_count_waker();
634     let mut cx = Context::from_waker(&waker);
635 
636     let (mut tx1, mut rx) = mpsc::channel(1);
637     let mut tx2 = tx1.clone();
638     block_on(tx1.send(1)).unwrap();
639 
640     let mut task = tx2.send(2);
641     assert_eq!(task.poll_unpin(&mut cx), Poll::Pending);
642     assert_eq!(counter, 0);
643 
644     let item = block_on(rx.next()).unwrap();
645     assert_eq!(item, 1);
646     assert_eq!(counter, 1);
647     assert_eq!(task.poll_unpin(&mut cx), Poll::Ready(Ok(())));
648 
649     let item = block_on(rx.next()).unwrap();
650     assert_eq!(item, 2);
651 }
652