1 use core::pin::Pin; 2 use futures_core::future::{Future, FusedFuture}; 3 use futures_core::stream::{Stream, FusedStream}; 4 use futures_core::task::{Context, Poll}; 5 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 6 7 /// Future for the [`concat`](super::StreamExt::concat) method. 8 #[derive(Debug)] 9 #[must_use = "futures do nothing unless you `.await` or poll them"] 10 pub struct Concat<St: Stream> { 11 stream: St, 12 accum: Option<St::Item>, 13 } 14 15 impl<St: Stream + Unpin> Unpin for Concat<St> {} 16 17 impl<St> Concat<St> 18 where St: Stream, 19 St::Item: Extend<<St::Item as IntoIterator>::Item> + 20 IntoIterator + Default, 21 { 22 unsafe_pinned!(stream: St); 23 unsafe_unpinned!(accum: Option<St::Item>); 24 new(stream: St) -> Concat<St>25 pub(super) fn new(stream: St) -> Concat<St> { 26 Concat { 27 stream, 28 accum: None, 29 } 30 } 31 } 32 33 impl<St> Future for Concat<St> 34 where St: Stream, 35 St::Item: Extend<<St::Item as IntoIterator>::Item> + 36 IntoIterator + Default, 37 { 38 type Output = St::Item; 39 poll( mut self: Pin<&mut Self>, cx: &mut Context<'_> ) -> Poll<Self::Output>40 fn poll( 41 mut self: Pin<&mut Self>, cx: &mut Context<'_> 42 ) -> Poll<Self::Output> { 43 loop { 44 match ready!(self.as_mut().stream().poll_next(cx)) { 45 None => { 46 return Poll::Ready(self.as_mut().accum().take().unwrap_or_default()) 47 } 48 Some(e) => { 49 let accum = self.as_mut().accum(); 50 if let Some(a) = accum { 51 a.extend(e) 52 } else { 53 *accum = Some(e) 54 } 55 } 56 } 57 } 58 } 59 } 60 61 impl<St> FusedFuture for Concat<St> 62 where St: FusedStream, 63 St::Item: Extend<<St::Item as IntoIterator>::Item> + 64 IntoIterator + Default, 65 { is_terminated(&self) -> bool66 fn is_terminated(&self) -> bool { 67 self.accum.is_none() && self.stream.is_terminated() 68 } 69 } 70