1 use tokio::stream::{self, pending, Stream, StreamExt, StreamMap};
2 use tokio::sync::mpsc;
3 use tokio_test::{assert_ok, assert_pending, assert_ready, task};
4 
5 use std::pin::Pin;
6 
/// Asserts that the given poll expression is ready with `Some(value)` and
/// evaluates to `value`; panics when the ready value is `None`.
macro_rules! assert_ready_some {
    ($($t:tt)*) => {
        if let Some(item) = assert_ready!($($t)*) {
            item
        } else {
            panic!("expected `Some`, got `None`")
        }
    };
}
15 
/// Asserts that the given poll expression is ready with `None`; panics and
/// prints the unexpected value when it is ready with `Some(value)`.
macro_rules! assert_ready_none {
    ($($t:tt)*) => {
        if let Some(unexpected) = assert_ready!($($t)*) {
            panic!("expected `None`, got `Some({:?})`", unexpected)
        }
    };
}
24 
25 #[tokio::test]
empty()26 async fn empty() {
27     let mut map = StreamMap::<&str, stream::Pending<()>>::new();
28 
29     assert_eq!(map.len(), 0);
30     assert!(map.is_empty());
31 
32     assert!(map.next().await.is_none());
33     assert!(map.next().await.is_none());
34 
35     assert!(map.remove("foo").is_none());
36 }
37 
38 #[tokio::test]
single_entry()39 async fn single_entry() {
40     let mut map = task::spawn(StreamMap::new());
41     let (tx, rx) = mpsc::unbounded_channel();
42 
43     assert_ready_none!(map.poll_next());
44 
45     assert!(map.insert("foo", rx).is_none());
46     assert!(map.contains_key("foo"));
47     assert!(!map.contains_key("bar"));
48 
49     assert_eq!(map.len(), 1);
50     assert!(!map.is_empty());
51 
52     assert_pending!(map.poll_next());
53 
54     assert_ok!(tx.send(1));
55 
56     assert!(map.is_woken());
57     let (k, v) = assert_ready_some!(map.poll_next());
58     assert_eq!(k, "foo");
59     assert_eq!(v, 1);
60 
61     assert_pending!(map.poll_next());
62 
63     assert_ok!(tx.send(2));
64 
65     assert!(map.is_woken());
66     let (k, v) = assert_ready_some!(map.poll_next());
67     assert_eq!(k, "foo");
68     assert_eq!(v, 2);
69 
70     assert_pending!(map.poll_next());
71     drop(tx);
72     assert!(map.is_woken());
73     assert_ready_none!(map.poll_next());
74 }
75 
76 #[tokio::test]
multiple_entries()77 async fn multiple_entries() {
78     let mut map = task::spawn(StreamMap::new());
79     let (tx1, rx1) = mpsc::unbounded_channel();
80     let (tx2, rx2) = mpsc::unbounded_channel();
81 
82     map.insert("foo", rx1);
83     map.insert("bar", rx2);
84 
85     assert_pending!(map.poll_next());
86 
87     assert_ok!(tx1.send(1));
88 
89     assert!(map.is_woken());
90     let (k, v) = assert_ready_some!(map.poll_next());
91     assert_eq!(k, "foo");
92     assert_eq!(v, 1);
93 
94     assert_pending!(map.poll_next());
95 
96     assert_ok!(tx2.send(2));
97 
98     assert!(map.is_woken());
99     let (k, v) = assert_ready_some!(map.poll_next());
100     assert_eq!(k, "bar");
101     assert_eq!(v, 2);
102 
103     assert_pending!(map.poll_next());
104 
105     assert_ok!(tx1.send(3));
106     assert_ok!(tx2.send(4));
107 
108     assert!(map.is_woken());
109 
110     // Given the randomization, there is no guarantee what order the values will
111     // be received in.
112     let mut v = (0..2)
113         .map(|_| assert_ready_some!(map.poll_next()))
114         .collect::<Vec<_>>();
115 
116     assert_pending!(map.poll_next());
117 
118     v.sort();
119     assert_eq!(v[0].0, "bar");
120     assert_eq!(v[0].1, 4);
121     assert_eq!(v[1].0, "foo");
122     assert_eq!(v[1].1, 3);
123 
124     drop(tx1);
125     assert!(map.is_woken());
126     assert_pending!(map.poll_next());
127     drop(tx2);
128 
129     assert_ready_none!(map.poll_next());
130 }
131 
132 #[tokio::test]
insert_remove()133 async fn insert_remove() {
134     let mut map = task::spawn(StreamMap::new());
135     let (tx, rx) = mpsc::unbounded_channel();
136 
137     assert_ready_none!(map.poll_next());
138 
139     assert!(map.insert("foo", rx).is_none());
140     let rx = map.remove("foo").unwrap();
141 
142     assert_ok!(tx.send(1));
143 
144     assert!(!map.is_woken());
145     assert_ready_none!(map.poll_next());
146 
147     assert!(map.insert("bar", rx).is_none());
148 
149     let v = assert_ready_some!(map.poll_next());
150     assert_eq!(v.0, "bar");
151     assert_eq!(v.1, 1);
152 
153     assert!(map.remove("bar").is_some());
154     assert_ready_none!(map.poll_next());
155 
156     assert!(map.is_empty());
157     assert_eq!(0, map.len());
158 }
159 
160 #[tokio::test]
replace()161 async fn replace() {
162     let mut map = task::spawn(StreamMap::new());
163     let (tx1, rx1) = mpsc::unbounded_channel();
164     let (tx2, rx2) = mpsc::unbounded_channel();
165 
166     assert!(map.insert("foo", rx1).is_none());
167 
168     assert_pending!(map.poll_next());
169 
170     let _rx1 = map.insert("foo", rx2).unwrap();
171 
172     assert_pending!(map.poll_next());
173 
174     tx1.send(1).unwrap();
175     assert_pending!(map.poll_next());
176 
177     tx2.send(2).unwrap();
178     assert!(map.is_woken());
179     let v = assert_ready_some!(map.poll_next());
180     assert_eq!(v.0, "foo");
181     assert_eq!(v.1, 2);
182 }
183 
/// Checks the size hint of a map whose three streams hold 1, 2 and 3 items:
/// both the lower and the upper bound are expected to be 6.
#[test]
fn size_hint_with_upper() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    assert!(!streams.is_empty());
    assert_eq!(streams.len(), 3);

    let (lower, upper) = streams.size_hint();
    assert_eq!(lower, 6);
    assert_eq!(upper, Some(6));
}
198 
/// Checks the size hint when one of the boxed streams is `pending()`: the
/// lower bound is expected to be 3 and the upper bound to be absent.
#[test]
fn size_hint_without_upper() {
    let mut streams = StreamMap::new();

    streams.insert("a", pin_box(stream::iter(vec![1])));
    streams.insert("b", pin_box(stream::iter(vec![1, 2])));
    streams.insert("c", pin_box(pending()));

    let (lower, upper) = streams.size_hint();
    assert_eq!(lower, 3);
    assert_eq!(upper, None);
}
210 
/// Checks that `StreamMap::new` reports a capacity of zero and exposes no
/// keys.
#[test]
fn new_capacity_zero() {
    let streams = StreamMap::<&str, stream::Pending<()>>::new();
    assert_eq!(streams.capacity(), 0);

    assert!(streams.keys().next().is_none());
}
219 
/// Checks that `StreamMap::with_capacity(10)` reports a capacity of at least
/// 10 while still exposing no keys.
#[test]
fn with_capacity() {
    let streams = StreamMap::<&str, stream::Pending<()>>::with_capacity(10);
    assert!(streams.capacity() >= 10);

    assert!(streams.keys().next().is_none());
}
228 
/// Checks that `keys` visits every inserted key; the result is sorted before
/// the comparison so the iteration order does not matter.
#[test]
fn iter_keys() {
    let mut streams = StreamMap::new();

    streams.insert("a", pending::<i32>());
    streams.insert("b", pending());
    streams.insert("c", pending());

    let mut names: Vec<_> = streams.keys().collect();
    names.sort();

    assert_eq!(names, vec![&"a", &"b", &"c"]);
}
242 
/// Checks that `values` visits every stored stream by collecting each
/// stream's lower size-hint bound and comparing the sorted result.
#[test]
fn iter_values() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<_> = streams.values().map(|s| s.size_hint().0).collect();
    lower_bounds.sort();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
257 
/// Checks that `values_mut` visits every stored stream by collecting each
/// stream's lower size-hint bound and comparing the sorted result.
#[test]
fn iter_values_mut() {
    let mut streams = StreamMap::new();

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    let mut lower_bounds: Vec<_> = streams
        .values_mut()
        .map(|s: &mut _| s.size_hint().0)
        .collect();
    lower_bounds.sort();

    assert_eq!(lower_bounds, vec![1, 2, 3]);
}
275 
/// Checks that `clear` empties a map that still had items to yield: the next
/// poll is expected to be ready with `None` and the map to report empty.
#[test]
fn clear() {
    let mut streams = task::spawn(StreamMap::new());

    for (key, items) in vec![("a", vec![1]), ("b", vec![1, 2]), ("c", vec![1, 2, 3])] {
        streams.insert(key, stream::iter(items));
    }

    // Pull one item first so the map is cleared while streams are in use.
    assert_ready_some!(streams.poll_next());

    streams.clear();

    assert_ready_none!(streams.poll_next());
    assert!(streams.is_empty());
}
291 
/// Checks that a map keyed by `String` can be queried with a plain `&str`.
#[test]
fn contains_key_borrow() {
    let mut streams = StreamMap::new();
    let owned_key = String::from("foo");
    streams.insert(owned_key, pending::<()>());

    assert!(streams.contains_key("foo"));
}
299 
/// Checks that the single ready item is returned by the first poll even when
/// the map also holds empty and pending streams.
#[test]
fn one_ready_many_none() {
    // Repeated many times because the outcome may depend on randomness.
    for _ in 0..100 {
        let mut streams = task::spawn(StreamMap::new());

        streams.insert(0, pin_box(stream::empty()));
        streams.insert(1, pin_box(stream::empty()));
        streams.insert(2, pin_box(stream::once("hello")));
        streams.insert(3, pin_box(stream::pending()));

        let (key, value) = assert_ready_some!(streams.poll_next());
        assert_eq!(key, 2);
        assert_eq!(value, "hello");
    }
}
315 
316 proptest::proptest! {
317     #[test]
318     fn fuzz_pending_complete_mix(kinds: Vec<bool>) {
319         use std::task::{Context, Poll};
320 
321         struct DidPoll<T> {
322             did_poll: bool,
323             inner: T,
324         }
325 
326         impl<T: Stream + Unpin> Stream for DidPoll<T> {
327             type Item = T::Item;
328 
329             fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>)
330                 -> Poll<Option<T::Item>>
331             {
332                 self.did_poll = true;
333                 Pin::new(&mut self.inner).poll_next(cx)
334             }
335         }
336 
337         for _ in 0..10 {
338             let mut map = task::spawn(StreamMap::new());
339             let mut expect = 0;
340 
341             for (i, &is_empty) in kinds.iter().enumerate() {
342                 let inner = if is_empty {
343                     pin_box(stream::empty::<()>())
344                 } else {
345                     expect += 1;
346                     pin_box(stream::pending::<()>())
347                 };
348 
349                 let stream = DidPoll {
350                     did_poll: false,
351                     inner,
352                 };
353 
354                 map.insert(i, stream);
355             }
356 
357             if expect == 0 {
358                 assert_ready_none!(map.poll_next());
359             } else {
360                 assert_pending!(map.poll_next());
361 
362                 assert_eq!(expect, map.values().count());
363 
364                 for stream in map.values() {
365                     assert!(stream.did_poll);
366                 }
367             }
368         }
369     }
370 }
371 
pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>>372 fn pin_box<T: Stream<Item = U> + 'static, U>(s: T) -> Pin<Box<dyn Stream<Item = U>>> {
373     Box::pin(s)
374 }
375