1 use crate::stream::{Fuse, FuturesUnordered, StreamExt}; 2 use futures_core::future::Future; 3 use futures_core::stream::{Stream, FusedStream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_project::{pin_project, project}; 8 use core::fmt; 9 use core::pin::Pin; 10 11 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered) 12 /// method. 13 #[pin_project] 14 #[must_use = "streams do nothing unless polled"] 15 pub struct BufferUnordered<St> 16 where 17 St: Stream, 18 { 19 #[pin] 20 stream: Fuse<St>, 21 in_progress_queue: FuturesUnordered<St::Item>, 22 max: usize, 23 } 24 25 impl<St> fmt::Debug for BufferUnordered<St> 26 where 27 St: Stream + fmt::Debug, 28 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 30 f.debug_struct("BufferUnordered") 31 .field("stream", &self.stream) 32 .field("in_progress_queue", &self.in_progress_queue) 33 .field("max", &self.max) 34 .finish() 35 } 36 } 37 38 impl<St> BufferUnordered<St> 39 where 40 St: Stream, 41 St::Item: Future, 42 { new(stream: St, n: usize) -> BufferUnordered<St> where St: Stream, St::Item: Future,43 pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St> 44 where 45 St: Stream, 46 St::Item: Future, 47 { 48 BufferUnordered { 49 stream: super::Fuse::new(stream), 50 in_progress_queue: FuturesUnordered::new(), 51 max: n, 52 } 53 } 54 55 delegate_access_inner!(stream, St, (.)); 56 } 57 58 impl<St> Stream for BufferUnordered<St> 59 where 60 St: Stream, 61 St::Item: Future, 62 { 63 type Item = <St::Item as Future>::Output; 64 65 #[project] poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>66 fn poll_next( 67 self: Pin<&mut Self>, 68 cx: &mut Context<'_>, 69 ) -> Poll<Option<Self::Item>> { 70 #[project] 71 let BufferUnordered { mut stream, in_progress_queue, max } = self.project(); 72 73 // First up, try to spawn off as many futures as possible by filling up 74 // our queue of futures. 75 while in_progress_queue.len() < *max { 76 match stream.as_mut().poll_next(cx) { 77 Poll::Ready(Some(fut)) => in_progress_queue.push(fut), 78 Poll::Ready(None) | Poll::Pending => break, 79 } 80 } 81 82 // Attempt to pull the next value from the in_progress_queue 83 match in_progress_queue.poll_next_unpin(cx) { 84 x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x, 85 Poll::Ready(None) => {} 86 } 87 88 // If more values are still coming from the stream, we're not done yet 89 if stream.is_done() { 90 Poll::Ready(None) 91 } else { 92 Poll::Pending 93 } 94 } 95 size_hint(&self) -> (usize, Option<usize>)96 fn size_hint(&self) -> (usize, Option<usize>) { 97 let queue_len = self.in_progress_queue.len(); 98 let (lower, upper) = self.stream.size_hint(); 99 let lower = lower.saturating_add(queue_len); 100 let upper = match upper { 101 Some(x) => x.checked_add(queue_len), 102 None => None, 103 }; 104 (lower, upper) 105 } 106 } 107 108 impl<St> FusedStream for BufferUnordered<St> 109 where 110 St: Stream, 111 St::Item: Future, 112 { is_terminated(&self) -> bool113 fn is_terminated(&self) -> bool { 114 self.in_progress_queue.is_terminated() && self.stream.is_terminated() 115 } 116 } 117 118 // Forwarding impl of Sink from the underlying stream 119 #[cfg(feature = "sink")] 120 impl<S, Item> Sink<Item> for BufferUnordered<S> 121 where 122 S: Stream + Sink<Item>, 123 S::Item: Future, 124 { 125 type Error = S::Error; 126 127 delegate_sink!(stream, Item); 128 } 129