1 use crate::stream::{Fuse, FuturesUnordered, StreamExt};
2 use futures_core::future::Future;
3 use futures_core::stream::{Stream, FusedStream};
4 use futures_core::task::{Context, Poll};
5 #[cfg(feature = "sink")]
6 use futures_sink::Sink;
7 use pin_project::{pin_project, project};
8 use core::fmt;
9 use core::pin::Pin;
10 
11 /// Stream for the [`buffer_unordered`](super::StreamExt::buffer_unordered)
12 /// method.
13 #[pin_project]
14 #[must_use = "streams do nothing unless polled"]
15 pub struct BufferUnordered<St>
16 where
17     St: Stream,
18 {
19     #[pin]
20     stream: Fuse<St>,
21     in_progress_queue: FuturesUnordered<St::Item>,
22     max: usize,
23 }
24 
25 impl<St> fmt::Debug for BufferUnordered<St>
26 where
27     St: Stream + fmt::Debug,
28 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result29     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
30         f.debug_struct("BufferUnordered")
31             .field("stream", &self.stream)
32             .field("in_progress_queue", &self.in_progress_queue)
33             .field("max", &self.max)
34             .finish()
35     }
36 }
37 
38 impl<St> BufferUnordered<St>
39 where
40     St: Stream,
41     St::Item: Future,
42 {
new(stream: St, n: usize) -> BufferUnordered<St> where St: Stream, St::Item: Future,43     pub(super) fn new(stream: St, n: usize) -> BufferUnordered<St>
44     where
45         St: Stream,
46         St::Item: Future,
47     {
48         BufferUnordered {
49             stream: super::Fuse::new(stream),
50             in_progress_queue: FuturesUnordered::new(),
51             max: n,
52         }
53     }
54 
55     delegate_access_inner!(stream, St, (.));
56 }
57 
58 impl<St> Stream for BufferUnordered<St>
59 where
60     St: Stream,
61     St::Item: Future,
62 {
63     type Item = <St::Item as Future>::Output;
64 
65     #[project]
poll_next( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<Option<Self::Item>>66     fn poll_next(
67         self: Pin<&mut Self>,
68         cx: &mut Context<'_>,
69     ) -> Poll<Option<Self::Item>> {
70         #[project]
71         let BufferUnordered { mut stream, in_progress_queue, max } = self.project();
72 
73         // First up, try to spawn off as many futures as possible by filling up
74         // our queue of futures.
75         while in_progress_queue.len() < *max {
76             match stream.as_mut().poll_next(cx) {
77                 Poll::Ready(Some(fut)) => in_progress_queue.push(fut),
78                 Poll::Ready(None) | Poll::Pending => break,
79             }
80         }
81 
82         // Attempt to pull the next value from the in_progress_queue
83         match in_progress_queue.poll_next_unpin(cx) {
84             x @ Poll::Pending | x @ Poll::Ready(Some(_)) => return x,
85             Poll::Ready(None) => {}
86         }
87 
88         // If more values are still coming from the stream, we're not done yet
89         if stream.is_done() {
90             Poll::Ready(None)
91         } else {
92             Poll::Pending
93         }
94     }
95 
size_hint(&self) -> (usize, Option<usize>)96     fn size_hint(&self) -> (usize, Option<usize>) {
97         let queue_len = self.in_progress_queue.len();
98         let (lower, upper) = self.stream.size_hint();
99         let lower = lower.saturating_add(queue_len);
100         let upper = match upper {
101             Some(x) => x.checked_add(queue_len),
102             None => None,
103         };
104         (lower, upper)
105     }
106 }
107 
108 impl<St> FusedStream for BufferUnordered<St>
109 where
110     St: Stream,
111     St::Item: Future,
112 {
is_terminated(&self) -> bool113     fn is_terminated(&self) -> bool {
114         self.in_progress_queue.is_terminated() && self.stream.is_terminated()
115     }
116 }
117 
// When the wrapped stream is also a sink, expose that sink through the
// adaptor by delegating to the `stream` field.
#[cfg(feature = "sink")]
impl<S: Stream + Sink<Item>, Item> Sink<Item> for BufferUnordered<S>
where
    S::Item: Future,
{
    /// Errors are those of the wrapped sink.
    type Error = S::Error;

    delegate_sink!(stream, Item);
}
129