1 #![cfg(feature = "with-deprecated")]
2 #![deprecated(since = "0.1.4", note = "use sync::mpsc::channel instead")]
3 #![allow(deprecated)]
4 
5 use std::any::Any;
6 use std::error::Error;
7 use std::fmt;
8 
9 use {Poll, Async, Stream, Future, Sink};
10 use sink::Send;
11 use sync::mpsc;
12 
13 /// Creates an in-memory channel implementation of the `Stream` trait.
14 ///
15 /// This method creates a concrete implementation of the `Stream` trait which
16 /// can be used to send values across threads in a streaming fashion. This
17 /// channel is unique in that it implements back pressure to ensure that the
18 /// sender never outpaces the receiver. The `Sender::send` method will only
19 /// allow sending one message and the next message can only be sent once the
20 /// first was consumed.
21 ///
22 /// The `Receiver` returned implements the `Stream` trait and has access to any
23 /// number of the associated combinators for transforming the result.
channel<T, E>() -> (Sender<T, E>, Receiver<T, E>)24 pub fn channel<T, E>() -> (Sender<T, E>, Receiver<T, E>) {
25     let (tx, rx) = mpsc::channel(0);
26     (Sender { inner: tx }, Receiver { inner: rx })
27 }
28 
29 /// The transmission end of a channel which is used to send values.
30 ///
31 /// This is created by the `channel` method in the `stream` module.
32 #[derive(Debug)]
33 pub struct Sender<T, E> {
34     inner: mpsc::Sender<Result<T, E>>,
35 }
36 
37 /// The receiving end of a channel which implements the `Stream` trait.
38 ///
39 /// This is a concrete implementation of a stream which can be used to represent
40 /// a stream of values being computed elsewhere. This is created by the
41 /// `channel` method in the `stream` module.
42 #[must_use = "streams do nothing unless polled"]
43 #[derive(Debug)]
44 pub struct Receiver<T, E> {
45     inner: mpsc::Receiver<Result<T, E>>,
46 }
47 
48 /// Error type for sending, used when the receiving end of the channel is dropped
49 pub struct SendError<T, E>(Result<T, E>);
50 
51 /// Future returned by `Sender::send`.
52 #[derive(Debug)]
53 pub struct FutureSender<T, E> {
54     inner: Send<mpsc::Sender<Result<T, E>>>,
55 }
56 
57 impl<T, E> fmt::Debug for SendError<T, E> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result58     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
59         fmt.debug_tuple("SendError")
60             .field(&"...")
61             .finish()
62     }
63 }
64 
65 impl<T, E> fmt::Display for SendError<T, E> {
fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result66     fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
67         write!(fmt, "send failed because receiver is gone")
68     }
69 }
70 
71 impl<T, E> Error for SendError<T, E>
72     where T: Any, E: Any
73 {
description(&self) -> &str74     fn description(&self) -> &str {
75         "send failed because receiver is gone"
76     }
77 }
78 
79 
80 impl<T, E> Stream for Receiver<T, E> {
81     type Item = T;
82     type Error = E;
83 
poll(&mut self) -> Poll<Option<T>, E>84     fn poll(&mut self) -> Poll<Option<T>, E> {
85         match self.inner.poll().expect("cannot fail") {
86             Async::Ready(Some(Ok(e))) => Ok(Async::Ready(Some(e))),
87             Async::Ready(Some(Err(e))) => Err(e),
88             Async::Ready(None) => Ok(Async::Ready(None)),
89             Async::NotReady => Ok(Async::NotReady),
90         }
91     }
92 }
93 
94 impl<T, E> Sender<T, E> {
95     /// Sends a new value along this channel to the receiver.
96     ///
97     /// This method consumes the sender and returns a future which will resolve
98     /// to the sender again when the value sent has been consumed.
send(self, t: Result<T, E>) -> FutureSender<T, E>99     pub fn send(self, t: Result<T, E>) -> FutureSender<T, E> {
100         FutureSender { inner: self.inner.send(t) }
101     }
102 }
103 
104 impl<T, E> Future for FutureSender<T, E> {
105     type Item = Sender<T, E>;
106     type Error = SendError<T, E>;
107 
poll(&mut self) -> Poll<Self::Item, Self::Error>108     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
109         match self.inner.poll() {
110             Ok(a) => Ok(a.map(|a| Sender { inner: a })),
111             Err(e) => Err(SendError(e.into_inner())),
112         }
113     }
114 }
115