1 use tokio_stream::{self as stream, StreamExt};
2 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
3 
// Test-support helpers. The `mpsc` submodule supplies
// `unbounded_channel_stream()`, which the tests below use to obtain a sender
// paired with a stream they can feed step by step.
mod support {
    pub(crate) mod mpsc;
}
7 
8 use support::mpsc;
9 
10 #[allow(clippy::let_unit_value)]
11 #[tokio::test]
empty_unit()12 async fn empty_unit() {
13     // Drains the stream.
14     let mut iter = vec![(), (), ()].into_iter();
15     let _: () = stream::iter(&mut iter).collect().await;
16     assert!(iter.next().is_none());
17 }
18 
19 #[tokio::test]
empty_vec()20 async fn empty_vec() {
21     let coll: Vec<u32> = stream::empty().collect().await;
22     assert!(coll.is_empty());
23 }
24 
25 #[tokio::test]
empty_box_slice()26 async fn empty_box_slice() {
27     let coll: Box<[u32]> = stream::empty().collect().await;
28     assert!(coll.is_empty());
29 }
30 
31 #[tokio::test]
empty_string()32 async fn empty_string() {
33     let coll: String = stream::empty::<&str>().collect().await;
34     assert!(coll.is_empty());
35 }
36 
37 #[tokio::test]
empty_result()38 async fn empty_result() {
39     let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
40     assert_eq!(Ok(vec![]), coll);
41 }
42 
43 #[tokio::test]
collect_vec_items()44 async fn collect_vec_items() {
45     let (tx, rx) = mpsc::unbounded_channel_stream();
46     let mut fut = task::spawn(rx.collect::<Vec<i32>>());
47 
48     assert_pending!(fut.poll());
49 
50     tx.send(1).unwrap();
51     assert!(fut.is_woken());
52     assert_pending!(fut.poll());
53 
54     tx.send(2).unwrap();
55     assert!(fut.is_woken());
56     assert_pending!(fut.poll());
57 
58     drop(tx);
59     assert!(fut.is_woken());
60     let coll = assert_ready!(fut.poll());
61     assert_eq!(vec![1, 2], coll);
62 }
63 
64 #[tokio::test]
collect_string_items()65 async fn collect_string_items() {
66     let (tx, rx) = mpsc::unbounded_channel_stream();
67 
68     let mut fut = task::spawn(rx.collect::<String>());
69 
70     assert_pending!(fut.poll());
71 
72     tx.send("hello ".to_string()).unwrap();
73     assert!(fut.is_woken());
74     assert_pending!(fut.poll());
75 
76     tx.send("world".to_string()).unwrap();
77     assert!(fut.is_woken());
78     assert_pending!(fut.poll());
79 
80     drop(tx);
81     assert!(fut.is_woken());
82     let coll = assert_ready!(fut.poll());
83     assert_eq!("hello world", coll);
84 }
85 
86 #[tokio::test]
collect_str_items()87 async fn collect_str_items() {
88     let (tx, rx) = mpsc::unbounded_channel_stream();
89 
90     let mut fut = task::spawn(rx.collect::<String>());
91 
92     assert_pending!(fut.poll());
93 
94     tx.send("hello ").unwrap();
95     assert!(fut.is_woken());
96     assert_pending!(fut.poll());
97 
98     tx.send("world").unwrap();
99     assert!(fut.is_woken());
100     assert_pending!(fut.poll());
101 
102     drop(tx);
103     assert!(fut.is_woken());
104     let coll = assert_ready!(fut.poll());
105     assert_eq!("hello world", coll);
106 }
107 
108 #[tokio::test]
collect_results_ok()109 async fn collect_results_ok() {
110     let (tx, rx) = mpsc::unbounded_channel_stream();
111 
112     let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
113 
114     assert_pending!(fut.poll());
115 
116     tx.send(Ok("hello ")).unwrap();
117     assert!(fut.is_woken());
118     assert_pending!(fut.poll());
119 
120     tx.send(Ok("world")).unwrap();
121     assert!(fut.is_woken());
122     assert_pending!(fut.poll());
123 
124     drop(tx);
125     assert!(fut.is_woken());
126     let coll = assert_ready_ok!(fut.poll());
127     assert_eq!("hello world", coll);
128 }
129 
130 #[tokio::test]
collect_results_err()131 async fn collect_results_err() {
132     let (tx, rx) = mpsc::unbounded_channel_stream();
133 
134     let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
135 
136     assert_pending!(fut.poll());
137 
138     tx.send(Ok("hello ")).unwrap();
139     assert!(fut.is_woken());
140     assert_pending!(fut.poll());
141 
142     tx.send(Err("oh no")).unwrap();
143     assert!(fut.is_woken());
144     let err = assert_ready_err!(fut.poll());
145     assert_eq!("oh no", err);
146 }
147