1 use core::pin::Pin;
2 use futures_core::stream::{FusedStream, Stream};
3 use futures_core::task::{Context, Poll};
4 #[cfg(feature = "sink")]
5 use futures_sink::Sink;
6 use pin_utils::unsafe_pinned;
7 
8 /// Stream for the [`flatten`](super::StreamExt::flatten) method.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Flatten<St>
12 where
13     St: Stream,
14 {
15     stream: St,
16     next: Option<St::Item>,
17 }
18 
19 impl<St> Unpin for Flatten<St>
20 where
21     St: Stream + Unpin,
22     St::Item: Unpin,
23 {
24 }
25 
26 impl<St> Flatten<St>
27 where
28     St: Stream,
29 {
30     unsafe_pinned!(stream: St);
31     unsafe_pinned!(next: Option<St::Item>);
32 }
33 
34 impl<St> Flatten<St>
35 where
36     St: Stream,
37     St::Item: Stream,
38 {
new(stream: St) -> Self39     pub(super) fn new(stream: St) -> Self {
40         Self { stream, next: None }
41     }
42 
43     /// Acquires a reference to the underlying stream that this combinator is
44     /// pulling from.
get_ref(&self) -> &St45     pub fn get_ref(&self) -> &St {
46         &self.stream
47     }
48 
49     /// Acquires a mutable reference to the underlying stream that this
50     /// combinator is pulling from.
51     ///
52     /// Note that care must be taken to avoid tampering with the state of the
53     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut St54     pub fn get_mut(&mut self) -> &mut St {
55         &mut self.stream
56     }
57 
58     /// Acquires a pinned mutable reference to the underlying stream that this
59     /// combinator is pulling from.
60     ///
61     /// Note that care must be taken to avoid tampering with the state of the
62     /// stream which may otherwise confuse this combinator.
get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St>63     pub fn get_pin_mut(self: Pin<&mut Self>) -> Pin<&mut St> {
64         self.stream()
65     }
66 
67     /// Consumes this combinator, returning the underlying stream.
68     ///
69     /// Note that this may discard intermediate state of this combinator, so
70     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> St71     pub fn into_inner(self) -> St {
72         self.stream
73     }
74 }
75 
76 impl<St> FusedStream for Flatten<St>
77 where
78     St: FusedStream,
79     St::Item: Stream,
80 {
is_terminated(&self) -> bool81     fn is_terminated(&self) -> bool {
82         self.next.is_none() && self.stream.is_terminated()
83     }
84 }
85 
86 impl<St> Stream for Flatten<St>
87 where
88     St: Stream,
89     St::Item: Stream,
90 {
91     type Item = <St::Item as Stream>::Item;
92 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>93     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
94         loop {
95             if self.next.is_none() {
96                 match ready!(self.as_mut().stream().poll_next(cx)) {
97                     Some(e) => self.as_mut().next().set(Some(e)),
98                     None => return Poll::Ready(None),
99                 }
100             }
101 
102             if let Some(item) = ready!(self.as_mut().next().as_pin_mut().unwrap().poll_next(cx)) {
103                 return Poll::Ready(Some(item));
104             } else {
105                 self.as_mut().next().set(None);
106             }
107         }
108     }
109 }
110 
// When the wrapped stream is also a `Sink`, expose that sink through
// `Flatten` by forwarding to the `stream` field via `delegate_sink!`.
#[cfg(feature = "sink")]
impl<S, Item> Sink<Item> for Flatten<S>
where
    S: Sink<Item> + Stream,
{
    type Error = <S as Sink<Item>>::Error;

    delegate_sink!(stream, Item);
}
121