1 use core::mem;
2
3 use {Future, Poll, IntoFuture, Async};
4 use stream::Stream;
5
6 /// A future used to collect all the results of a stream into one generic type.
7 ///
8 /// This future is returned by the `Stream::fold` method.
9 #[derive(Debug)]
10 #[must_use = "streams do nothing unless polled"]
11 pub struct Fold<S, F, Fut, T> where Fut: IntoFuture {
12 stream: S,
13 f: F,
14 state: State<T, Fut::Future>,
15 }
16
17 #[derive(Debug)]
18 enum State<T, F> where F: Future {
19 /// Placeholder state when doing work
20 Empty,
21
22 /// Ready to process the next stream item; current accumulator is the `T`
23 Ready(T),
24
25 /// Working on a future the process the previous stream item
26 Processing(F),
27 }
28
new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T> where S: Stream, F: FnMut(T, S::Item) -> Fut, Fut: IntoFuture<Item = T>, S::Error: From<Fut::Error>,29 pub fn new<S, F, Fut, T>(s: S, f: F, t: T) -> Fold<S, F, Fut, T>
30 where S: Stream,
31 F: FnMut(T, S::Item) -> Fut,
32 Fut: IntoFuture<Item = T>,
33 S::Error: From<Fut::Error>,
34 {
35 Fold {
36 stream: s,
37 f: f,
38 state: State::Ready(t),
39 }
40 }
41
42 impl<S, F, Fut, T> Future for Fold<S, F, Fut, T>
43 where S: Stream,
44 F: FnMut(T, S::Item) -> Fut,
45 Fut: IntoFuture<Item = T>,
46 S::Error: From<Fut::Error>,
47 {
48 type Item = T;
49 type Error = S::Error;
50
poll(&mut self) -> Poll<T, S::Error>51 fn poll(&mut self) -> Poll<T, S::Error> {
52 loop {
53 match mem::replace(&mut self.state, State::Empty) {
54 State::Empty => panic!("cannot poll Fold twice"),
55 State::Ready(state) => {
56 match self.stream.poll()? {
57 Async::Ready(Some(e)) => {
58 let future = (self.f)(state, e);
59 let future = future.into_future();
60 self.state = State::Processing(future);
61 }
62 Async::Ready(None) => return Ok(Async::Ready(state)),
63 Async::NotReady => {
64 self.state = State::Ready(state);
65 return Ok(Async::NotReady)
66 }
67 }
68 }
69 State::Processing(mut fut) => {
70 match fut.poll()? {
71 Async::Ready(state) => self.state = State::Ready(state),
72 Async::NotReady => {
73 self.state = State::Processing(fut);
74 return Ok(Async::NotReady)
75 }
76 }
77 }
78 }
79 }
80 }
81 }
82