1 use {Poll, Async};
2 use stream::Stream;
3 
4 /// A combinator used to flatten a stream-of-streams into one long stream of
5 /// elements.
6 ///
7 /// This combinator is created by the `Stream::flatten` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct Flatten<S>
11     where S: Stream,
12 {
13     stream: S,
14     next: Option<S::Item>,
15 }
16 
new<S>(s: S) -> Flatten<S> where S: Stream, S::Item: Stream, <S::Item as Stream>::Error: From<S::Error>,17 pub fn new<S>(s: S) -> Flatten<S>
18     where S: Stream,
19           S::Item: Stream,
20           <S::Item as Stream>::Error: From<S::Error>,
21 {
22     Flatten {
23         stream: s,
24         next: None,
25     }
26 }
27 
28 impl<S: Stream> Flatten<S> {
29     /// Acquires a reference to the underlying stream that this combinator is
30     /// pulling from.
get_ref(&self) -> &S31     pub fn get_ref(&self) -> &S {
32         &self.stream
33     }
34 
35     /// Acquires a mutable reference to the underlying stream that this
36     /// combinator is pulling from.
37     ///
38     /// Note that care must be taken to avoid tampering with the state of the
39     /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut S40     pub fn get_mut(&mut self) -> &mut S {
41         &mut self.stream
42     }
43 
44     /// Consumes this combinator, returning the underlying stream.
45     ///
46     /// Note that this may discard intermediate state of this combinator, so
47     /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> S48     pub fn into_inner(self) -> S {
49         self.stream
50     }
51 }
52 
53 // Forwarding impl of Sink from the underlying stream
54 impl<S> ::sink::Sink for Flatten<S>
55     where S: ::sink::Sink + Stream
56 {
57     type SinkItem = S::SinkItem;
58     type SinkError = S::SinkError;
59 
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>60     fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
61         self.stream.start_send(item)
62     }
63 
poll_complete(&mut self) -> Poll<(), S::SinkError>64     fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
65         self.stream.poll_complete()
66     }
67 
close(&mut self) -> Poll<(), S::SinkError>68     fn close(&mut self) -> Poll<(), S::SinkError> {
69         self.stream.close()
70     }
71 }
72 
73 impl<S> Stream for Flatten<S>
74     where S: Stream,
75           S::Item: Stream,
76           <S::Item as Stream>::Error: From<S::Error>,
77 {
78     type Item = <S::Item as Stream>::Item;
79     type Error = <S::Item as Stream>::Error;
80 
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>81     fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
82         loop {
83             if self.next.is_none() {
84                 match try_ready!(self.stream.poll()) {
85                     Some(e) => self.next = Some(e),
86                     None => return Ok(Async::Ready(None)),
87                 }
88             }
89             assert!(self.next.is_some());
90             match self.next.as_mut().unwrap().poll() {
91                 Ok(Async::Ready(None)) => self.next = None,
92                 other => return other,
93             }
94         }
95     }
96 }
97