1 use tokio::stream::{self, StreamExt};
2 use tokio::sync::mpsc;
3 use tokio_test::{assert_pending, assert_ready, assert_ready_err, assert_ready_ok, task};
4
5 use bytes::{Bytes, BytesMut};
6
7 #[allow(clippy::let_unit_value)]
8 #[tokio::test]
empty_unit()9 async fn empty_unit() {
10 // Drains the stream.
11 let mut iter = vec![(), (), ()].into_iter();
12 let _: () = stream::iter(&mut iter).collect().await;
13 assert!(iter.next().is_none());
14 }
15
16 #[tokio::test]
empty_vec()17 async fn empty_vec() {
18 let coll: Vec<u32> = stream::empty().collect().await;
19 assert!(coll.is_empty());
20 }
21
22 #[tokio::test]
empty_box_slice()23 async fn empty_box_slice() {
24 let coll: Box<[u32]> = stream::empty().collect().await;
25 assert!(coll.is_empty());
26 }
27
28 #[tokio::test]
empty_bytes()29 async fn empty_bytes() {
30 let coll: Bytes = stream::empty::<&[u8]>().collect().await;
31 assert!(coll.is_empty());
32 }
33
34 #[tokio::test]
empty_bytes_mut()35 async fn empty_bytes_mut() {
36 let coll: BytesMut = stream::empty::<&[u8]>().collect().await;
37 assert!(coll.is_empty());
38 }
39
40 #[tokio::test]
empty_string()41 async fn empty_string() {
42 let coll: String = stream::empty::<&str>().collect().await;
43 assert!(coll.is_empty());
44 }
45
46 #[tokio::test]
empty_result()47 async fn empty_result() {
48 let coll: Result<Vec<u32>, &str> = stream::empty().collect().await;
49 assert_eq!(Ok(vec![]), coll);
50 }
51
52 #[tokio::test]
collect_vec_items()53 async fn collect_vec_items() {
54 let (tx, rx) = mpsc::unbounded_channel();
55 let mut fut = task::spawn(rx.collect::<Vec<i32>>());
56
57 assert_pending!(fut.poll());
58
59 tx.send(1).unwrap();
60 assert!(fut.is_woken());
61 assert_pending!(fut.poll());
62
63 tx.send(2).unwrap();
64 assert!(fut.is_woken());
65 assert_pending!(fut.poll());
66
67 drop(tx);
68 assert!(fut.is_woken());
69 let coll = assert_ready!(fut.poll());
70 assert_eq!(vec![1, 2], coll);
71 }
72
73 #[tokio::test]
collect_string_items()74 async fn collect_string_items() {
75 let (tx, rx) = mpsc::unbounded_channel();
76 let mut fut = task::spawn(rx.collect::<String>());
77
78 assert_pending!(fut.poll());
79
80 tx.send("hello ".to_string()).unwrap();
81 assert!(fut.is_woken());
82 assert_pending!(fut.poll());
83
84 tx.send("world".to_string()).unwrap();
85 assert!(fut.is_woken());
86 assert_pending!(fut.poll());
87
88 drop(tx);
89 assert!(fut.is_woken());
90 let coll = assert_ready!(fut.poll());
91 assert_eq!("hello world", coll);
92 }
93
94 #[tokio::test]
collect_str_items()95 async fn collect_str_items() {
96 let (tx, rx) = mpsc::unbounded_channel();
97 let mut fut = task::spawn(rx.collect::<String>());
98
99 assert_pending!(fut.poll());
100
101 tx.send("hello ").unwrap();
102 assert!(fut.is_woken());
103 assert_pending!(fut.poll());
104
105 tx.send("world").unwrap();
106 assert!(fut.is_woken());
107 assert_pending!(fut.poll());
108
109 drop(tx);
110 assert!(fut.is_woken());
111 let coll = assert_ready!(fut.poll());
112 assert_eq!("hello world", coll);
113 }
114
115 #[tokio::test]
collect_bytes()116 async fn collect_bytes() {
117 let (tx, rx) = mpsc::unbounded_channel();
118 let mut fut = task::spawn(rx.collect::<Bytes>());
119
120 assert_pending!(fut.poll());
121
122 tx.send(&b"hello "[..]).unwrap();
123 assert!(fut.is_woken());
124 assert_pending!(fut.poll());
125
126 tx.send(&b"world"[..]).unwrap();
127 assert!(fut.is_woken());
128 assert_pending!(fut.poll());
129
130 drop(tx);
131 assert!(fut.is_woken());
132 let coll = assert_ready!(fut.poll());
133 assert_eq!(&b"hello world"[..], coll);
134 }
135
136 #[tokio::test]
collect_results_ok()137 async fn collect_results_ok() {
138 let (tx, rx) = mpsc::unbounded_channel();
139 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
140
141 assert_pending!(fut.poll());
142
143 tx.send(Ok("hello ")).unwrap();
144 assert!(fut.is_woken());
145 assert_pending!(fut.poll());
146
147 tx.send(Ok("world")).unwrap();
148 assert!(fut.is_woken());
149 assert_pending!(fut.poll());
150
151 drop(tx);
152 assert!(fut.is_woken());
153 let coll = assert_ready_ok!(fut.poll());
154 assert_eq!("hello world", coll);
155 }
156
157 #[tokio::test]
collect_results_err()158 async fn collect_results_err() {
159 let (tx, rx) = mpsc::unbounded_channel();
160 let mut fut = task::spawn(rx.collect::<Result<String, &str>>());
161
162 assert_pending!(fut.poll());
163
164 tx.send(Ok("hello ")).unwrap();
165 assert!(fut.is_woken());
166 assert_pending!(fut.poll());
167
168 tx.send(Err("oh no")).unwrap();
169 assert!(fut.is_woken());
170 let err = assert_ready_err!(fut.poll());
171 assert_eq!("oh no", err);
172 }
173