1 use BindClient; 2 use streaming::{Body, Message}; 3 use super::{StreamingPipeline, Frame, Transport}; 4 use super::advanced::{Pipeline, PipelineMessage}; 5 use util::client_proxy::{self, ClientProxy, Receiver}; 6 use futures::{Future, IntoFuture, Poll, Async, Stream}; 7 use futures::sync::oneshot; 8 use tokio_core::reactor::Handle; 9 use std::collections::VecDeque; 10 use std::io; 11 12 /// A streaming, pipelined client protocol. 13 /// 14 /// The `T` parameter is used for the I/O object used to communicate, which is 15 /// supplied in `bind_transport`. 16 /// 17 /// For simple protocols, the `Self` type is often a unit struct. In more 18 /// advanced cases, `Self` may contain configuration information that is used 19 /// for setting up the transport in `bind_transport`. 20 pub trait ClientProto<T: 'static>: 'static { 21 /// The type of request headers. 22 type Request: 'static; 23 24 /// The type of request body chunks. 25 type RequestBody: 'static; 26 27 /// The type of response headers. 28 type Response: 'static; 29 30 /// The type of response body chunks. 31 type ResponseBody: 'static; 32 33 /// The type of error frames. 34 type Error: From<io::Error> + 'static; 35 36 /// The frame transport, which usually take `T` as a parameter. 37 type Transport: 38 Transport<Item = Frame<Self::Response, Self::ResponseBody, Self::Error>, 39 SinkItem = Frame<Self::Request, Self::RequestBody, Self::Error>>; 40 41 /// A future for initializing a transport from an I/O object. 42 /// 43 /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices. 44 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>; 45 46 /// Build a transport from the given I/O object, using `self` for any 47 /// configuration. 48 fn bind_transport(&self, io: T) -> Self::BindTransport; 49 } 50 51 impl<P, T, B> BindClient<StreamingPipeline<B>, T> for P where 52 P: ClientProto<T>, 53 T: 'static, 54 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static, 55 { 56 type ServiceRequest = Message<P::Request, B>; 57 type ServiceResponse = Message<P::Response, Body<P::ResponseBody, P::Error>>; 58 type ServiceError = P::Error; 59 60 type BindClient = ClientProxy<Self::ServiceRequest, Self::ServiceResponse, Self::ServiceError>; 61 62 fn bind_client(&self, handle: &Handle, io: T) -> Self::BindClient { 63 let (client, rx) = client_proxy::pair(); 64 65 let task = self.bind_transport(io).into_future().and_then(|transport| { 66 let dispatch: Dispatch<P, T, B> = Dispatch { 67 transport: transport, 68 requests: rx, 69 in_flight: VecDeque::with_capacity(32), 70 }; 71 Pipeline::new(dispatch) 72 }).map_err(|e| { 73 // TODO: where to punt this error to? 74 error!("pipeline error: {}", e); 75 }); 76 77 // Spawn the task 78 handle.spawn(task); 79 80 // Return the client 81 client 82 } 83 } 84 85 struct Dispatch<P, T, B> where 86 P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>, 87 T: 'static, 88 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static, 89 { 90 transport: P::Transport, 91 requests: Receiver<P::ServiceRequest, P::ServiceResponse, P::Error>, 92 in_flight: VecDeque<oneshot::Sender<Result<P::ServiceResponse, P::Error>>>, 93 } 94 95 impl<P, T, B> super::advanced::Dispatch for Dispatch<P, T, B> where 96 P: ClientProto<T>, 97 B: Stream<Item = P::RequestBody, Error = P::Error>, 98 { 99 type Io = T; 100 type In = P::Request; 101 type BodyIn = P::RequestBody; 102 type Out = P::Response; 103 type BodyOut = P::ResponseBody; 104 type Error = P::Error; 105 type Stream = B; 106 type Transport = P::Transport; 107 108 fn transport(&mut self) -> &mut Self::Transport { 109 &mut self.transport 110 } 111 112 fn dispatch(&mut self, 113 response: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) 114 -> io::Result<()> 115 { 116 if let Some(complete) = self.in_flight.pop_front() { 117 drop(complete.send(response)); 118 } else { 119 return Err(io::Error::new(io::ErrorKind::Other, "request / response mismatch")); 120 } 121 122 Ok(()) 123 } 124 125 fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, 126 io::Error> 127 { 128 trace!("Dispatch::poll"); 129 // Try to get a new request frame 130 match self.requests.poll() { 131 Ok(Async::Ready(Some(Ok((request, complete))))) => { 132 trace!(" --> received request"); 133 134 // Track complete handle 135 self.in_flight.push_back(complete); 136 137 Ok(Async::Ready(Some(Ok(request)))) 138 139 } 140 Ok(Async::Ready(None)) => { 141 trace!(" --> client dropped"); 142 Ok(Async::Ready(None)) 143 } 144 Ok(Async::Ready(Some(Err(e)))) => { 145 trace!(" --> error"); 146 // An error on receive can only happen when the other half 147 // disconnected. In this case, the client needs to be 148 // shutdown 149 panic!("unimplemented error handling: {:?}", e); 150 } 151 Ok(Async::NotReady) => { 152 trace!(" --> not ready"); 153 Ok(Async::NotReady) 154 } 155 Err(()) => panic!(), 156 } 157 } 158 159 fn has_in_flight(&self) -> bool { 160 !self.in_flight.is_empty() 161 } 162 } 163 164 impl<P, T, B> Drop for Dispatch<P, T, B> where 165 P: ClientProto<T> + BindClient<StreamingPipeline<B>, T>, 166 T: 'static, 167 B: Stream<Item = P::RequestBody, Error = P::Error> + 'static, 168 { 169 fn drop(&mut self) { 170 // Complete any pending requests with an error 171 while let Some(complete) = self.in_flight.pop_front() { 172 drop(complete.send(Err(broken_pipe().into()))); 173 } 174 } 175 } 176 177 fn broken_pipe() -> io::Error { 178 io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe") 179 } 180