1 use {Poll, Async};
2 use stream::Stream;
3
4 /// A combinator used to flatten a stream-of-streams into one long stream of
5 /// elements.
6 ///
7 /// This combinator is created by the `Stream::flatten` method.
8 #[derive(Debug)]
9 #[must_use = "streams do nothing unless polled"]
10 pub struct Flatten<S>
11 where S: Stream,
12 {
13 stream: S,
14 next: Option<S::Item>,
15 }
16
new<S>(s: S) -> Flatten<S> where S: Stream, S::Item: Stream, <S::Item as Stream>::Error: From<S::Error>,17 pub fn new<S>(s: S) -> Flatten<S>
18 where S: Stream,
19 S::Item: Stream,
20 <S::Item as Stream>::Error: From<S::Error>,
21 {
22 Flatten {
23 stream: s,
24 next: None,
25 }
26 }
27
28 impl<S: Stream> Flatten<S> {
29 /// Acquires a reference to the underlying stream that this combinator is
30 /// pulling from.
get_ref(&self) -> &S31 pub fn get_ref(&self) -> &S {
32 &self.stream
33 }
34
35 /// Acquires a mutable reference to the underlying stream that this
36 /// combinator is pulling from.
37 ///
38 /// Note that care must be taken to avoid tampering with the state of the
39 /// stream which may otherwise confuse this combinator.
get_mut(&mut self) -> &mut S40 pub fn get_mut(&mut self) -> &mut S {
41 &mut self.stream
42 }
43
44 /// Consumes this combinator, returning the underlying stream.
45 ///
46 /// Note that this may discard intermediate state of this combinator, so
47 /// care should be taken to avoid losing resources when this is called.
into_inner(self) -> S48 pub fn into_inner(self) -> S {
49 self.stream
50 }
51 }
52
53 // Forwarding impl of Sink from the underlying stream
54 impl<S> ::sink::Sink for Flatten<S>
55 where S: ::sink::Sink + Stream
56 {
57 type SinkItem = S::SinkItem;
58 type SinkError = S::SinkError;
59
start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError>60 fn start_send(&mut self, item: S::SinkItem) -> ::StartSend<S::SinkItem, S::SinkError> {
61 self.stream.start_send(item)
62 }
63
poll_complete(&mut self) -> Poll<(), S::SinkError>64 fn poll_complete(&mut self) -> Poll<(), S::SinkError> {
65 self.stream.poll_complete()
66 }
67
close(&mut self) -> Poll<(), S::SinkError>68 fn close(&mut self) -> Poll<(), S::SinkError> {
69 self.stream.close()
70 }
71 }
72
73 impl<S> Stream for Flatten<S>
74 where S: Stream,
75 S::Item: Stream,
76 <S::Item as Stream>::Error: From<S::Error>,
77 {
78 type Item = <S::Item as Stream>::Item;
79 type Error = <S::Item as Stream>::Error;
80
poll(&mut self) -> Poll<Option<Self::Item>, Self::Error>81 fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
82 loop {
83 if self.next.is_none() {
84 match try_ready!(self.stream.poll()) {
85 Some(e) => self.next = Some(e),
86 None => return Ok(Async::Ready(None)),
87 }
88 }
89 assert!(self.next.is_some());
90 match self.next.as_mut().unwrap().poll() {
91 Ok(Async::Ready(None)) => self.next = None,
92 other => return other,
93 }
94 }
95 }
96 }
97