use {Poll, Async}; use stream::Stream; /// A combinator used to flatten a stream-of-streams into one long stream of /// elements. /// /// This combinator is created by the `Stream::flatten` method. #[derive(Debug)] #[must_use = "streams do nothing unless polled"] pub struct Flatten where S: Stream, { stream: S, next: Option, } pub fn new(s: S) -> Flatten where S: Stream, S::Item: Stream, ::Error: From, { Flatten { stream: s, next: None, } } impl Flatten { /// Acquires a reference to the underlying stream that this combinator is /// pulling from. pub fn get_ref(&self) -> &S { &self.stream } /// Acquires a mutable reference to the underlying stream that this /// combinator is pulling from. /// /// Note that care must be taken to avoid tampering with the state of the /// stream which may otherwise confuse this combinator. pub fn get_mut(&mut self) -> &mut S { &mut self.stream } /// Consumes this combinator, returning the underlying stream. /// /// Note that this may discard intermediate state of this combinator, so /// care should be taken to avoid losing resources when this is called. pub fn into_inner(self) -> S { self.stream } } // Forwarding impl of Sink from the underlying stream impl ::sink::Sink for Flatten where S: ::sink::Sink + Stream { type SinkItem = S::SinkItem; type SinkError = S::SinkError; fn start_send(&mut self, item: S::SinkItem) -> ::StartSend { self.stream.start_send(item) } fn poll_complete(&mut self) -> Poll<(), S::SinkError> { self.stream.poll_complete() } fn close(&mut self) -> Poll<(), S::SinkError> { self.stream.close() } } impl Stream for Flatten where S: Stream, S::Item: Stream, ::Error: From, { type Item = ::Item; type Error = ::Error; fn poll(&mut self) -> Poll, Self::Error> { loop { if self.next.is_none() { match try_ready!(self.stream.poll()) { Some(e) => self.next = Some(e), None => return Ok(Async::Ready(None)), } } assert!(self.next.is_some()); match self.next.as_mut().unwrap().poll() { Ok(Async::Ready(None)) => self.next = None, other => return other, } } } }