1 use {IntoFuture, Future, Poll, Async};
2 use stream::Stream;
3 
4 /// A stream combinator which chains a computation onto values produced by a
5 /// stream.
6 ///
7 /// This structure is produced by the `Stream::and_then` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct AndThen<S, F, U>
11     where U: IntoFuture,
12 {
13     stream: S,
14     future: Option<U::Future>,
15     f: F,
16 }
17 
new<S, F, U>(s: S, f: F) -> AndThen<S, F, U> where S: Stream, F: FnMut(S::Item) -> U, U: IntoFuture<Error=S::Error>,18 pub fn new<S, F, U>(s: S, f: F) -> AndThen<S, F, U>
19     where S: Stream,
20           F: FnMut(S::Item) -> U,
21           U: IntoFuture<Error=S::Error>,
22 {
23     AndThen {
24         stream: s,
25         future: None,
26         f: f,
27     }
28 }
29 
30 impl<S, F, U> AndThen<S, F, U>
31     where U: IntoFuture,
32 {
33     /// Acquires a reference to the underlying stream that this combinator is
34     /// pulling from.
get_ref(&self) -> &S35     pub fn get_ref(&self) -> &S {
36         &self.stream
37     }
38 
39     /// Acquires a mutable reference to the underlying stream that this
40     /// combinator is pulling from.
41     ///
42     /// Note that care must be taken to avoid tampering with the state of the
43     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut S44     pub fn get_mut(&mut self) -> &mut S {
45         &mut self.stream
46     }
47 
48     /// Consumes this combinator, returning the underlying stream.
49     ///
50     /// Note that this may discard intermediate state of this combinator, so
51     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> S52     pub fn into_inner(self) -> S {
53         self.stream
54     }
55 }
56 
57 // Forwarding impl of Sink from the underlying stream
58 impl<S, F, U: IntoFuture> ::sink::Sink for AndThen<S, F, U>
59     where S: ::sink::Sink
60 {
61     type SinkItem = S::SinkItem;
62     type SinkError = S::SinkError;
63 
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>64     fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
65         self.stream.start_send(item)
66     }
67 
poll_complete(&mut self) -> Poll<(), S::SinkError>68     fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
69         self.stream.poll_complete()
70     }
71 
close(&mut self) -> Poll<(), S::SinkError>72     fn close(&mut self) -> Poll<(), S::SinkError> {
73         self.stream.close()
74     }
75 }
76 
77 impl<S, F, U> Stream for AndThen<S, F, U>
78     where S: Stream,
79           F: FnMut(S::Item) -> U,
80           U: IntoFuture<Error=S::Error>,
81 {
82     type Item = U::Item;
83     type Error = S::Error;
84 
poll(&mut self) -> Poll<Option<U::Item>, S::Error>85     fn poll(&mut self) -> Poll<Option<U::Item>, S::Error> {
86         if self.future.is_none() {
87             let item = match try_ready!(self.stream.poll()) {
88                 None => return Ok(Async::Ready(None)),
89                 Some(e) => e,
90             };
91             self.future = Some((self.f)(item).into_future());
92         }
93         assert!(self.future.is_some());
94         match self.future.as_mut().unwrap().poll() {
95             Ok(Async::Ready(e)) => {
96                 self.future = None;
97                 Ok(Async::Ready(Some(e)))
98             }
99             Err(e) => {
100                 self.future = None;
101                 Err(e)
102             }
103             Ok(Async::NotReady) => Ok(Async::NotReady)
104         }
105     }
106 }
107