1 use tokio::stream::{self, pending, Stream, StreamExt, StreamMap};
2 use tokio::sync::mpsc;
3 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
4
5 use std::pin::Pin;
6
/// Evaluates the arguments through `assert_ready!` and unwraps the resulting
/// `Option`, panicking if the ready value is `None`.
macro_rules! assert_ready_some {
    ($($t:tt)*) => {
        if let Some(value) = assert_ready!($($t)*) {
            value
        } else {
            panic!("expected `Some`, got `None`")
        }
    };
}
15
/// Evaluates the arguments through `assert_ready!` and panics, printing the
/// unexpected value, if the ready value is `Some`.
macro_rules! assert_ready_none {
    ($($t:tt)*) => {
        if let Some(unexpected) = assert_ready!($($t)*) {
            panic!("expected `None`, got `Some({:?})`", unexpected);
        }
    };
}
24
25 #[tokio::test]
empty()26 async fn empty() {
27 let mut map = StreamMap::<&str, stream::Pending<()>>::new();
28
29 assert_eq!(map.len(), 0);
30 assert!(map.is_empty());
31
32 assert!(map.next().await.is_none());
33 assert!(map.next().await.is_none());
34
35 assert!(map.remove("foo").is_none());
36 }
37
38 #[tokio::test]
single_entry()39 async fn single_entry() {
40 let mut map = task::spawn(StreamMap::new());
41 let (tx, rx) = mpsc::unbounded_channel();
42
43 assert_ready_none!(map.poll_next());
44
45 assert!(map.insert("foo", rx).is_none());
46 assert!(map.contains_key("foo"));
47 assert!(!map.contains_key("bar"));
48
49 assert_eq!(map.len(), 1);
50 assert!(!map.is_empty());
51
52 assert_pending!(map.poll_next());
53
54 assert_ok!(tx.send(1));
55
56 assert!(map.is_woken());
57 let (k, v) = assert_ready_some!(map.poll_next());
58 assert_eq!(k, "foo");
59 assert_eq!(v, 1);
60
61 assert_pending!(map.poll_next());
62
63 assert_ok!(tx.send(2));
64
65 assert!(map.is_woken());
66 let (k, v) = assert_ready_some!(map.poll_next());
67 assert_eq!(k, "foo");
68 assert_eq!(v, 2);
69
70 assert_pending!(map.poll_next());
71 drop(tx);
72 assert!(map.is_woken());
73 assert_ready_none!(map.poll_next());
74 }
75
76 #[tokio::test]
multiple_entries()77 async fn multiple_entries() {
78 let mut map = task::spawn(StreamMap::new());
79 let (tx1, rx1) = mpsc::unbounded_channel();
80 let (tx2, rx2) = mpsc::unbounded_channel();
81
82 map.insert("foo", rx1);
83 map.insert("bar", rx2);
84
85 assert_pending!(map.poll_next());
86
87 assert_ok!(tx1.send(1));
88
89 assert!(map.is_woken());
90 let (k, v) = assert_ready_some!(map.poll_next());
91 assert_eq!(k, "foo");
92 assert_eq!(v, 1);
93
94 assert_pending!(map.poll_next());
95
96 assert_ok!(tx2.send(2));
97
98 assert!(map.is_woken());
99 let (k, v) = assert_ready_some!(map.poll_next());
100 assert_eq!(k, "bar");
101 assert_eq!(v, 2);
102
103 assert_pending!(map.poll_next());
104
105 assert_ok!(tx1.send(3));
106 assert_ok!(tx2.send(4));
107
108 assert!(map.is_woken());
109
110 // Given the randomization, there is no guarantee what order the values will
111 // be received in.
112 let mut v = (0..2)
113 .map(|_| assert_ready_some!(map.poll_next()))
114 .collect::<Vec<_>>();
115
116 assert_pending!(map.poll_next());
117
118 v.sort();
119 assert_eq!(v[0].0, "bar");
120 assert_eq!(v[0].1, 4);
121 assert_eq!(v[1].0, "foo");
122 assert_eq!(v[1].1, 3);
123
124 drop(tx1);
125 assert!(map.is_woken());
126 assert_pending!(map.poll_next());
127 drop(tx2);
128
129 assert_ready_none!(map.poll_next());
130 }
131
132 #[tokio::test]
insert_remove()133 async fn insert_remove() {
134 let mut map = task::spawn(StreamMap::new());
135 let (tx, rx) = mpsc::unbounded_channel();
136
137 assert_ready_none!(map.poll_next());
138
139 assert!(map.insert("foo", rx).is_none());
140 let rx = map.remove("foo").unwrap();
141
142 assert_ok!(tx.send(1));
143
144 assert!(!map.is_woken());
145 assert_ready_none!(map.poll_next());
146
147 assert!(map.insert("bar", rx).is_none());
148
149 let v = assert_ready_some!(map.poll_next());
150 assert_eq!(v.0, "bar");
151 assert_eq!(v.1, 1);
152
153 assert!(map.remove("bar").is_some());
154 assert_ready_none!(map.poll_next());
155
156 assert!(map.is_empty());
157 assert_eq!(0, map.len());
158 }
159
160 #[tokio::test]
replace()161 async fn replace() {
162 let mut map = task::spawn(StreamMap::new());
163 let (tx1, rx1) = mpsc::unbounded_channel();
164 let (tx2, rx2) = mpsc::unbounded_channel();
165
166 assert!(map.insert("foo", rx1).is_none());
167
168 assert_pending!(map.poll_next());
169
170 let _rx1 = map.insert("foo", rx2).unwrap();
171
172 assert_pending!(map.poll_next());
173
174 tx1.send(1).unwrap();
175 assert_pending!(map.poll_next());
176
177 tx2.send(2).unwrap();
178 assert!(map.is_woken());
179 let v = assert_ready_some!(map.poll_next());
180 assert_eq!(v.0, "foo");
181 assert_eq!(v.1, 2);
182 }
183
/// Checks the map's `size_hint` when every inner stream is built from a
/// vector: with 1, 2 and 3 items the expected hint is `(6, Some(6))`.
#[test]
fn size_hint_with_upper() {
    let mut map = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        map.insert(key, stream::iter(items));
    }

    assert!(!map.is_empty());
    assert_eq!(3, map.len());

    assert_eq!(map.size_hint(), (6, Some(6)));
}
198
/// Checks the map's `size_hint` when one of the inner streams is a `pending`
/// stream: the expected hint is a lower bound of 3 and no upper bound.
#[test]
fn size_hint_without_upper() {
    let mut streams = StreamMap::new();

    // Two streams with 1 and 2 items, plus one `pending` stream.
    streams.insert("a", pin_box(stream::iter(vec![1])));
    streams.insert("b", pin_box(stream::iter(vec![1, 2])));
    streams.insert("c", pin_box(pending()));

    assert_eq!(streams.size_hint(), (3, None));
}
210
/// Checks that `StreamMap::new` reports a capacity of zero and has no keys.
#[test]
fn new_capacity_zero() {
    let streams: StreamMap<&str, stream::Pending<()>> = StreamMap::new();

    assert_eq!(0, streams.capacity());

    // An empty key iterator yields nothing on its first step.
    assert!(streams.keys().next().is_none());
}
219
/// Checks that `StreamMap::with_capacity(10)` reports a capacity of at least
/// ten and still has no keys.
#[test]
fn with_capacity() {
    let streams: StreamMap<&str, stream::Pending<()>> = StreamMap::with_capacity(10);

    assert!(streams.capacity() >= 10);

    // Reserving space must not create any entries.
    assert!(streams.keys().next().is_none());
}
228
/// Checks that `keys()` visits every inserted key exactly once.
#[test]
fn iter_keys() {
    let mut streams = StreamMap::new();

    for name in vec!["a", "b", "c"] {
        streams.insert(name, pending::<i32>());
    }

    // The keys are sorted before comparing so the check does not rely on
    // iteration order.
    let mut seen: Vec<_> = streams.keys().collect();
    seen.sort();

    assert_eq!(seen, vec![&"a", &"b", &"c"]);
}
242
/// Checks that `values()` visits every inserted stream, identifying each one
/// by the lower bound of its `size_hint`.
#[test]
fn iter_values() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    // The lower bounds are sorted so the check does not rely on iteration
    // order.
    let mut lower_bounds: Vec<_> = streams.values().map(|s| s.size_hint().0).collect();
    lower_bounds.sort();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
257
/// Checks that `values_mut()` visits every inserted stream, identifying each
/// one by the lower bound of its `size_hint`.
#[test]
fn iter_values_mut() {
    let mut streams = StreamMap::new();

    streams.insert("a", stream::iter(vec![1]));
    streams.insert("b", stream::iter(vec![1, 2]));
    streams.insert("c", stream::iter(vec![1, 2, 3]));

    // The lower bounds are sorted so the check does not rely on iteration
    // order.
    let mut lower_bounds: Vec<_> = streams
        .values_mut()
        .map(|s: &mut _| s.size_hint().0)
        .collect();
    lower_bounds.sort();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
275
/// Checks that `clear()` discards every stream, leaving the map empty and
/// exhausted even though the streams still had items to yield.
#[test]
fn clear() {
    let mut streams = task::spawn(StreamMap::new());

    streams.insert("a", stream::iter(vec![1]));
    streams.insert("b", stream::iter(vec![1, 2]));
    streams.insert("c", stream::iter(vec![1, 2, 3]));

    // Take a single item; five of the six inserted items are still unread.
    assert_ready_some!(streams.poll_next());

    streams.clear();

    assert_ready_none!(streams.poll_next());
    assert!(streams.is_empty());
}
291
/// Checks that a map keyed by `String` can be queried with a plain `&str`.
#[test]
fn contains_key_borrow() {
    let mut streams = StreamMap::new();
    streams.insert(String::from("foo"), pending::<()>());

    // The lookup key is a `&str`, not a `String`.
    assert!(streams.contains_key("foo"));
}
299
/// Checks that the first poll yields the one available item when the map
/// also holds two `empty` streams and one `pending` stream.
#[test]
fn one_ready_many_none() {
    // Repeated many times because of randomness.
    for _ in 0..100 {
        let mut streams = task::spawn(StreamMap::new());

        streams.insert(0, pin_box(stream::empty()));
        streams.insert(1, pin_box(stream::empty()));
        streams.insert(2, pin_box(stream::once("hello")));
        streams.insert(3, pin_box(stream::pending()));

        let (key, item) = assert_ready_some!(streams.poll_next());
        assert_eq!(key, 2);
        assert_eq!(item, "hello");
    }
}
315
316 proptest::proptest! {
317 #[test]
318 fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
319 use std::task::{Context, Poll};
320
321 struct DidPoll<T> {
322 did_poll: bool,
323 inner: T,
324 }
325
326 impl<T: Stream + Unpin> Stream for DidPoll<T> {
327 type Item = T::Item;
328
329 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
330 -> Poll<Option<T::Item>>
331 {
332 self.did_poll = true;
333 Pin::new(&mut self.inner).poll_next(cx)
334 }
335 }
336
337 for _ in 0..10 {
338 let mut map = task::spawn(StreamMap::new());
339 let mut expect = 0;
340
341 for (i, &is_empty) in kinds.iter().enumerate() {
342 let inner = if is_empty {
343 pin_box(stream::empty::<()>())
344 } else {
345 expect += 1;
346 pin_box(stream::pending::<()>())
347 };
348
349 let stream = DidPoll {
350 did_poll: false,
351 inner,
352 };
353
354 map.insert(i, stream);
355 }
356
357 if expect == 0 {
358 assert_ready_none!(map.poll_next());
359 } else {
360 assert_pending!(map.poll_next());
361
362 assert_eq!(expect, map.values().count());
363
364 for stream in map.values() {
365 assert!(stream.did_poll);
366 }
367 }
368 }
369 }
370 }
371
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>372 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
373 Box::pin(s)
374 }
375