1 use BindClient;
2 use streaming::{Body, Message};
3 use super::{StreamingPipeline, Frame, Transport};
4 use super::advanced::{Pipeline, PipelineMessage};
5 use util::client_proxy::{self, ClientProxy, Receiver};
6 use futures::{Future, IntoFuture, Poll, Async, Stream};
7 use futures::sync::oneshot;
8 use tokio_core::reactor::Handle;
9 use std::collections::VecDeque;
10 use std::io;
11 
12 /// A streaming, pipelined client protocol.
13 ///
14 /// The `T` parameter is used for the I/O object used to communicate, which is
15 /// supplied in `bind_transport`.
16 ///
17 /// For simple protocols, the `Self` type is often a unit struct. In more
18 /// advanced cases, `Self` may contain configuration information that is used
19 /// for setting up the transport in `bind_transport`.
20 pub trait ClientProto<T: 'static>: 'static {
21     /// The type of request headers.
22     type Request: 'static;
23 
24     /// The type of request body chunks.
25     type RequestBody: 'static;
26 
27     /// The type of response headers.
28     type Response: 'static;
29 
30     /// The type of response body chunks.
31     type ResponseBody: 'static;
32 
33     /// The type of error frames.
34     type Error: From<io::Error> + 'static;
35 
36     /// The frame transport, which usually take `T` as a parameter.
37     type Transport:
38         Transport<Item = Frame<Self::Response, Self::ResponseBody, Self::Error>,
39                   SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>;
40 
41     /// A future for initializing a transport from an I/O object.
42     ///
43     /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices.
44     type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;
45 
46     /// Build a transport from the given I/O object, using `self` for any
47     /// configuration.
48     fn bind_transport(&self, io: T) -> Self::BindTransport;
49 }
50 
51 impl<P, T, B> BindClient<StreamingPipeline<B>, T> for P where
52     P: ClientProto<T>,
53     T: 'static,
54     B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
55 {
56     type ServiceRequest = Message<P::Request, B>;
57     type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>;
58     type ServiceError = P::Error;
59 
60     type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>;
61 
62     fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient {
63         let (client, rx) = client_proxy::pair();
64 
65         let task = self.bind_transport(io).into_future().and_then(|transport| {
66             let dispatch: Dispatch<P, T, B> = Dispatch {
67                 transport: transport,
68                 requests: rx,
69                 in_flight: VecDeque::with_capacity(32),
70             };
71             Pipeline::new(dispatch)
72         }).map_err(|e| {
73             // TODO: where to punt this error to?
74             error!("pipeline error: {}", e);
75         });
76 
77         // Spawn the task
78         handle.spawn(task);
79 
80         // Return the client
81         client
82     }
83 }
84 
85 struct Dispatch<P, T, B> where
86     P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
87     T: 'static,
88     B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
89 {
90     transport: P::Transport,
91     requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>,
92     in_flight: VecDeque<oneshot::Sender<Result<P::ServiceResponse, P::Error>>>,
93 }
94 
95 impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where
96     P: ClientProto<T>,
97     B: Stream<Item = P::RequestBody, Error = P::Error>,
98 {
99     type Io = T;
100     type In = P::Request;
101     type BodyIn = P::RequestBody;
102     type Out = P::Response;
103     type BodyOut = P::ResponseBody;
104     type Error = P::Error;
105     type Stream = B;
106     type Transport = P::Transport;
107 
108     fn transport(&mut self) -> &mut Self::Transport {
109         &mut self.transport
110     }
111 
112     fn dispatch(&mut self,
113                 response: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
114                 -> io::Result<()>
115     {
116         if let Some(complete) = self.in_flight.pop_front() {
117             drop(complete.send(response));
118         } else {
119             return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch"));
120         }
121 
122         Ok(())
123     }
124 
125     fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>,
126                                io::Error>
127     {
128         trace!("Dispatch::poll");
129         // Try to get a new request frame
130         match self.requests.poll() {
131             Ok(Async::Ready(Some(Ok((request, complete))))) => {
132                 trace!("   --> received request");
133 
134                 // Track complete handle
135                 self.in_flight.push_back(complete);
136 
137                 Ok(Async::Ready(Some(Ok(request))))
138 
139             }
140             Ok(Async::Ready(None)) => {
141                 trace!("   --> client dropped");
142                 Ok(Async::Ready(None))
143             }
144             Ok(Async::Ready(Some(Err(e)))) => {
145                 trace!("   --> error");
146                 // An error on receive can only happen when the other half
147                 // disconnected. In this case, the client needs to be
148                 // shutdown
149                 panic!("unimplemented error handling: {:?}", e);
150             }
151             Ok(Async::NotReady) => {
152                 trace!("   --> not ready");
153                 Ok(Async::NotReady)
154             }
155             Err(()) => panic!(),
156         }
157     }
158 
159     fn has_in_flight(&self) -> bool {
160         !self.in_flight.is_empty()
161     }
162 }
163 
164 impl<P, T, B> Drop for Dispatch<P, T, B> where
165     P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>,
166     T: 'static,
167     B: Stream<Item = P::RequestBody, Error = P::Error> + 'static,
168 {
169     fn drop(&mut self) {
170         // Complete any pending requests with an error
171         while let Some(complete) = self.in_flight.pop_front() {
172             drop(complete.send(Err(broken_pipe().into())));
173         }
174     }
175 }
176 
177 fn broken_pipe() -> io::Error {
178     io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe")
179 }
180