1 use core::mem;
2 
3 use {Future, Poll, IntoFuture, Async};
4 use stream::Stream;
5 
6 /// A future used to collect all the results of a stream into one generic type.
7 ///
8 /// This future is returned by the `Stream::fold` method.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Fold<S, F, Fut, T> where Fut: IntoFuture {
12     stream: S,
13     f: F,
14     state: State<T, Fut::Future>,
15 }
16 
17 #[derive(Debug)]
18 enum State<T, F> where F: Future {
19     /// Placeholder state when doing work
20     Empty,
21 
22     /// Ready to process the next stream item; current accumulator is the `T`
23     Ready(T),
24 
25     /// Working on a future the process the previous stream item
26     Processing(F),
27 }
28 
new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T> where S: Stream, F: FnMut(T, S::Item) -> Fut, Fut: IntoFuture<Item = T>, S::Error: From<Fut::Error>,29 pub fn new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T>
30     where S: Stream,
31           F: FnMut(T, S::Item) -> Fut,
32           Fut: IntoFuture<Item = T>,
33           S::Error: From<Fut::Error>,
34 {
35     Fold {
36         stream: s,
37         f: f,
38         state: State::Ready(t),
39     }
40 }
41 
42 impl<S, F, Fut, T> Future for Fold<S, F, Fut, T>
43     where S: Stream,
44           F: FnMut(T, S::Item) -> Fut,
45           Fut: IntoFuture<Item = T>,
46           S::Error: From<Fut::Error>,
47 {
48     type Item = T;
49     type Error = S::Error;
50 
poll(&mut self) -> Poll<T, S::Error>51     fn poll(&mut self) -> Poll<T, S::Error> {
52         loop {
53             match mem::replace(&mut self.state, State::Empty) {
54                 State::Empty => panic!("cannot poll Fold twice"),
55                 State::Ready(state) => {
56                     match self.stream.poll()? {
57                         Async::Ready(Some(e)) => {
58                             let future = (self.f)(state, e);
59                             let future = future.into_future();
60                             self.state = State::Processing(future);
61                         }
62                         Async::Ready(None) => return Ok(Async::Ready(state)),
63                         Async::NotReady => {
64                             self.state = State::Ready(state);
65                             return Ok(Async::NotReady)
66                         }
67                     }
68                 }
69                 State::Processing(mut fut) => {
70                     match fut.poll()? {
71                         Async::Ready(state) => self.state = State::Ready(state),
72                         Async::NotReady => {
73                             self.state = State::Processing(fut);
74                             return Ok(Async::NotReady)
75                         }
76                     }
77                 }
78             }
79         }
80     }
81 }
82