1 use core::pin::Pin; 2 use futures_core::future::{FusedFuture, Future}; 3 use futures_core::ready; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 use pin_project_lite::pin_project; 7 8 pin_project! { 9 /// Future for the [`concat`](super::StreamExt::concat) method. 10 #[derive(Debug)] 11 #[must_use = "futures do nothing unless you `.await` or poll them"] 12 pub struct Concat<St: Stream> { 13 #[pin] 14 stream: St, 15 accum: Option<St::Item>, 16 } 17 } 18 19 impl<St> Concat<St> 20 where 21 St: Stream, 22 St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, 23 { new(stream: St) -> Self24 pub(super) fn new(stream: St) -> Self { 25 Self { stream, accum: None } 26 } 27 } 28 29 impl<St> Future for Concat<St> 30 where 31 St: Stream, 32 St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, 33 { 34 type Output = St::Item; 35 poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>36 fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 37 let mut this = self.project(); 38 39 loop { 40 match ready!(this.stream.as_mut().poll_next(cx)) { 41 None => return Poll::Ready(this.accum.take().unwrap_or_default()), 42 Some(e) => { 43 if let Some(a) = this.accum { 44 a.extend(e) 45 } else { 46 *this.accum = Some(e) 47 } 48 } 49 } 50 } 51 } 52 } 53 54 impl<St> FusedFuture for Concat<St> 55 where 56 St: FusedStream, 57 St::Item: Extend<<St::Item as IntoIterator>::Item> + IntoIterator + Default, 58 { is_terminated(&self) -> bool59 fn is_terminated(&self) -> bool { 60 self.accum.is_none() && self.stream.is_terminated() 61 } 62 } 63