1 use core::fmt;
2 use core::pin::Pin;
3 use futures_core::future::{FusedFuture, Future};
4 use futures_core::stream::Stream;
5 use futures_core::task::{Context, Poll};
6 use pin_utils::{unsafe_pinned, unsafe_unpinned};
7 
8 /// Future for the [`fold`](super::StreamExt::fold) method.
9 #[must_use = "futures do nothing unless you `.await` or poll them"]
10 pub struct Fold<St, Fut, T, F> {
11     stream: St,
12     f: F,
13     accum: Option<T>,
14     future: Option<Fut>,
15 }
16 
17 impl<St: Unpin, Fut: Unpin, T, F> Unpin for Fold<St, Fut, T, F> {}
18 
19 impl<St, Fut, T, F> fmt::Debug for Fold<St, Fut, T, F>
20 where
21     St: fmt::Debug,
22     Fut: fmt::Debug,
23     T: fmt::Debug,
24 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result25     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
26         f.debug_struct("Fold")
27             .field("stream", &self.stream)
28             .field("accum", &self.accum)
29             .field("future", &self.future)
30             .finish()
31     }
32 }
33 
34 impl<St, Fut, T, F> Fold<St, Fut, T, F>
35 where St: Stream,
36       F: FnMut(T, St::Item) -> Fut,
37       Fut: Future<Output = T>,
38 {
39     unsafe_pinned!(stream: St);
40     unsafe_unpinned!(f: F);
41     unsafe_unpinned!(accum: Option<T>);
42     unsafe_pinned!(future: Option<Fut>);
43 
new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F>44     pub(super) fn new(stream: St, f: F, t: T) -> Fold<St, Fut, T, F> {
45         Fold {
46             stream,
47             f,
48             accum: Some(t),
49             future: None,
50         }
51     }
52 }
53 
54 impl<St, Fut, T, F> FusedFuture for Fold<St, Fut, T, F>
55     where St: Stream,
56           F: FnMut(T, St::Item) -> Fut,
57           Fut: Future<Output = T>,
58 {
is_terminated(&self) -> bool59     fn is_terminated(&self) -> bool {
60         self.accum.is_none() && self.future.is_none()
61     }
62 }
63 
64 impl<St, Fut, T, F> Future for Fold<St, Fut, T, F>
65     where St: Stream,
66           F: FnMut(T, St::Item) -> Fut,
67           Fut: Future<Output = T>,
68 {
69     type Output = T;
70 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T>71     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<T> {
72         loop {
73             // we're currently processing a future to produce a new accum value
74             if self.accum.is_none() {
75                 let accum = ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx));
76                 *self.as_mut().accum() = Some(accum);
77                 self.as_mut().future().set(None);
78             }
79 
80             let item = ready!(self.as_mut().stream().poll_next(cx));
81             let accum = self.as_mut().accum().take()
82                 .expect("Fold polled after completion");
83 
84             if let Some(e) = item {
85                 let future = (self.as_mut().f())(accum, e);
86                 self.as_mut().future().set(Some(future));
87             } else {
88                 return Poll::Ready(accum)
89             }
90         }
91     }
92 }
93