1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, join, Future, FutureExt};
4 use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
5 use futures::task::{Context, Poll};
6 use futures_test::future::FutureTestExt;
7 use futures_test::task::noop_context;
8 use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
9 use std::iter::FromIterator;
10 use std::pin::Pin;
11 use std::sync::atomic::{AtomicBool, Ordering};
12 
/// Checks how `is_terminated` changes as an empty set is polled, refilled,
/// and drained again, and that the size accessors agree throughout.
#[test]
fn is_terminated() {
    let mut cx = noop_context();
    let mut queue = FuturesUnordered::new();

    // A fresh set only reports termination after it has yielded `None`.
    assert!(!queue.is_terminated());
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(queue.is_terminated());

    // The sentinel value must not leak through the size accessors.
    assert!(queue.is_empty());
    assert_eq!(queue.len(), 0);
    assert_eq!(queue.iter_mut().len(), 0);

    queue.push(future::ready(1));

    assert!(!queue.is_empty());
    assert_eq!(queue.len(), 1);
    assert_eq!(queue.iter_mut().len(), 1);

    // After a push the set is live again until it yields `None` once more.
    assert!(!queue.is_terminated());
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
    assert!(!queue.is_terminated());
    assert_eq!(queue.poll_next_unpin(&mut cx), Poll::Ready(None));
    assert!(queue.is_terminated());
}
39 
/// A value sent on the second channel is yielded first; the two remaining
/// receivers are yielded once their senders fire, then the stream ends.
#[test]
fn works_1() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let pending: FuturesUnordered<_> = vec![rx_a, rx_b, rx_c].into_iter().collect();
    let mut results = block_on_stream(pending);

    tx_b.send(99).unwrap();
    assert_eq!(results.next(), Some(Ok(99)));

    tx_a.send(33).unwrap();
    tx_c.send(33).unwrap();
    for _ in 0..2 {
        assert_eq!(results.next(), Some(Ok(33)));
    }
    assert_eq!(results.next(), None);
}
58 
/// Mixes a plain receiver with a joined pair of receivers (both boxed) and
/// checks that each is yielded once its inputs have been sent.
#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn works_2() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let mut set: FuturesUnordered<_> = vec![
        rx_a.boxed(),
        join(rx_b, rx_c).map(|(lhs, rhs)| Ok(lhs? + rhs?)).boxed(),
    ]
    .into_iter()
    .collect();

    tx_a.send(9).unwrap();
    tx_b.send(10).unwrap();

    let cx = &mut noop_context();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(9))));

    // The joined future is only polled to completion after its second input is sent.
    tx_c.send(20).unwrap();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(30))));
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(None));
}
79 
/// Collecting an iterator of ready futures yields a set of the same length
/// whose stream produces every value.
#[test]
fn from_iterator() {
    let ready = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)];
    let set: FuturesUnordered<_> = ready.into_iter().collect();
    assert_eq!(set.len(), 3);

    let yielded = block_on(set.collect::<Vec<_>>());
    assert_eq!(yielded, vec![1, 2, 3]);
}
88 
/// Once the `select` future has completed, further polls report pending
/// because the remaining receiver's sender is never used.
#[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
#[test]
fn finished_future() {
    // `_tx_a` is kept alive for the whole test but never sends.
    let (_tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let mut set = vec![
        Box::new(rx_a) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
        Box::new(future::select(rx_b, rx_c).map(|winner| winner.factor_first().0)) as _,
    ]
    .into_iter()
    .collect::<FuturesUnordered<_>>();

    let mut cx = noop_context();

    // Nothing has been sent yet, so repeated polling must stay pending.
    for _ in 0..10 {
        assert!(set.poll_next_unpin(&mut cx).is_pending());
    }

    tx_b.send(12).unwrap();
    tx_c.send(3).unwrap();
    assert!(set.poll_next_unpin(&mut cx).is_ready());
    for _ in 0..2 {
        assert!(set.poll_next_unpin(&mut cx).is_pending());
    }
}
114 
/// Closing every receiver through `iter_mut` cancels the matching senders,
/// and each receiver then resolves to `Canceled`.
#[test]
fn iter_mut_cancel() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let mut set: FuturesUnordered<_> = vec![rx_a, rx_b, rx_c].into_iter().collect();

    set.iter_mut().for_each(|rx| rx.close());

    let mut results = block_on_stream(set);

    assert!(tx_a.is_canceled());
    assert!(tx_b.is_canceled());
    assert!(tx_c.is_canceled());

    for _ in 0..3 {
        assert_eq!(results.next(), Some(Err(oneshot::Canceled)));
    }
    assert_eq!(results.next(), None);
}
138 
/// The `iter_mut` iterator reports an exact remaining length that drops by
/// one per item and ends at zero.
#[test]
fn iter_mut_len() {
    let mut set: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut iter_mut = set.iter_mut();
    for expected in (1..=3).rev() {
        assert_eq!(iter_mut.len(), expected);
        assert!(iter_mut.next().is_some());
    }
    assert_eq!(iter_mut.len(), 0);
    assert!(iter_mut.next().is_none());
}
156 
/// Flags set through the shared-reference iterator are seen by the futures
/// when the set is subsequently driven.
#[test]
fn iter_cancel() {
    /// Wraps a future with a flag; once the flag is set, polling yields `None`.
    struct AtomicCancel<F> {
        future: F,
        cancel: AtomicBool,
    }

    impl<F: Future + Unpin> AtomicCancel<F> {
        fn new(future: F) -> Self {
            let cancel = AtomicBool::new(false);
            Self { future, cancel }
        }
    }

    impl<F: Future + Unpin> Future for AtomicCancel<F> {
        type Output = Option<<F as Future>::Output>;

        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
            // The flag is checked before the inner future is polled.
            if self.cancel.load(Ordering::Relaxed) {
                return Poll::Ready(None);
            }
            self.future.poll_unpin(cx).map(Some)
        }
    }

    let set: FuturesUnordered<_> =
        (0..3).map(|_| AtomicCancel::new(future::pending::<()>())).collect();

    set.iter().for_each(|fut| fut.cancel.store(true, Ordering::Relaxed));

    let mut results = block_on_stream(set);

    for _ in 0..3 {
        assert_eq!(results.next(), Some(None));
    }
    assert_eq!(results.next(), None);
}
201 
/// The `iter` iterator reports an exact remaining length that drops by one
/// per item and ends at zero.
#[test]
fn iter_len() {
    let set: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut iter = set.iter();
    for expected in (1..=3).rev() {
        assert_eq!(iter.len(), expected);
        assert!(iter.next().is_some());
    }
    assert_eq!(iter.len(), 0);
    assert!(iter.next().is_none());
}
218 
/// Receivers taken out with `into_iter`, closed, and collected into a new
/// set cancel their senders and resolve to `Canceled`.
#[test]
fn into_iter_cancel() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();

    let receivers: FuturesUnordered<_> = vec![rx_a, rx_b, rx_c].into_iter().collect();

    // Move each receiver out by value, close it, and rebuild the set.
    let closed: FuturesUnordered<_> = receivers
        .into_iter()
        .map(|mut rx| {
            rx.close();
            rx
        })
        .collect();

    let mut results = block_on_stream(closed);

    assert!(tx_a.is_canceled());
    assert!(tx_b.is_canceled());
    assert!(tx_c.is_canceled());

    for _ in 0..3 {
        assert_eq!(results.next(), Some(Err(oneshot::Canceled)));
    }
    assert_eq!(results.next(), None);
}
246 
/// The owning iterator reports an exact remaining length that drops by one
/// per item and ends at zero.
#[test]
fn into_iter_len() {
    let set: FuturesUnordered<_> = (0..3).map(|_| future::pending::<()>()).collect();

    let mut into_iter = set.into_iter();
    for expected in (1..=3).rev() {
        assert_eq!(into_iter.len(), expected);
        assert!(into_iter.next().is_some());
    }
    assert_eq!(into_iter.len(), 0);
    assert!(into_iter.next().is_none());
}
263 
/// Drives futures wrapped in `assert_unmoved` through the set to check they
/// are not moved between polls.
#[test]
fn futures_not_moved_after_poll() {
    // Each future is ready after being polled twice and asserts that it
    // does not move.
    let template = future::ready(()).pending_once().assert_unmoved();
    let mut set: FuturesUnordered<_> = vec![template; 3].into_iter().collect();

    assert_stream_pending!(set);
    for _ in 0..3 {
        assert_stream_next!(set, ());
    }
    assert_stream_done!(set);
}
276 
/// Completes futures out of insertion order, pushing another one part-way
/// through, and checks `len` after every step.
#[test]
fn len_valid_during_out_of_order_completion() {
    let (tx_a, rx_a) = oneshot::channel::<i32>();
    let (tx_b, rx_b) = oneshot::channel::<i32>();
    let (tx_c, rx_c) = oneshot::channel::<i32>();
    let (tx_d, rx_d) = oneshot::channel::<i32>();

    let cx = &mut noop_context();
    let mut set = FuturesUnordered::new();
    assert_eq!(set.len(), 0);

    // Each push grows the length by exactly one.
    for (pushed_before, rx) in vec![rx_a, rx_b, rx_c].into_iter().enumerate() {
        set.push(rx);
        assert_eq!(set.len(), pushed_before + 1);
    }

    tx_b.send(4).unwrap();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(4))));
    assert_eq!(set.len(), 2);

    // A future added after a completion is counted as well.
    set.push(rx_d);
    assert_eq!(set.len(), 3);

    tx_c.send(5).unwrap();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(5))));
    assert_eq!(set.len(), 2);

    tx_d.send(6).unwrap();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(6))));
    assert_eq!(set.len(), 1);

    tx_a.send(7).unwrap();
    assert_eq!(set.poll_next_unpin(cx), Poll::Ready(Some(Ok(7))));
    assert_eq!(set.len(), 0);
}
316 
/// A single `poll_next` call polls every pending future in the set once and
/// never polls the same future twice.
#[test]
fn polled_only_once_at_most_per_iteration() {
    /// Stays pending on its first poll and panics if it is polled again.
    #[derive(Debug, Clone, Copy, Default)]
    struct F {
        polled: bool,
    }

    impl Future for F {
        type Output = ();

        fn poll(mut self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
            assert!(!self.polled, "polled twice");
            self.polled = true;
            Poll::Pending
        }
    }

    let cx = &mut noop_context();

    for &count in [10usize, 33].iter() {
        let mut tasks = FuturesUnordered::from_iter(vec![F::default(); count]);
        assert!(tasks.poll_next_unpin(cx).is_pending());
        // Every future must have been reached by that one call.
        assert_eq!(count, tasks.iter().filter(|f| f.polled).count());
    }

    let mut empty = FuturesUnordered::<F>::new();
    assert_eq!(Poll::Ready(None), empty.poll_next_unpin(cx));
}
350 
/// `clear` empties the set whether it is partially drained, freshly
/// refilled, or terminated, and resets the terminated state.
#[test]
fn clear() {
    let mut set: FuturesUnordered<_> =
        vec![future::ready(1), future::ready(2)].into_iter().collect();

    // One of the two futures is consumed, so the set is still non-empty.
    assert_eq!(block_on(set.next()), Some(1));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    set.push(future::ready(3));
    assert!(!set.is_empty());

    set.clear();
    assert!(set.is_empty());

    // Draining the empty set terminates it; clearing undoes that.
    assert_eq!(block_on(set.next()), None);
    assert!(set.is_terminated());
    set.clear();
    assert!(!set.is_terminated());
}
372