1 use tokio::stream::{self, StreamExt};
2 use tokio::sync::mpsc;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
4 
5 use bytes::{Bytes, BytesMut};
6 
7 #[allow(clippy::let_unit_value)]
8 #[tokio::test]
empty_unit()9 async fn empty_unit() {
10     // Drains the stream.
11     let mut iter = vec![(), (), ()].into_iter();
12     let _: () = stream::iter(&mut iter).collect().await;
13     assert!(iter.next().is_none());
14 }
15 
16 #[tokio::test]
empty_vec()17 async fn empty_vec() {
18     let coll: Vec<u32> = stream::empty().collect().await;
19     assert!(coll.is_empty());
20 }
21 
22 #[tokio::test]
empty_box_slice()23 async fn empty_box_slice() {
24     let coll: Box<[u32]> = stream::empty().collect().await;
25     assert!(coll.is_empty());
26 }
27 
28 #[tokio::test]
empty_bytes()29 async fn empty_bytes() {
30     let coll: Bytes = stream::empty::<&[u8]>().collect().await;
31     assert!(coll.is_empty());
32 }
33 
34 #[tokio::test]
empty_bytes_mut()35 async fn empty_bytes_mut() {
36     let coll: BytesMut = stream::empty::<&[u8]>().collect().await;
37     assert!(coll.is_empty());
38 }
39 
40 #[tokio::test]
empty_string()41 async fn empty_string() {
42     let coll: String = stream::empty::<&str>().collect().await;
43     assert!(coll.is_empty());
44 }
45 
46 #[tokio::test]
empty_result()47 async fn empty_result() {
48     let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
49     assert_eq!(Ok(vec![]), coll);
50 }
51 
52 #[tokio::test]
collect_vec_items()53 async fn collect_vec_items() {
54     let (tx, rx) = mpsc::unbounded_channel();
55     let mut fut = task::spawn(rx.collect::<Vec<i32>>());
56 
57     assert_pending!(fut.poll());
58 
59     tx.send(1).unwrap();
60     assert!(fut.is_woken());
61     assert_pending!(fut.poll());
62 
63     tx.send(2).unwrap();
64     assert!(fut.is_woken());
65     assert_pending!(fut.poll());
66 
67     drop(tx);
68     assert!(fut.is_woken());
69     let coll = assert_ready!(fut.poll());
70     assert_eq!(vec![1, 2], coll);
71 }
72 
73 #[tokio::test]
collect_string_items()74 async fn collect_string_items() {
75     let (tx, rx) = mpsc::unbounded_channel();
76     let mut fut = task::spawn(rx.collect::<String>());
77 
78     assert_pending!(fut.poll());
79 
80     tx.send("hello ".to_string()).unwrap();
81     assert!(fut.is_woken());
82     assert_pending!(fut.poll());
83 
84     tx.send("world".to_string()).unwrap();
85     assert!(fut.is_woken());
86     assert_pending!(fut.poll());
87 
88     drop(tx);
89     assert!(fut.is_woken());
90     let coll = assert_ready!(fut.poll());
91     assert_eq!("hello world", coll);
92 }
93 
94 #[tokio::test]
collect_str_items()95 async fn collect_str_items() {
96     let (tx, rx) = mpsc::unbounded_channel();
97     let mut fut = task::spawn(rx.collect::<String>());
98 
99     assert_pending!(fut.poll());
100 
101     tx.send("hello ").unwrap();
102     assert!(fut.is_woken());
103     assert_pending!(fut.poll());
104 
105     tx.send("world").unwrap();
106     assert!(fut.is_woken());
107     assert_pending!(fut.poll());
108 
109     drop(tx);
110     assert!(fut.is_woken());
111     let coll = assert_ready!(fut.poll());
112     assert_eq!("hello world", coll);
113 }
114 
115 #[tokio::test]
collect_bytes()116 async fn collect_bytes() {
117     let (tx, rx) = mpsc::unbounded_channel();
118     let mut fut = task::spawn(rx.collect::<Bytes>());
119 
120     assert_pending!(fut.poll());
121 
122     tx.send(&b"hello "[..]).unwrap();
123     assert!(fut.is_woken());
124     assert_pending!(fut.poll());
125 
126     tx.send(&b"world"[..]).unwrap();
127     assert!(fut.is_woken());
128     assert_pending!(fut.poll());
129 
130     drop(tx);
131     assert!(fut.is_woken());
132     let coll = assert_ready!(fut.poll());
133     assert_eq!(&b"hello world"[..], coll);
134 }
135 
136 #[tokio::test]
collect_results_ok()137 async fn collect_results_ok() {
138     let (tx, rx) = mpsc::unbounded_channel();
139     let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
140 
141     assert_pending!(fut.poll());
142 
143     tx.send(Ok("hello ")).unwrap();
144     assert!(fut.is_woken());
145     assert_pending!(fut.poll());
146 
147     tx.send(Ok("world")).unwrap();
148     assert!(fut.is_woken());
149     assert_pending!(fut.poll());
150 
151     drop(tx);
152     assert!(fut.is_woken());
153     let coll = assert_ready_ok!(fut.poll());
154     assert_eq!("hello world", coll);
155 }
156 
157 #[tokio::test]
collect_results_err()158 async fn collect_results_err() {
159     let (tx, rx) = mpsc::unbounded_channel();
160     let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
161 
162     assert_pending!(fut.poll());
163 
164     tx.send(Ok("hello ")).unwrap();
165     assert!(fut.is_woken());
166     assert_pending!(fut.poll());
167 
168     tx.send(Err("oh no")).unwrap();
169     assert!(fut.is_woken());
170     let err = assert_ready_err!(fut.poll());
171     assert_eq!("oh no", err);
172 }
173