1 use crate::stream::{FuturesUnordered, StreamExt}; 2 use alloc::collections::binary_heap::{BinaryHeap, PeekMut}; 3 use core::cmp::Ordering; 4 use core::fmt::{self, Debug}; 5 use core::iter::FromIterator; 6 use core::pin::Pin; 7 use futures_core::future::Future; 8 use futures_core::ready; 9 use futures_core::stream::Stream; 10 use futures_core::{ 11 task::{Context, Poll}, 12 FusedStream, 13 }; 14 use pin_project_lite::pin_project; 15 16 pin_project! { 17 #[must_use = "futures do nothing unless you `.await` or poll them"] 18 #[derive(Debug)] 19 struct OrderWrapper<T> { 20 #[pin] 21 data: T, // A future or a future's output 22 index: usize, 23 } 24 } 25 26 impl<T> PartialEq for OrderWrapper<T> { eq(&self, other: &Self) -> bool27 fn eq(&self, other: &Self) -> bool { 28 self.index == other.index 29 } 30 } 31 32 impl<T> Eq for OrderWrapper<T> {} 33 34 impl<T> PartialOrd for OrderWrapper<T> { partial_cmp(&self, other: &Self) -> Option<Ordering>35 fn partial_cmp(&self, other: &Self) -> Option<Ordering> { 36 Some(self.cmp(other)) 37 } 38 } 39 40 impl<T> Ord for OrderWrapper<T> { cmp(&self, other: &Self) -> Ordering41 fn cmp(&self, other: &Self) -> Ordering { 42 // BinaryHeap is a max heap, so compare backwards here. 43 other.index.cmp(&self.index) 44 } 45 } 46 47 impl<T> Future for OrderWrapper<T> 48 where 49 T: Future, 50 { 51 type Output = OrderWrapper<T::Output>; 52 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>53 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 54 let index = self.index; 55 self.project().data.poll(cx).map(|output| OrderWrapper { data: output, index }) 56 } 57 } 58 59 /// An unbounded queue of futures. 60 /// 61 /// This "combinator" is similar to `FuturesUnordered`, but it imposes an order 62 /// on top of the set of futures. While futures in the set will race to 63 /// completion in parallel, results will only be returned in the order their 64 /// originating futures were added to the queue. 
///
/// Futures are pushed into this queue and their realized values are yielded in
/// order. This structure is optimized to manage a large number of futures.
/// Futures managed by `FuturesOrdered` will only be polled when they generate
/// notifications. This reduces the amount of work needed to coordinate
/// large numbers of futures.
///
/// When a `FuturesOrdered` is first created, it does not contain any futures.
/// Calling `poll_next` in this state will result in `Poll::Ready(None)` being
/// returned. Futures are submitted to the queue using `push`; however, the
/// future will **not** be polled at this point. `FuturesOrdered` will only
/// poll managed futures when `FuturesOrdered::poll_next` is called. As such, it
/// is important to call `poll_next` after pushing new futures.
///
/// If `FuturesOrdered::poll_next` returns `Poll::Ready(None)`, this means that
/// the queue is currently not managing any futures. A future may be submitted
/// to the queue at a later time. At that point, a call to
/// `FuturesOrdered::poll_next` will either return the future's resolved value
/// **or** `Poll::Pending` if the future has not yet completed. When
/// multiple futures are submitted to the queue, `FuturesOrdered::poll_next` will
/// return `Poll::Pending` until the first future completes, even if
/// some of the later futures have already completed.
///
/// Note that you can create a ready-made `FuturesOrdered` via the
/// [`collect`](Iterator::collect) method, or you can start with an empty queue
/// with the `FuturesOrdered::new` constructor.
///
/// This type is only available when the `std` or `alloc` feature of this
/// library is activated, and it is activated by default.
94 #[must_use = "streams do nothing unless polled"] 95 pub struct FuturesOrdered<T: Future> { 96 in_progress_queue: FuturesUnordered<OrderWrapper<T>>, 97 queued_outputs: BinaryHeap<OrderWrapper<T::Output>>, 98 next_incoming_index: usize, 99 next_outgoing_index: usize, 100 } 101 102 impl<T: Future> Unpin for FuturesOrdered<T> {} 103 104 impl<Fut: Future> FuturesOrdered<Fut> { 105 /// Constructs a new, empty `FuturesOrdered` 106 /// 107 /// The returned `FuturesOrdered` does not contain any futures and, in this 108 /// state, `FuturesOrdered::poll_next` will return `Poll::Ready(None)`. new() -> Self109 pub fn new() -> Self { 110 Self { 111 in_progress_queue: FuturesUnordered::new(), 112 queued_outputs: BinaryHeap::new(), 113 next_incoming_index: 0, 114 next_outgoing_index: 0, 115 } 116 } 117 118 /// Returns the number of futures contained in the queue. 119 /// 120 /// This represents the total number of in-flight futures, both 121 /// those currently processing and those that have completed but 122 /// which are waiting for earlier futures to complete. len(&self) -> usize123 pub fn len(&self) -> usize { 124 self.in_progress_queue.len() + self.queued_outputs.len() 125 } 126 127 /// Returns `true` if the queue contains no futures is_empty(&self) -> bool128 pub fn is_empty(&self) -> bool { 129 self.in_progress_queue.is_empty() && self.queued_outputs.is_empty() 130 } 131 132 /// Push a future into the queue. 133 /// 134 /// This function submits the given future to the internal set for managing. 135 /// This function will not call `poll` on the submitted future. The caller 136 /// must ensure that `FuturesOrdered::poll` is called in order to receive 137 /// task notifications. 
push(&mut self, future: Fut)138 pub fn push(&mut self, future: Fut) { 139 let wrapped = OrderWrapper { data: future, index: self.next_incoming_index }; 140 self.next_incoming_index += 1; 141 self.in_progress_queue.push(wrapped); 142 } 143 } 144 145 impl<Fut: Future> Default for FuturesOrdered<Fut> { default() -> Self146 fn default() -> Self { 147 Self::new() 148 } 149 } 150 151 impl<Fut: Future> Stream for FuturesOrdered<Fut> { 152 type Item = Fut::Output; 153 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>154 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 155 let this = &mut *self; 156 157 // Check to see if we've already received the next value 158 if let Some(next_output) = this.queued_outputs.peek_mut() { 159 if next_output.index == this.next_outgoing_index { 160 this.next_outgoing_index += 1; 161 return Poll::Ready(Some(PeekMut::pop(next_output).data)); 162 } 163 } 164 165 loop { 166 match ready!(this.in_progress_queue.poll_next_unpin(cx)) { 167 Some(output) => { 168 if output.index == this.next_outgoing_index { 169 this.next_outgoing_index += 1; 170 return Poll::Ready(Some(output.data)); 171 } else { 172 this.queued_outputs.push(output) 173 } 174 } 175 None => return Poll::Ready(None), 176 } 177 } 178 } 179 size_hint(&self) -> (usize, Option<usize>)180 fn size_hint(&self) -> (usize, Option<usize>) { 181 let len = self.len(); 182 (len, Some(len)) 183 } 184 } 185 186 impl<Fut: Future> Debug for FuturesOrdered<Fut> { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result187 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 188 write!(f, "FuturesOrdered {{ ... 
}}") 189 } 190 } 191 192 impl<Fut: Future> FromIterator<Fut> for FuturesOrdered<Fut> { from_iter<T>(iter: T) -> Self where T: IntoIterator<Item = Fut>,193 fn from_iter<T>(iter: T) -> Self 194 where 195 T: IntoIterator<Item = Fut>, 196 { 197 let acc = Self::new(); 198 iter.into_iter().fold(acc, |mut acc, item| { 199 acc.push(item); 200 acc 201 }) 202 } 203 } 204 205 impl<Fut: Future> FusedStream for FuturesOrdered<Fut> { is_terminated(&self) -> bool206 fn is_terminated(&self) -> bool { 207 self.in_progress_queue.is_terminated() && self.queued_outputs.is_empty() 208 } 209 } 210 211 impl<Fut: Future> Extend<Fut> for FuturesOrdered<Fut> { extend<I>(&mut self, iter: I) where I: IntoIterator<Item = Fut>,212 fn extend<I>(&mut self, iter: I) 213 where 214 I: IntoIterator<Item = Fut>, 215 { 216 for item in iter { 217 self.push(item); 218 } 219 } 220 } 221