1 use core::pin::Pin; 2 use futures_core::stream::{FusedStream, Stream}; 3 use futures_core::task::{Context, Poll}; 4 #[cfg(feature = "sink")] 5 use futures_sink::Sink; 6 use pin_utils::unsafe_pinned; 7 8 /// Stream for the [`flatten`](super::StreamExt::flatten) method. 9 #[derive(Debug)] 10 #[must_use = "streams do nothing unless polled"] 11 pub struct Flatten<St> 12 where 13 St: Stream, 14 { 15 stream: St, 16 next: Option<St::Item>, 17 } 18 19 impl<St> Unpin for Flatten<St> 20 where 21 St: Stream + Unpin, 22 St::Item: Unpin, 23 { 24 } 25 26 impl<St> Flatten<St> 27 where 28 St: Stream, 29 { 30 unsafe_pinned!(stream: St); 31 unsafe_pinned!(next: Option<St::Item>); 32 } 33 34 impl<St> Flatten<St> 35 where 36 St: Stream, 37 St::Item: Stream, 38 { new(stream: St) -> Self39 pub(super) fn new(stream: St) -> Self { 40 Self { stream, next: None } 41 } 42 43 /// Acquires a reference to the underlying stream that this combinator is 44 /// pulling from. get_ref(&self) -> &St45 pub fn get_ref(&self) -> &St { 46 &self.stream 47 } 48 49 /// Acquires a mutable reference to the underlying stream that this 50 /// combinator is pulling from. 51 /// 52 /// Note that care must be taken to avoid tampering with the state of the 53 /// stream which may otherwise confuse this combinator. get_mut(&mut self) -> &mut St54 pub fn get_mut(&mut self) -> &mut St { 55 &mut self.stream 56 } 57 58 /// Acquires a pinned mutable reference to the underlying stream that this 59 /// combinator is pulling from. 60 /// 61 /// Note that care must be taken to avoid tampering with the state of the 62 /// stream which may otherwise confuse this combinator. get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>63 pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> { 64 self.stream() 65 } 66 67 /// Consumes this combinator, returning the underlying stream. 68 /// 69 /// Note that this may discard intermediate state of this combinator, so 70 /// care should be taken to avoid losing resources when this is called. into_inner(self) -> St71 pub fn into_inner(self) -> St { 72 self.stream 73 } 74 } 75 76 impl<St> FusedStream for Flatten<St> 77 where 78 St: FusedStream, 79 St::Item: Stream, 80 { is_terminated(&self) -> bool81 fn is_terminated(&self) -> bool { 82 self.next.is_none() && self.stream.is_terminated() 83 } 84 } 85 86 impl<St> Stream for Flatten<St> 87 where 88 St: Stream, 89 St::Item: Stream, 90 { 91 type Item = <St::Item as Stream>::Item; 92 poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>93 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> { 94 loop { 95 if self.next.is_none() { 96 match ready!(self.as_mut().stream().poll_next(cx)) { 97 Some(e) => self.as_mut().next().set(Some(e)), 98 None => return Poll::Ready(None), 99 } 100 } 101 102 if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) { 103 return Poll::Ready(Some(item)); 104 } else { 105 self.as_mut().next().set(None); 106 } 107 } 108 } 109 } 110 111 // Forwarding impl of Sink from the underlying stream 112 #[cfg(feature = "sink")] 113 impl<S, Item> Sink<Item> for Flatten<S> 114 where 115 S: Stream + Sink<Item>, 116 { 117 type Error = S::Error; 118 119 delegate_sink!(stream, Item); 120 } 121