use crate::stream::{Fuse, FuturesUnordered, StreamExt}; use futures_core::future::Future; use futures_core::stream::{Stream, FusedStream}; use futures_core::task::{Context, Poll}; #[cfg(feature = "sink")] use futures_sink::Sink; use pin_project::{pin_project, project}; use core::fmt; use core::pin::Pin; /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) /// method. #[pin_project] #[must_use = "streams do nothing unless polled"] pub struct BufferUnordered where St: Stream, { #[pin] stream: Fuse, in_progress_queue: FuturesUnordered, max: usize, } impl fmt::Debug for BufferUnordered where St: Stream + fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("BufferUnordered") .field("stream", &self.stream) .field("in_progress_queue", &self.in_progress_queue) .field("max", &self.max) .finish() } } impl BufferUnordered where St: Stream, St::Item: Future, { pub(super) fn new(stream: St, n: usize) -> BufferUnordered where St: Stream, St::Item: Future, { BufferUnordered { stream: super::Fuse::new(stream), in_progress_queue: FuturesUnordered::new(), max: n, } } delegate_access_inner!(stream, St, (.)); } impl Stream for BufferUnordered where St: Stream, St::Item: Future, { type Item = ::Output; #[project] fn poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll> { #[project] let BufferUnordered { mut stream, in_progress_queue, max } = self.project(); // First up, try to spawn off as many futures as possible by filling up // our queue of futures. while in_progress_queue.len() < *max { match stream.as_mut().poll_next(cx) { Poll::Ready(Some(fut)) => in_progress_queue.push(fut), Poll::Ready(None) | Poll::Pending => break, } } // Attempt to pull the next value from the in_progress_queue match in_progress_queue.poll_next_unpin(cx) { x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, Poll::Ready(None) => {} } // If more values are still coming from the stream, we're not done yet if stream.is_done() { Poll::Ready(None) } else { Poll::Pending } } fn size_hint(&self) -> (usize, Option) { let queue_len = self.in_progress_queue.len(); let (lower, upper) = self.stream.size_hint(); let lower = lower.saturating_add(queue_len); let upper = match upper { Some(x) => x.checked_add(queue_len), None => None, }; (lower, upper) } } impl FusedStream for BufferUnordered where St: Stream, St::Item: Future, { fn is_terminated(&self) -> bool { self.in_progress_queue.is_terminated() && self.stream.is_terminated() } } // Forwarding impl of Sink from the underlying stream #[cfg(feature = "sink")] impl Sink for BufferUnordered where S: Stream + Sink, S::Item: Future, { type Error = S::Error; delegate_sink!(stream, Item); }