1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::Future; 4 use futures_core::stream::{FusedStream, Stream}; 5 use futures_core::task::{Context, Poll}; 6 #[cfg(feature = "sink")] 7 use futures_sink::Sink; 8 use pin_project::{pin_project, project}; 9 10 struct StateFn<S, F> { 11 state: S, 12 f: F, 13 } 14 15 /// Stream for the [`scan`](super::StreamExt::scan) method. 16 #[pin_project] 17 #[must_use = "streams do nothing unless polled"] 18 pub struct Scan<St: Stream, S, Fut, F> { 19 #[pin] 20 stream: St, 21 state_f: Option<StateFn<S, F>>, 22 #[pin] 23 future: Option<Fut>, 24 } 25 26 impl<St, S, Fut, F> fmt::Debug for Scan<St, S, Fut, F> 27 where 28 St: Stream + fmt::Debug, 29 St::Item: fmt::Debug, 30 S: fmt::Debug, 31 Fut: fmt::Debug, 32 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result33 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 34 f.debug_struct("Scan") 35 .field("stream", &self.stream) 36 .field("state", &self.state_f.as_ref().map(|s| &s.state)) 37 .field("future", &self.future) 38 .field("done_taking", &self.is_done_taking()) 39 .finish() 40 } 41 } 42 43 impl<St: Stream, S, Fut, F> Scan<St, S, Fut, F> { 44 /// Checks if internal state is `None`. is_done_taking(&self) -> bool45 fn is_done_taking(&self) -> bool { 46 self.state_f.is_none() 47 } 48 } 49 50 impl<B, St, S, Fut, F> Scan<St, S, Fut, F> 51 where 52 St: Stream, 53 F: FnMut(&mut S, St::Item) -> Fut, 54 Fut: Future<Output = Option<B>>, 55 { new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F>56 pub(super) fn new(stream: St, initial_state: S, f: F) -> Scan<St, S, Fut, F> { 57 Scan { 58 stream, 59 state_f: Some(StateFn { 60 state: initial_state, 61 f, 62 }), 63 future: None, 64 } 65 } 66 67 delegate_access_inner!(stream, St, ()); 68 } 69 70 impl<B, St, S, Fut, F> Stream for Scan<St, S, Fut, F> 71 where 72 St: Stream, 73 F: FnMut(&mut S, St::Item) -> Fut, 74 Fut: Future<Output = Option<B>>, 75 { 76 type Item = B; 77 78 #[project] poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>>79 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<B>> { 80 if self.is_done_taking() { 81 return Poll::Ready(None); 82 } 83 84 #[project] 85 let Scan { mut stream, state_f, mut future } = self.project(); 86 87 Poll::Ready(loop { 88 if let Some(fut) = future.as_mut().as_pin_mut() { 89 let item = ready!(fut.poll(cx)); 90 future.set(None); 91 92 if item.is_none() { 93 *state_f = None; 94 } 95 96 break item; 97 } else if let Some(item) = ready!(stream.as_mut().poll_next(cx)) { 98 let state_f = state_f.as_mut().unwrap(); 99 future.set(Some((state_f.f)(&mut state_f.state, item))) 100 } else { 101 break None; 102 } 103 }) 104 } 105 size_hint(&self) -> (usize, Option<usize>)106 fn size_hint(&self) -> (usize, Option<usize>) { 107 if self.is_done_taking() { 108 (0, Some(0)) 109 } else { 110 self.stream.size_hint() // can't know a lower bound, due to the predicate 111 } 112 } 113 } 114 115 impl<B, St, S, Fut, F> FusedStream for Scan<St, S, Fut, F> 116 where 117 St: FusedStream, 118 F: FnMut(&mut S, St::Item) -> Fut, 119 Fut: Future<Output = Option<B>>, 120 { is_terminated(&self) -> bool121 fn is_terminated(&self) -> bool { 122 self.is_done_taking() || self.future.is_none() && self.stream.is_terminated() 123 } 124 } 125 126 // Forwarding impl of Sink from the underlying stream 127 #[cfg(feature = "sink")] 128 impl<S, Fut, F, Item> Sink<Item> for Scan<S, S, Fut, F> 129 where 130 S: Stream + Sink<Item>, 131 { 132 type Error = S::Error; 133 134 delegate_sink!(stream, Item); 135 } 136