#![cfg(feature = "with-deprecated")] #![deprecated(since = "0.1.4", note = "use sync::mpsc::channel instead")] #![allow(deprecated)] use std::any::Any; use std::error::Error; use std::fmt; use {Poll, Async, Stream, Future, Sink}; use sink::Send; use sync::mpsc; /// Creates an in-memory channel implementation of the `Stream` trait. /// /// This method creates a concrete implementation of the `Stream` trait which /// can be used to send values across threads in a streaming fashion. This /// channel is unique in that it implements back pressure to ensure that the /// sender never outpaces the receiver. The `Sender::send` method will only /// allow sending one message and the next message can only be sent once the /// first was consumed. /// /// The `Receiver` returned implements the `Stream` trait and has access to any /// number of the associated combinators for transforming the result. pub fn channel() -> (Sender, Receiver) { let (tx, rx) = mpsc::channel(0); (Sender { inner: tx }, Receiver { inner: rx }) } /// The transmission end of a channel which is used to send values. /// /// This is created by the `channel` method in the `stream` module. #[derive(Debug)] pub struct Sender { inner: mpsc::Sender>, } /// The receiving end of a channel which implements the `Stream` trait. /// /// This is a concrete implementation of a stream which can be used to represent /// a stream of values being computed elsewhere. This is created by the /// `channel` method in the `stream` module. #[must_use = "streams do nothing unless polled"] #[derive(Debug)] pub struct Receiver { inner: mpsc::Receiver>, } /// Error type for sending, used when the receiving end of the channel is dropped pub struct SendError(Result); /// Future returned by `Sender::send`. #[derive(Debug)] pub struct FutureSender { inner: Send>>, } impl fmt::Debug for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { fmt.debug_tuple("SendError") .field(&"...") .finish() } } impl fmt::Display for SendError { fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { write!(fmt, "send failed because receiver is gone") } } impl Error for SendError where T: Any, E: Any { fn description(&self) -> &str { "send failed because receiver is gone" } } impl Stream for Receiver { type Item = T; type Error = E; fn poll(&mut self) -> Poll, E> { match self.inner.poll().expect("cannot fail") { Async::Ready(Some(Ok(e))) => Ok(Async::Ready(Some(e))), Async::Ready(Some(Err(e))) => Err(e), Async::Ready(None) => Ok(Async::Ready(None)), Async::NotReady => Ok(Async::NotReady), } } } impl Sender { /// Sends a new value along this channel to the receiver. /// /// This method consumes the sender and returns a future which will resolve /// to the sender again when the value sent has been consumed. pub fn send(self, t: Result) -> FutureSender { FutureSender { inner: self.inner.send(t) } } } impl Future for FutureSender { type Item = Sender; type Error = SendError; fn poll(&mut self) -> Poll { match self.inner.poll() { Ok(a) => Ok(a.map(|a| Sender { inner: a })), Err(e) => Err(SendError(e.into_inner())), } } }