1 use core::fmt; 2 use core::pin::Pin; 3 use futures_core::future::{FusedFuture, Future}; 4 use futures_core::stream::Stream; 5 use futures_core::task::{Context, Poll}; 6 use pin_utils::{unsafe_pinned, unsafe_unpinned}; 7 8 /// Future for the [`fold`](super::StreamExt::fold) method. 9 #[must_use = "futures do nothing unless you `.await` or poll them"] 10 pub struct Fold<St, Fut, T, F> { 11 stream: St, 12 f: F, 13 accum: Option<T>, 14 future: Option<Fut>, 15 } 16 17 impl<St: Unpin, Fut: Unpin, T, F> Unpin for Fold<St, Fut, T, F> {} 18 19 impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F> 20 where 21 St: fmt::Debug, 22 Fut: fmt::Debug, 23 T: fmt::Debug, 24 { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result25 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 26 f.debug_struct("Fold") 27 .field("stream", &self.stream) 28 .field("accum", &self.accum) 29 .field("future", &self.future) 30 .finish() 31 } 32 } 33 34 impl<St, Fut, T, F> Fold<St, Fut, T, F> 35 where St: Stream, 36 F: FnMut(T, St::Item) -> Fut, 37 Fut: Future<Output = T>, 38 { 39 unsafe_pinned!(stream: St); 40 unsafe_unpinned!(f: F); 41 unsafe_unpinned!(accum: Option<T>); 42 unsafe_pinned!(future: Option<Fut>); 43 new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F>44 pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> { 45 Fold { 46 stream, 47 f, 48 accum: Some(t), 49 future: None, 50 } 51 } 52 } 53 54 impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F> 55 where St: Stream, 56 F: FnMut(T, St::Item) -> Fut, 57 Fut: Future<Output = T>, 58 { is_terminated(&self) -> bool59 fn is_terminated(&self) -> bool { 60 self.accum.is_none() && self.future.is_none() 61 } 62 } 63 64 impl<St, Fut, T, F> Future for Fold<St, Fut, T, F> 65 where St: Stream, 66 F: FnMut(T, St::Item) -> Fut, 67 Fut: Future<Output = T>, 68 { 69 type Output = T; 70 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T>71 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> { 72 loop { 73 // we're currently processing a future to produce a new accum value 74 if self.accum.is_none() { 75 let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); 76 *self.as_mut().accum() = Some(accum); 77 self.as_mut().future().set(None); 78 } 79 80 let item = ready!(self.as_mut().stream().poll_next(cx)); 81 let accum = self.as_mut().accum().take() 82 .expect("Fold polled after completion"); 83 84 if let Some(e) = item { 85 let future = (self.as_mut().f())(accum, e); 86 self.as_mut().future().set(Some(future)); 87 } else { 88 return Poll::Ready(accum) 89 } 90 } 91 } 92 } 93