1 use futures::channel::oneshot;
2 use futures::executor::{block_on, block_on_stream};
3 use futures::future::{self, join, Future, FutureExt};
4 use futures::stream::{FusedStream, FuturesUnordered, StreamExt};
5 use futures::task::{Context, Poll};
6 use futures_test::future::FutureTestExt;
7 use futures_test::task::noop_context;
8 use futures_test::{assert_stream_done, assert_stream_next, assert_stream_pending};
9 use std::iter::FromIterator;
10 use std::pin::Pin;
11 use std::sync::atomic::{AtomicBool, Ordering};
12
13 #[test]
is_terminated()14 fn is_terminated() {
15 let mut cx = noop_context();
16 let mut tasks = FuturesUnordered::new();
17
18 assert_eq!(tasks.is_terminated(), false);
19 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
20 assert_eq!(tasks.is_terminated(), true);
21
22 // Test that the sentinel value doesn't leak
23 assert_eq!(tasks.is_empty(), true);
24 assert_eq!(tasks.len(), 0);
25 assert_eq!(tasks.iter_mut().len(), 0);
26
27 tasks.push(future::ready(1));
28
29 assert_eq!(tasks.is_empty(), false);
30 assert_eq!(tasks.len(), 1);
31 assert_eq!(tasks.iter_mut().len(), 1);
32
33 assert_eq!(tasks.is_terminated(), false);
34 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(Some(1)));
35 assert_eq!(tasks.is_terminated(), false);
36 assert_eq!(tasks.poll_next_unpin(&mut cx), Poll::Ready(None));
37 assert_eq!(tasks.is_terminated(), true);
38 }
39
40 #[test]
works_1()41 fn works_1() {
42 let (a_tx, a_rx) = oneshot::channel::<i32>();
43 let (b_tx, b_rx) = oneshot::channel::<i32>();
44 let (c_tx, c_rx) = oneshot::channel::<i32>();
45
46 let mut iter =
47 block_on_stream(vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>());
48
49 b_tx.send(99).unwrap();
50 assert_eq!(Some(Ok(99)), iter.next());
51
52 a_tx.send(33).unwrap();
53 c_tx.send(33).unwrap();
54 assert_eq!(Some(Ok(33)), iter.next());
55 assert_eq!(Some(Ok(33)), iter.next());
56 assert_eq!(None, iter.next());
57 }
58
59 #[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
60 #[test]
works_2()61 fn works_2() {
62 let (a_tx, a_rx) = oneshot::channel::<i32>();
63 let (b_tx, b_rx) = oneshot::channel::<i32>();
64 let (c_tx, c_rx) = oneshot::channel::<i32>();
65
66 let mut stream = vec![a_rx.boxed(), join(b_rx, c_rx).map(|(a, b)| Ok(a? + b?)).boxed()]
67 .into_iter()
68 .collect::<FuturesUnordered<_>>();
69
70 a_tx.send(9).unwrap();
71 b_tx.send(10).unwrap();
72
73 let mut cx = noop_context();
74 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(9))));
75 c_tx.send(20).unwrap();
76 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(30))));
77 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(None));
78 }
79
80 #[test]
from_iterator()81 fn from_iterator() {
82 let stream = vec![future::ready::<i32>(1), future::ready::<i32>(2), future::ready::<i32>(3)]
83 .into_iter()
84 .collect::<FuturesUnordered<_>>();
85 assert_eq!(stream.len(), 3);
86 assert_eq!(block_on(stream.collect::<Vec<_>>()), vec![1, 2, 3]);
87 }
88
89 #[cfg_attr(miri, ignore)] // https://github.com/rust-lang/miri/issues/1038
90 #[test]
finished_future()91 fn finished_future() {
92 let (_a_tx, a_rx) = oneshot::channel::<i32>();
93 let (b_tx, b_rx) = oneshot::channel::<i32>();
94 let (c_tx, c_rx) = oneshot::channel::<i32>();
95
96 let mut stream = vec![
97 Box::new(a_rx) as Box<dyn Future<Output = Result<_, _>> + Unpin>,
98 Box::new(future::select(b_rx, c_rx).map(|e| e.factor_first().0)) as _,
99 ]
100 .into_iter()
101 .collect::<FuturesUnordered<_>>();
102
103 let cx = &mut noop_context();
104 for _ in 0..10 {
105 assert!(stream.poll_next_unpin(cx).is_pending());
106 }
107
108 b_tx.send(12).unwrap();
109 c_tx.send(3).unwrap();
110 assert!(stream.poll_next_unpin(cx).is_ready());
111 assert!(stream.poll_next_unpin(cx).is_pending());
112 assert!(stream.poll_next_unpin(cx).is_pending());
113 }
114
115 #[test]
iter_mut_cancel()116 fn iter_mut_cancel() {
117 let (a_tx, a_rx) = oneshot::channel::<i32>();
118 let (b_tx, b_rx) = oneshot::channel::<i32>();
119 let (c_tx, c_rx) = oneshot::channel::<i32>();
120
121 let mut stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
122
123 for rx in stream.iter_mut() {
124 rx.close();
125 }
126
127 let mut iter = block_on_stream(stream);
128
129 assert!(a_tx.is_canceled());
130 assert!(b_tx.is_canceled());
131 assert!(c_tx.is_canceled());
132
133 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
134 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
135 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
136 assert_eq!(iter.next(), None);
137 }
138
139 #[test]
iter_mut_len()140 fn iter_mut_len() {
141 let mut stream =
142 vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
143 .into_iter()
144 .collect::<FuturesUnordered<_>>();
145
146 let mut iter_mut = stream.iter_mut();
147 assert_eq!(iter_mut.len(), 3);
148 assert!(iter_mut.next().is_some());
149 assert_eq!(iter_mut.len(), 2);
150 assert!(iter_mut.next().is_some());
151 assert_eq!(iter_mut.len(), 1);
152 assert!(iter_mut.next().is_some());
153 assert_eq!(iter_mut.len(), 0);
154 assert!(iter_mut.next().is_none());
155 }
156
157 #[test]
iter_cancel()158 fn iter_cancel() {
159 struct AtomicCancel<F> {
160 future: F,
161 cancel: AtomicBool,
162 }
163
164 impl<F: Future + Unpin> Future for AtomicCancel<F> {
165 type Output = Option<<F as Future>::Output>;
166
167 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
168 if self.cancel.load(Ordering::Relaxed) {
169 Poll::Ready(None)
170 } else {
171 self.future.poll_unpin(cx).map(Some)
172 }
173 }
174 }
175
176 impl<F: Future + Unpin> AtomicCancel<F> {
177 fn new(future: F) -> Self {
178 Self { future, cancel: AtomicBool::new(false) }
179 }
180 }
181
182 let stream = vec![
183 AtomicCancel::new(future::pending::<()>()),
184 AtomicCancel::new(future::pending::<()>()),
185 AtomicCancel::new(future::pending::<()>()),
186 ]
187 .into_iter()
188 .collect::<FuturesUnordered<_>>();
189
190 for f in stream.iter() {
191 f.cancel.store(true, Ordering::Relaxed);
192 }
193
194 let mut iter = block_on_stream(stream);
195
196 assert_eq!(iter.next(), Some(None));
197 assert_eq!(iter.next(), Some(None));
198 assert_eq!(iter.next(), Some(None));
199 assert_eq!(iter.next(), None);
200 }
201
202 #[test]
iter_len()203 fn iter_len() {
204 let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
205 .into_iter()
206 .collect::<FuturesUnordered<_>>();
207
208 let mut iter = stream.iter();
209 assert_eq!(iter.len(), 3);
210 assert!(iter.next().is_some());
211 assert_eq!(iter.len(), 2);
212 assert!(iter.next().is_some());
213 assert_eq!(iter.len(), 1);
214 assert!(iter.next().is_some());
215 assert_eq!(iter.len(), 0);
216 assert!(iter.next().is_none());
217 }
218
219 #[test]
into_iter_cancel()220 fn into_iter_cancel() {
221 let (a_tx, a_rx) = oneshot::channel::<i32>();
222 let (b_tx, b_rx) = oneshot::channel::<i32>();
223 let (c_tx, c_rx) = oneshot::channel::<i32>();
224
225 let stream = vec![a_rx, b_rx, c_rx].into_iter().collect::<FuturesUnordered<_>>();
226
227 let stream = stream
228 .into_iter()
229 .map(|mut rx| {
230 rx.close();
231 rx
232 })
233 .collect::<FuturesUnordered<_>>();
234
235 let mut iter = block_on_stream(stream);
236
237 assert!(a_tx.is_canceled());
238 assert!(b_tx.is_canceled());
239 assert!(c_tx.is_canceled());
240
241 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
242 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
243 assert_eq!(iter.next(), Some(Err(futures::channel::oneshot::Canceled)));
244 assert_eq!(iter.next(), None);
245 }
246
247 #[test]
into_iter_len()248 fn into_iter_len() {
249 let stream = vec![future::pending::<()>(), future::pending::<()>(), future::pending::<()>()]
250 .into_iter()
251 .collect::<FuturesUnordered<_>>();
252
253 let mut into_iter = stream.into_iter();
254 assert_eq!(into_iter.len(), 3);
255 assert!(into_iter.next().is_some());
256 assert_eq!(into_iter.len(), 2);
257 assert!(into_iter.next().is_some());
258 assert_eq!(into_iter.len(), 1);
259 assert!(into_iter.next().is_some());
260 assert_eq!(into_iter.len(), 0);
261 assert!(into_iter.next().is_none());
262 }
263
264 #[test]
futures_not_moved_after_poll()265 fn futures_not_moved_after_poll() {
266 // Future that will be ready after being polled twice,
267 // asserting that it does not move.
268 let fut = future::ready(()).pending_once().assert_unmoved();
269 let mut stream = vec![fut; 3].into_iter().collect::<FuturesUnordered<_>>();
270 assert_stream_pending!(stream);
271 assert_stream_next!(stream, ());
272 assert_stream_next!(stream, ());
273 assert_stream_next!(stream, ());
274 assert_stream_done!(stream);
275 }
276
277 #[test]
len_valid_during_out_of_order_completion()278 fn len_valid_during_out_of_order_completion() {
279 // Complete futures out-of-order and add new futures afterwards to ensure
280 // length values remain correct.
281 let (a_tx, a_rx) = oneshot::channel::<i32>();
282 let (b_tx, b_rx) = oneshot::channel::<i32>();
283 let (c_tx, c_rx) = oneshot::channel::<i32>();
284 let (d_tx, d_rx) = oneshot::channel::<i32>();
285
286 let mut cx = noop_context();
287 let mut stream = FuturesUnordered::new();
288 assert_eq!(stream.len(), 0);
289
290 stream.push(a_rx);
291 assert_eq!(stream.len(), 1);
292 stream.push(b_rx);
293 assert_eq!(stream.len(), 2);
294 stream.push(c_rx);
295 assert_eq!(stream.len(), 3);
296
297 b_tx.send(4).unwrap();
298 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(4))));
299 assert_eq!(stream.len(), 2);
300
301 stream.push(d_rx);
302 assert_eq!(stream.len(), 3);
303
304 c_tx.send(5).unwrap();
305 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(5))));
306 assert_eq!(stream.len(), 2);
307
308 d_tx.send(6).unwrap();
309 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(6))));
310 assert_eq!(stream.len(), 1);
311
312 a_tx.send(7).unwrap();
313 assert_eq!(stream.poll_next_unpin(&mut cx), Poll::Ready(Some(Ok(7))));
314 assert_eq!(stream.len(), 0);
315 }
316
317 #[test]
polled_only_once_at_most_per_iteration()318 fn polled_only_once_at_most_per_iteration() {
319 #[derive(Debug, Clone, Copy, Default)]
320 struct F {
321 polled: bool,
322 }
323
324 impl Future for F {
325 type Output = ();
326
327 fn poll(mut self: Pin<&mut Self>, _: &mut Context) -> Poll<Self::Output> {
328 if self.polled {
329 panic!("polled twice")
330 } else {
331 self.polled = true;
332 Poll::Pending
333 }
334 }
335 }
336
337 let cx = &mut noop_context();
338
339 let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 10]);
340 assert!(tasks.poll_next_unpin(cx).is_pending());
341 assert_eq!(10, tasks.iter().filter(|f| f.polled).count());
342
343 let mut tasks = FuturesUnordered::from_iter(vec![F::default(); 33]);
344 assert!(tasks.poll_next_unpin(cx).is_pending());
345 assert_eq!(33, tasks.iter().filter(|f| f.polled).count());
346
347 let mut tasks = FuturesUnordered::<F>::new();
348 assert_eq!(Poll::Ready(None), tasks.poll_next_unpin(cx));
349 }
350
351 #[test]
clear()352 fn clear() {
353 let mut tasks = FuturesUnordered::from_iter(vec![future::ready(1), future::ready(2)]);
354
355 assert_eq!(block_on(tasks.next()), Some(1));
356 assert!(!tasks.is_empty());
357
358 tasks.clear();
359 assert!(tasks.is_empty());
360
361 tasks.push(future::ready(3));
362 assert!(!tasks.is_empty());
363
364 tasks.clear();
365 assert!(tasks.is_empty());
366
367 assert_eq!(block_on(tasks.next()), None);
368 assert!(tasks.is_terminated());
369 tasks.clear();
370 assert!(!tasks.is_terminated());
371 }
372