use core::fmt; use core::pin::Pin; use futures_core::future::{FusedFuture, Future}; use futures_core::stream::Stream; use futures_core::task::{Context, Poll}; use pin_utils::{unsafe_pinned, unsafe_unpinned}; /// Future for the [`fold`](super::StreamExt::fold) method. #[must_use = "futures do nothing unless you `.await` or poll them"] pub struct Fold { stream: St, f: F, accum: Option, future: Option, } impl Unpin for Fold {} impl fmt::Debug for Fold where St: fmt::Debug, Fut: fmt::Debug, T: fmt::Debug, { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.debug_struct("Fold") .field("stream", &self.stream) .field("accum", &self.accum) .field("future", &self.future) .finish() } } impl Fold where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future, { unsafe_pinned!(stream: St); unsafe_unpinned!(f: F); unsafe_unpinned!(accum: Option); unsafe_pinned!(future: Option); pub(super) fn new(stream: St, f: F, t: T) -> Fold { Fold { stream, f, accum: Some(t), future: None, } } } impl FusedFuture for Fold where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future, { fn is_terminated(&self) -> bool { self.accum.is_none() && self.future.is_none() } } impl Future for Fold where St: Stream, F: FnMut(T, St::Item) -> Fut, Fut: Future, { type Output = T; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { // we're currently processing a future to produce a new accum value if self.accum.is_none() { let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); *self.as_mut().accum() = Some(accum); self.as_mut().future().set(None); } let item = ready!(self.as_mut().stream().poll_next(cx)); let accum = self.as_mut().accum().take() .expect("Fold polled after completion"); if let Some(e) = item { let future = (self.as_mut().f())(accum, e); self.as_mut().future().set(Some(future)); } else { return Poll::Ready(accum) } } } }