1 use core::pin::Pin;
2 use futures_core::future::{FusedFuture, Future};
3 use futures_core::ready;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 use pin_project_lite::pin_project;
7 
8 pin_project! {
9     /// Future for the [`concat`](super::StreamExt::concat) method.
10     #[derive(Debug)]
11     #[must_use = "futures do nothing unless you `.await` or poll them"]
12     pub struct Concat<St: Stream> {
13         #[pin]
14         stream: St,
15         accum: Option<St::Item>,
16     }
17 }
18 
19 impl<St> Concat<St>
20 where
21     St: Stream,
22     St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default,
23 {
new(stream: St) -> Self24     pub(super) fn new(stream: St) -> Self {
25         Self { stream, accum: None }
26     }
27 }
28 
29 impl<St> Future for Concat<St>
30 where
31     St: Stream,
32     St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default,
33 {
34     type Output = St::Item;
35 
poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36     fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
37         let mut this = self.project();
38 
39         loop {
40             match ready!(this.stream.as_mut().poll_next(cx)) {
41                 None => return Poll::Ready(this.accum.take().unwrap_or_default()),
42                 Some(e) => {
43                     if let Some(a) = this.accum {
44                         a.extend(e)
45                     } else {
46                         *this.accum = Some(e)
47                     }
48                 }
49             }
50         }
51     }
52 }
53 
54 impl<St> FusedFuture for Concat<St>
55 where
56     St: FusedStream,
57     St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default,
58 {
is_terminated(&self) -> bool59     fn is_terminated(&self) -> bool {
60         self.accum.is_none() && self.stream.is_terminated()
61     }
62 }
63