1 // This is a derived version of client_proxy.rs from
2 // tokio_proto crate used under MIT license.
3 //
4 // Original version of client_proxy.rs:
5 // https://github.com/tokio-rs/tokio-proto/commit/8fb8e482dcd55cf02ceee165f8e08eee799c96d3
6 //
7 // The following modifications were made:
8 // * Remove `Service` trait since audioipc doesn't use `tokio_service`
9 //   crate.
10 // * Remove `RefCell` from `ClientProxy` since cubeb is called from
11 //   multiple threads. `mpsc::UnboundedSender` is thread safe.
12 // * Simplify the interface to just request (`R`) and response (`Q`)
13 //   removing error (`E`).
14 // * Remove the `Envelope` type.
15 // * Renamed `pair` to `channel` to represent that an `rpc::channel`
16 //   is being created.
17 //
18 // Original License:
19 //
20 // Copyright (c) 2016 Tokio contributors
21 //
22 // Permission is hereby granted, free of charge, to any
23 // person obtaining a copy of this software and associated
24 // documentation files (the "Software"), to deal in the
25 // Software without restriction, including without
26 // limitation the rights to use, copy, modify, merge,
27 // publish, distribute, sublicense, and/or sell copies of
28 // the Software, and to permit persons to whom the Software
29 // is furnished to do so, subject to the following
30 // conditions:
31 //
32 // The above copyright notice and this permission notice
33 // shall be included in all copies or substantial portions
34 // of the Software.
35 //
36 // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF
37 // ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED
38 // TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
39 // PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT
40 // SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY
41 // CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
42 // OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR
43 // IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
44 // DEALINGS IN THE SOFTWARE.
45 
46 use futures::sync::{mpsc, oneshot};
47 use futures::{Async, Future, Poll};
48 use std::fmt;
49 use std::io;
50 
/// Message used to dispatch requests to the task managing the
/// client connection.
///
/// The first element is the request payload (`R`); the second is the
/// sending half of a oneshot channel on which the response (`Q`) is
/// delivered back to the caller.
pub type Request<R, Q> = (R, oneshot::Sender<Q>);
54 
/// Receiving half of the unbounded request channel.
///
/// Yields the `Request` messages that were submitted through a
/// `ClientProxy`.
pub type Receiver<R, Q> = mpsc::UnboundedReceiver<Request<R, Q>>;
57 
/// Response future returned from a client.
///
/// Wraps the receiving half of the oneshot channel created by
/// `ClientProxy::call`. It resolves to the response value (`Q`), or to
/// an `io::Error` of kind `BrokenPipe` when the oneshot channel is
/// canceled before a response is delivered.
pub struct Response<Q> {
    // Receiving half of the per-call oneshot channel.
    inner: oneshot::Receiver<Q>,
}
62 
/// Handle used to submit requests (`R`) and obtain a future for the
/// matching response (`Q`).
///
/// Instances are created by `channel`; each clone sends on the same
/// underlying request channel.
pub struct ClientProxy<R, Q> {
    // Sending half of the unbounded request channel.
    tx: mpsc::UnboundedSender<Request<R, Q>>,
}
66 
67 impl<R, Q> Clone for ClientProxy<R, Q> {
clone(&self) -> Self68     fn clone(&self) -> Self {
69         ClientProxy {
70             tx: self.tx.clone(),
71         }
72     }
73 }
74 
channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>)75 pub fn channel<R, Q>() -> (ClientProxy<R, Q>, Receiver<R, Q>) {
76     // Create a channel to send the requests to client-side of rpc.
77     let (tx, rx) = mpsc::unbounded();
78 
79     // Wrap the `tx` part in ClientProxy so the rpc call interface
80     // can be implemented.
81     let client = ClientProxy { tx };
82 
83     (client, rx)
84 }
85 
86 impl<R, Q> ClientProxy<R, Q> {
call(&self, request: R) -> Response<Q>87     pub fn call(&self, request: R) -> Response<Q> {
88         // The response to returned from the rpc client task over a
89         // oneshot channel.
90         let (tx, rx) = oneshot::channel();
91 
92         // If send returns an Err, its because the other side has been dropped.
93         // By ignoring it, we are just dropping the `tx`, which will mean the
94         // rx will return Canceled when polled. In turn, that is translated
95         // into a BrokenPipe, which conveys the proper error.
96         let _ = self.tx.unbounded_send((request, tx));
97 
98         Response { inner: rx }
99     }
100 }
101 
102 impl<R, Q> fmt::Debug for ClientProxy<R, Q>
103 where
104     R: fmt::Debug,
105     Q: fmt::Debug,
106 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result107     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
108         write!(f, "ClientProxy {{ ... }}")
109     }
110 }
111 
112 impl<Q> Future for Response<Q> {
113     type Item = Q;
114     type Error = io::Error;
115 
poll(&mut self) -> Poll<Q, io::Error>116     fn poll(&mut self) -> Poll<Q, io::Error> {
117         match self.inner.poll() {
118             Ok(Async::Ready(res)) => Ok(Async::Ready(res)),
119             Ok(Async::NotReady) => Ok(Async::NotReady),
120             // Convert oneshot::Canceled into io::Error
121             Err(_) => {
122                 let e = io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe");
123                 Err(e)
124             }
125         }
126     }
127 }
128 
129 impl<Q> fmt::Debug for Response<Q>
130 where
131     Q: fmt::Debug,
132 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result133     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
134         write!(f, "Response {{ ... }}")
135     }
136 }
137