1 use BindServer; 2 use futures::stream::Stream; 3 use futures::{Future, IntoFuture, Poll, Async}; 4 use std::collections::VecDeque; 5 use std::io; 6 use streaming::{Message, Body}; 7 use super::advanced::{Pipeline, PipelineMessage}; 8 use super::{Frame, Transport}; 9 use tokio_core::reactor::Handle; 10 use tokio_service::Service; 11 12 // TODO: 13 // 14 // - Wait for service readiness 15 // - Handle request body stream cancellation 16 17 /// A streaming, pipelined server protocol. 18 /// 19 /// The `T` parameter is used for the I/O object used to communicate, which is 20 /// supplied in `bind_transport`. 21 /// 22 /// For simple protocols, the `Self` type is often a unit struct. In more 23 /// advanced cases, `Self` may contain configuration information that is used 24 /// for setting up the transport in `bind_transport`. 25 pub trait ServerProto<T: 'static>: 'static { 26 /// Request headers. 27 type Request: 'static; 28 29 /// Request body chunks. 30 type RequestBody: 'static; 31 32 /// Response headers. 33 type Response: 'static; 34 35 /// Response body chunks. 36 type ResponseBody: 'static; 37 38 /// Errors, which are used both for error frames and for the service itself. 39 type Error: From<io::Error> + 'static; 40 41 /// The frame transport, which usually take `T` as a parameter. 42 type Transport: 43 Transport<Item = Frame<Self::Request, Self::RequestBody, Self::Error>, 44 SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>; 45 46 /// A future for initializing a transport from an I/O object. 47 /// 48 /// In simple cases, `Result<Self::Transport, Self::Error>` often suffices. 49 type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>; 50 51 /// Build a transport from the given I/O object, using `self` for any 52 /// configuration. bind_transport(&self, io: T) -> Self::BindTransport53 fn bind_transport(&self, io: T) -> Self::BindTransport; 54 } 55 56 impl<P, T, B> BindServer<super::StreamingPipeline<B>, T> for P where 57 P: ServerProto<T>, 58 T: 'static, 59 B: Stream<Item = P::ResponseBody, Error = P::Error>, 60 { 61 type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>; 62 type ServiceResponse = Message<P::Response, B>; 63 type ServiceError = P::Error; 64 bind_server<S>(&self, handle: &Handle, io: T, service: S) where S: Service<Request = Self::ServiceRequest, Response = Self::ServiceResponse, Error = Self::ServiceError> + 'static65 fn bind_server<S>(&self, handle: &Handle, io: T, service: S) 66 where S: Service<Request = Self::ServiceRequest, 67 Response = Self::ServiceResponse, 68 Error = Self::ServiceError> + 'static 69 { 70 let task = self.bind_transport(io).into_future().and_then(|transport| { 71 let dispatch: Dispatch<S, T, P> = Dispatch { 72 service: service, 73 transport: transport, 74 in_flight: VecDeque::with_capacity(32), 75 }; 76 Pipeline::new(dispatch) 77 }); 78 79 // Spawn the pipeline dispatcher 80 handle.spawn(task.map_err(|_| ())) 81 } 82 } 83 84 struct Dispatch<S, T, P> where 85 T: 'static, P: ServerProto<T>, S: Service 86 { 87 // The service handling the connection 88 service: S, 89 transport: P::Transport, 90 in_flight: VecDeque<InFlight<S::Future>>, 91 } 92 93 enum InFlight<F: Future> { 94 Active(F), 95 Done(Result<F::Item, F::Error>), 96 } 97 98 impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where 99 P: ServerProto<T>, 100 T: 'static, 101 B: Stream<Item = P::ResponseBody, Error = P::Error>, 102 S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>, 103 Response = Message<P::Response, B>, 104 Error = P::Error>, 105 { 106 type Io = T; 107 type In = P::Response; 108 type BodyIn = P::ResponseBody; 109 type Out = P::Request; 110 type BodyOut = P::RequestBody; 111 type Error = P::Error; 112 type Stream = B; 113 type Transport = P::Transport; 114 transport(&mut self) -> &mut P::Transport115 fn transport(&mut self) -> &mut P::Transport { 116 &mut self.transport 117 } 118 dispatch(&mut self, request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()>119 fn dispatch(&mut self, 120 request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) 121 -> io::Result<()> 122 { 123 if let Ok(request) = request { 124 let response = self.service.call(request); 125 self.in_flight.push_back(InFlight::Active(response)); 126 } 127 128 // TODO: Should the error be handled differently? 129 130 Ok(()) 131 } 132 poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error>133 fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error> { 134 for slot in self.in_flight.iter_mut() { 135 slot.poll(); 136 } 137 138 match self.in_flight.front() { 139 Some(&InFlight::Done(_)) => {} 140 _ => return Ok(Async::NotReady) 141 } 142 143 match self.in_flight.pop_front() { 144 Some(InFlight::Done(res)) => Ok(Async::Ready(Some(res))), 145 _ => panic!(), 146 } 147 } 148 has_in_flight(&self) -> bool149 fn has_in_flight(&self) -> bool { 150 !self.in_flight.is_empty() 151 } 152 } 153 154 impl<F: Future> InFlight<F> { poll(&mut self)155 fn poll(&mut self) { 156 let res = match *self { 157 InFlight::Active(ref mut f) => { 158 match f.poll() { 159 Ok(Async::Ready(e)) => Ok(e), 160 Err(e) => Err(e), 161 Ok(Async::NotReady) => return, 162 } 163 } 164 _ => return, 165 }; 166 *self = InFlight::Done(res); 167 } 168 } 169