1 use core::pin::Pin; 2 use futures_core::ready; 3 use futures_core::stream::{FusedStream, Stream}; 4 use futures_core::task::{Context, Poll}; 5 #[cfg(feature = "sink")] 6 use futures_sink::Sink; 7 use pin_project_lite::pin_project; 8 9 pin_project! { 10 /// Stream for the [`flatten`](super::StreamExt::flatten) method. 11 #[derive(Debug)] 12 #[must_use = "streams do nothing unless polled"] 13 pub struct Flatten<St, U> { 14 #[pin] 15 stream: St, 16 #[pin] 17 next: Option<U>, 18 } 19 } 20 21 impl<St, U> Flatten<St, U> { new(stream: St) -> Self22 pub(super) fn new(stream: St) -> Self { 23 Self { stream, next: None } 24 } 25 26 delegate_access_inner!(stream, St, ()); 27 } 28 29 impl<St> FusedStream for Flatten<St, St::Item> 30 where 31 St: FusedStream, 32 St::Item: Stream, 33 { is_terminated(&self) -> bool34 fn is_terminated(&self) -> bool { 35 self.next.is_none() && self.stream.is_terminated() 36 } 37 } 38 39 impl<St> Stream for Flatten<St, St::Item> 40 where 41 St: Stream, 42 St::Item: Stream, 43 { 44 type Item = <St::Item as Stream>::Item; 45 poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>46 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 47 let mut this = self.project(); 48 Poll::Ready(loop { 49 if let Some(s) = this.next.as_mut().as_pin_mut() { 50 if let Some(item) = ready!(s.poll_next(cx)) { 51 break Some(item); 52 } else { 53 this.next.set(None); 54 } 55 } else if let Some(s) = ready!(this.stream.as_mut().poll_next(cx)) { 56 this.next.set(Some(s)); 57 } else { 58 break None; 59 } 60 }) 61 } 62 } 63 64 // Forwarding impl of Sink from the underlying stream 65 #[cfg(feature = "sink")] 66 impl<S, Item> Sink<Item> for Flatten<S, S::Item> 67 where 68 S: Stream + Sink<Item>, 69 { 70 type Error = S::Error; 71 72 delegate_sink!(stream, Item); 73 } 74