1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::Future;
4 use futures_core::stream::{FusedStream, Stream};
5 use futures_core::task::{Context, Poll};
6 #[cfg(feature = "sink")]
7 use futures_sink::Sink;
8 use pin_project::{pin_project, project};
9 
10 struct StateFn<S, F> {
11     state: S,
12     f: F,
13 }
14 
15 /// Stream for the [`scan`](super::StreamExt::scan) method.
16 #[pin_project]
17 #[must_use = "streams do nothing unless polled"]
18 pub struct Scan<St: Stream, S, Fut, F> {
19     #[pin]
20     stream: St,
21     state_f: Option<StateFn<S, F>>,
22     #[pin]
23     future: Option<Fut>,
24 }
25 
26 impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F>
27 where
28     St: Stream + fmt::Debug,
29     St::Item: fmt::Debug,
30     S: fmt::Debug,
31     Fut: fmt::Debug,
32 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
34         f.debug_struct("Scan")
35             .field("stream", &self.stream)
36             .field("state", &self.state_f.as_ref().map(|s| &s.state))
37             .field("future", &self.future)
38             .field("done_taking", &self.is_done_taking())
39             .finish()
40     }
41 }
42 
43 impl<St: Stream, S, Fut, F> Scan<St, S, Fut, F> {
44     /// Checks if internal state is `None`.
is_done_taking(&self) -> bool45     fn is_done_taking(&self) -> bool {
46         self.state_f.is_none()
47     }
48 }
49 
50 impl<B, St, S, Fut, F> Scan<St, S, Fut, F>
51 where
52     St: Stream,
53     F: FnMut(&mut S, St::Item) -> Fut,
54     Fut: Future<Output = Option<B>>,
55 {
new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F>56     pub(super) fn new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F> {
57         Scan {
58             stream,
59             state_f: Some(StateFn {
60                 state: initial_state,
61                 f,
62             }),
63             future: None,
64         }
65     }
66 
67     delegate_access_inner!(stream, St, ());
68 }
69 
70 impl<B, St, S, Fut, F> Stream for Scan<St, S, Fut, F>
71 where
72     St: Stream,
73     F: FnMut(&mut S, St::Item) -> Fut,
74     Fut: Future<Output = Option<B>>,
75 {
76     type Item = B;
77 
78     #[project]
poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>>79     fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> {
80         if self.is_done_taking() {
81             return Poll::Ready(None);
82         }
83 
84         #[project]
85         let Scan { mut stream, state_f, mut future } = self.project();
86 
87         Poll::Ready(loop {
88             if let Some(fut) = future.as_mut().as_pin_mut() {
89                 let item = ready!(fut.poll(cx));
90                 future.set(None);
91 
92                 if item.is_none() {
93                     *state_f = None;
94                 }
95 
96                 break item;
97             } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) {
98                 let state_f = state_f.as_mut().unwrap();
99                 future.set(Some((state_f.f)(&mut state_f.state, item)))
100             } else {
101                 break None;
102             }
103         })
104     }
105 
size_hint(&self) -> (usize, Option<usize>)106     fn size_hint(&self) -> (usize, Option<usize>) {
107         if self.is_done_taking() {
108             (0, Some(0))
109         } else {
110             self.stream.size_hint() // can't know a lower bound, due to the predicate
111         }
112     }
113 }
114 
115 impl<B, St, S, Fut, F> FusedStream for Scan<St, S, Fut, F>
116 where
117     St: FusedStream,
118     F: FnMut(&mut S, St::Item) -> Fut,
119     Fut: Future<Output = Option<B>>,
120 {
is_terminated(&self) -> bool121     fn is_terminated(&self) -> bool {
122         self.is_done_taking() || self.future.is_none() && self.stream.is_terminated()
123     }
124 }
125 
126 // Forwarding impl of Sink from the underlying stream
127 #[cfg(feature = "sink")]
128 impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F>
129 where
130     S: Stream + Sink<Item>,
131 {
132     type Error = S::Error;
133 
134     delegate_sink!(stream, Item);
135 }
136