1 use crate::stream::{FuturesUnordered, StreamExt};
2 use alloc::collections::binary_heap::{BinaryHeap, PeekMut};
3 use core::cmp::Ordering;
4 use core::fmt::{self, Debug};
5 use core::iter::FromIterator;
6 use core::pin::Pin;
7 use futures_core::future::Future;
8 use futures_core::ready;
9 use futures_core::stream::Stream;
10 use futures_core::{
11     task::{Context, Poll},
12     FusedStream,
13 };
14 use pin_project_lite::pin_project;
15 
// Pairs a value with the sequence number it was assigned on submission.
// The same wrapper is used for an in-flight future and, once that future
// resolves, for its output (see the `Future` impl, which re-wraps the output
// with the original index).
pin_project! {
    #[must_use = "futures do nothing unless you `.await` or poll them"]
    #[derive(Debug)]
    struct OrderWrapper<T> {
        #[pin]
        data: T, // A future or a future's output
        // Submission sequence number; the only field consulted by the
        // equality and ordering impls.
        index: usize,
    }
}
25 
26 impl<T> PartialEq for OrderWrapper<T> {
eq(&self, other: &Self) -> bool27     fn eq(&self, other: &Self) -> bool {
28         self.index == other.index
29     }
30 }
31 
// Marker impl: equality compares only the `usize` index, which is a total
// equivalence relation, so `Eq` holds for any `T`.
impl<T> Eq for OrderWrapper<T> {}
33 
34 impl<T> PartialOrd for OrderWrapper<T> {
partial_cmp(&self, other: &Self) -> Option<Ordering>35     fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
36         Some(self.cmp(other))
37     }
38 }
39 
40 impl<T> Ord for OrderWrapper<T> {
cmp(&self, other: &Self) -> Ordering41     fn cmp(&self, other: &Self) -> Ordering {
42         // BinaryHeap is a max heap, so compare backwards here.
43         other.index.cmp(&self.index)
44     }
45 }
46 
47 impl<T> Future for OrderWrapper<T>
48 where
49     T: Future,
50 {
51     type Output = OrderWrapper<T::Output>;
52 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
54         let index = self.index;
55         self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index })
56     }
57 }
58 
/// An unbounded queue of futures.
///
/// This "combinator" is similar to `FuturesUnordered`, but it imposes an order
/// on top of the set of futures. While futures in the set will race to
/// completion in parallel, results will only be returned in the order their
/// originating futures were added to the queue.
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
/// Futures managed by `FuturesOrdered` will only be polled when they generate
/// notifications. This reduces the required amount of work needed to coordinate
/// large numbers of futures.
///
/// When a `FuturesOrdered` is first created, it does not contain any futures.
/// Calling `poll_next` in this state will result in `Poll::Ready(None)` being
/// returned. Futures are submitted to the queue using `push`; however, the
/// future will **not** be polled at this point. `FuturesOrdered` will only
/// poll managed futures when `FuturesOrdered::poll_next` is called. As such,
/// it is important to call `poll_next` after pushing new futures.
///
/// If `FuturesOrdered::poll_next` returns `Poll::Ready(None)` this means that
/// the queue is currently not managing any futures. A future may be submitted
/// to the queue at a later time. At that point, a call to
/// `FuturesOrdered::poll_next` will either return the future's resolved value
/// **or** `Poll::Pending` if the future has not yet completed. When
/// multiple futures are submitted to the queue, `FuturesOrdered::poll_next`
/// will return `Poll::Pending` until the first future completes, even if
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made `FuturesOrdered` via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// with the `FuturesOrdered::new` constructor.
///
/// This type is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
#[must_use = "streams do nothing unless polled"]
pub struct FuturesOrdered<T: Future> {
    // Futures that have been pushed but have not yet produced an output.
    in_progress_queue: FuturesUnordered<OrderWrapper<T>>,
    // Outputs that completed ahead of their turn, buffered until every
    // earlier index has been yielded; the heap surfaces the smallest index.
    queued_outputs: BinaryHeap<OrderWrapper<T::Output>>,
    // Index that will be assigned to the next pushed future.
    next_incoming_index: usize,
    // Index of the output that `poll_next` must yield next.
    next_outgoing_index: usize,
}
101 
// `FuturesOrdered` is `Unpin` regardless of whether `T` is; `poll_next`
// relies on this to obtain `&mut Self` from its pinned receiver.
// NOTE(review): presumably sound because the futures themselves are owned
// and pinned by `FuturesUnordered` — confirm against that type.
impl<T: Future> Unpin for FuturesOrdered<T> {}
103 
104 impl<Fut: Future> FuturesOrdered<Fut> {
105     /// Constructs a new, empty `FuturesOrdered`
106     ///
107     /// The returned `FuturesOrdered` does not contain any futures and, in this
108     /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`.
new() -> Self109     pub fn new() -> Self {
110         Self {
111             in_progress_queue: FuturesUnordered::new(),
112             queued_outputs: BinaryHeap::new(),
113             next_incoming_index: 0,
114             next_outgoing_index: 0,
115         }
116     }
117 
118     /// Returns the number of futures contained in the queue.
119     ///
120     /// This represents the total number of in-flight futures, both
121     /// those currently processing and those that have completed but
122     /// which are waiting for earlier futures to complete.
len(&self) -> usize123     pub fn len(&self) -> usize {
124         self.in_progress_queue.len() + self.queued_outputs.len()
125     }
126 
127     /// Returns `true` if the queue contains no futures
is_empty(&self) -> bool128     pub fn is_empty(&self) -> bool {
129         self.in_progress_queue.is_empty() && self.queued_outputs.is_empty()
130     }
131 
132     /// Push a future into the queue.
133     ///
134     /// This function submits the given future to the internal set for managing.
135     /// This function will not call `poll` on the submitted future. The caller
136     /// must ensure that `FuturesOrdered::poll` is called in order to receive
137     /// task notifications.
push(&mut self, future: Fut)138     pub fn push(&mut self, future: Fut) {
139         let wrapped = OrderWrapper { data: future, index: self.next_incoming_index };
140         self.next_incoming_index += 1;
141         self.in_progress_queue.push(wrapped);
142     }
143 }
144 
145 impl<Fut: Future> Default for FuturesOrdered<Fut> {
default() -> Self146     fn default() -> Self {
147         Self::new()
148     }
149 }
150 
151 impl<Fut: Future> Stream for FuturesOrdered<Fut> {
152     type Item = Fut::Output;
153 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>154     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
155         let this = &mut *self;
156 
157         // Check to see if we've already received the next value
158         if let Some(next_output) = this.queued_outputs.peek_mut() {
159             if next_output.index == this.next_outgoing_index {
160                 this.next_outgoing_index += 1;
161                 return Poll::Ready(Some(PeekMut::pop(next_output).data));
162             }
163         }
164 
165         loop {
166             match ready!(this.in_progress_queue.poll_next_unpin(cx)) {
167                 Some(output) => {
168                     if output.index == this.next_outgoing_index {
169                         this.next_outgoing_index += 1;
170                         return Poll::Ready(Some(output.data));
171                     } else {
172                         this.queued_outputs.push(output)
173                     }
174                 }
175                 None => return Poll::Ready(None),
176             }
177         }
178     }
179 
size_hint(&self) -> (usize, Option<usize>)180     fn size_hint(&self) -> (usize, Option<usize>) {
181         let len = self.len();
182         (len, Some(len))
183     }
184 }
185 
186 impl<Fut: Future> Debug for FuturesOrdered<Fut> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result187     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
188         write!(f, "FuturesOrdered {{ ... }}")
189     }
190 }
191 
192 impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> {
from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = Fut>,193     fn from_iter<T>(iter: T) -> Self
194     where
195         T: IntoIterator<Item = Fut>,
196     {
197         let acc = Self::new();
198         iter.into_iter().fold(acc, |mut acc, item| {
199             acc.push(item);
200             acc
201         })
202     }
203 }
204 
205 impl<Fut: Future> FusedStream for FuturesOrdered<Fut> {
is_terminated(&self) -> bool206     fn is_terminated(&self) -> bool {
207         self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty()
208     }
209 }
210 
211 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> {
extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,212     fn extend<I>(&mut self, iter: I)
213     where
214         I: IntoIterator<Item = Fut>,
215     {
216         for item in iter {
217             self.push(item);
218         }
219     }
220 }
221