1 use tokio::stream::{self, Stream, StreamExt};
2 use tokio::sync::mpsc;
3 use tokio_test::{assert_pending, assert_ready, task};
4 
5 #[tokio::test]
basic_usage()6 async fn basic_usage() {
7     let one = stream::iter(vec![1, 2, 3]);
8     let two = stream::iter(vec![4, 5, 6]);
9 
10     let mut stream = one.chain(two);
11 
12     assert_eq!(stream.size_hint(), (6, Some(6)));
13     assert_eq!(stream.next().await, Some(1));
14 
15     assert_eq!(stream.size_hint(), (5, Some(5)));
16     assert_eq!(stream.next().await, Some(2));
17 
18     assert_eq!(stream.size_hint(), (4, Some(4)));
19     assert_eq!(stream.next().await, Some(3));
20 
21     assert_eq!(stream.size_hint(), (3, Some(3)));
22     assert_eq!(stream.next().await, Some(4));
23 
24     assert_eq!(stream.size_hint(), (2, Some(2)));
25     assert_eq!(stream.next().await, Some(5));
26 
27     assert_eq!(stream.size_hint(), (1, Some(1)));
28     assert_eq!(stream.next().await, Some(6));
29 
30     assert_eq!(stream.size_hint(), (0, Some(0)));
31     assert_eq!(stream.next().await, None);
32 
33     assert_eq!(stream.size_hint(), (0, Some(0)));
34     assert_eq!(stream.next().await, None);
35 }
36 
37 #[tokio::test]
pending_first()38 async fn pending_first() {
39     let (tx1, rx1) = mpsc::unbounded_channel();
40     let (tx2, rx2) = mpsc::unbounded_channel();
41 
42     let mut stream = task::spawn(rx1.chain(rx2));
43     assert_eq!(stream.size_hint(), (0, None));
44 
45     assert_pending!(stream.poll_next());
46 
47     tx2.send(2).unwrap();
48     assert!(!stream.is_woken());
49 
50     assert_pending!(stream.poll_next());
51 
52     tx1.send(1).unwrap();
53     assert!(stream.is_woken());
54     assert_eq!(Some(1), assert_ready!(stream.poll_next()));
55 
56     assert_pending!(stream.poll_next());
57 
58     drop(tx1);
59 
60     assert_eq!(stream.size_hint(), (0, None));
61 
62     assert!(stream.is_woken());
63     assert_eq!(Some(2), assert_ready!(stream.poll_next()));
64 
65     assert_eq!(stream.size_hint(), (0, None));
66 
67     drop(tx2);
68 
69     assert_eq!(stream.size_hint(), (0, None));
70     assert_eq!(None, assert_ready!(stream.poll_next()));
71 }
72 
/// Chains two streams that each report a size hint of `usize::max_value()`
/// and checks the combined hint: the lower bound is expected to stay at the
/// maximum and the upper bound is expected to be `None`.
#[test]
fn size_overflow() {
    use std::pin::Pin;
    use std::task::{Context, Poll};

    // Only `size_hint` is exercised by this test; `poll_next` panics if it is
    // ever called.
    struct Monster;

    impl tokio::stream::Stream for Monster {
        type Item = ();

        fn size_hint(&self) -> (usize, Option<usize>) {
            let huge = usize::max_value();
            (huge, Some(huge))
        }

        fn poll_next(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
            panic!()
        }
    }

    let chained = Monster.chain(Monster);
    assert_eq!(chained.size_hint(), (usize::max_value(), None));
}
96