1 use {IntoFuture, Future, Poll, Async};
2 use stream::Stream;
3
4 /// A stream combinator which chains a computation onto errors produced by a
5 /// stream.
6 ///
7 /// This structure is produced by the `Stream::or_else` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct OrElse<S, F, U>
11 where U: IntoFuture,
12 {
13 stream: S,
14 future: Option<U::Future>,
15 f: F,
16 }
17
new<S, F, U>(s: S, f: F) -> OrElse<S, F, U> where S: Stream, F: FnMut(S::Error) -> U, U: IntoFuture<Item=S::Item>,18 pub fn new<S, F, U>(s: S, f: F) -> OrElse<S, F, U>
19 where S: Stream,
20 F: FnMut(S::Error) -> U,
21 U: IntoFuture<Item=S::Item>,
22 {
23 OrElse {
24 stream: s,
25 future: None,
26 f: f,
27 }
28 }
29
30 // Forwarding impl of Sink from the underlying stream
31 impl<S, F, U> ::sink::Sink for OrElse<S, F, U>
32 where S: ::sink::Sink, U: IntoFuture
33 {
34 type SinkItem = S::SinkItem;
35 type SinkError = S::SinkError;
36
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>37 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
38 self.stream.start_send(item)
39 }
40
poll_complete(&mut self) -> Poll<(), S::SinkError>41 fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
42 self.stream.poll_complete()
43 }
44
close(&mut self) -> Poll<(), S::SinkError>45 fn close(&mut self) -> Poll<(), S::SinkError> {
46 self.stream.close()
47 }
48 }
49
50 impl<S, F, U> Stream for OrElse<S, F, U>
51 where S: Stream,
52 F: FnMut(S::Error) -> U,
53 U: IntoFuture<Item=S::Item>,
54 {
55 type Item = S::Item;
56 type Error = U::Error;
57
poll(&mut self) -> Poll<Option<S::Item>, U::Error>58 fn poll(&mut self) -> Poll<Option<S::Item>, U::Error> {
59 if self.future.is_none() {
60 let item = match self.stream.poll() {
61 Ok(Async::Ready(e)) => return Ok(Async::Ready(e)),
62 Ok(Async::NotReady) => return Ok(Async::NotReady),
63 Err(e) => e,
64 };
65 self.future = Some((self.f)(item).into_future());
66 }
67 assert!(self.future.is_some());
68 match self.future.as_mut().unwrap().poll() {
69 Ok(Async::Ready(e)) => {
70 self.future = None;
71 Ok(Async::Ready(Some(e)))
72 }
73 Err(e) => {
74 self.future = None;
75 Err(e)
76 }
77 Ok(Async::NotReady) => Ok(Async::NotReady)
78 }
79 }
80 }
81