1 use {IntoFuture, Future, Poll, Async};
2 use stream::Stream;
3 
4 /// A stream combinator which chains a computation onto errors produced by a
5 /// stream.
6 ///
7 /// This structure is produced by the `Stream::or_else` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct OrElse<S, F, U>
11     where U: IntoFuture,
12 {
13     stream: S,
14     future: Option<U::Future>,
15     f: F,
16 }
17 
new<S, F, U>(s: S, f: F) -> OrElse<S, F, U> where S: Stream, F: FnMut(S::Error) -> U, U: IntoFuture<Item=S::Item>,18 pub fn new<S, F, U>(s: S, f: F) -> OrElse<S, F, U>
19     where S: Stream,
20           F: FnMut(S::Error) -> U,
21           U: IntoFuture<Item=S::Item>,
22 {
23     OrElse {
24         stream: s,
25         future: None,
26         f: f,
27     }
28 }
29 
30 // Forwarding impl of Sink from the underlying stream
31 impl<S, F, U> ::sink::Sink for OrElse<S, F, U>
32     where S: ::sink::Sink, U: IntoFuture
33 {
34     type SinkItem = S::SinkItem;
35     type SinkError = S::SinkError;
36 
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>37     fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
38         self.stream.start_send(item)
39     }
40 
poll_complete(&mut self) -> Poll<(), S::SinkError>41     fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
42         self.stream.poll_complete()
43     }
44 
close(&mut self) -> Poll<(), S::SinkError>45     fn close(&mut self) -> Poll<(), S::SinkError> {
46         self.stream.close()
47     }
48 }
49 
50 impl<S, F, U> Stream for OrElse<S, F, U>
51     where S: Stream,
52           F: FnMut(S::Error) -> U,
53           U: IntoFuture<Item=S::Item>,
54 {
55     type Item = S::Item;
56     type Error = U::Error;
57 
poll(&mut self) -> Poll<Option<S::Item>, U::Error>58     fn poll(&mut self) -> Poll<Option<S::Item>, U::Error> {
59         if self.future.is_none() {
60             let item = match self.stream.poll() {
61                 Ok(Async::Ready(e)) => return Ok(Async::Ready(e)),
62                 Ok(Async::NotReady) => return Ok(Async::NotReady),
63                 Err(e) => e,
64             };
65             self.future = Some((self.f)(item).into_future());
66         }
67         assert!(self.future.is_some());
68         match self.future.as_mut().unwrap().poll() {
69             Ok(Async::Ready(e)) => {
70                 self.future = None;
71                 Ok(Async::Ready(Some(e)))
72             }
73             Err(e) => {
74                 self.future = None;
75                 Err(e)
76             }
77             Ok(Async::NotReady) => Ok(Async::NotReady)
78         }
79     }
80 }
81