1 use BindServer;
2 use futures::stream::Stream;
3 use futures::{Future, IntoFuture, Poll, Async};
4 use std::collections::VecDeque;
5 use std::io;
6 use streaming::{Message, Body};
7 use super::advanced::{Pipeline, PipelineMessage};
8 use super::{Frame, Transport};
9 use tokio_core::reactor::Handle;
10 use tokio_service::Service;
11 
12 // TODO:
13 //
14 // - Wait for service readiness
15 // - Handle request body stream cancellation
16 
/// A streaming, pipelined server protocol.
///
/// The `T` parameter is the type of the I/O object used to communicate, which
/// is supplied in `bind_transport`.
///
/// For simple protocols, the `Self` type is often a unit struct. In more
/// advanced cases, `Self` may contain configuration information that is used
/// for setting up the transport in `bind_transport`.
pub trait ServerProto<T: 'static>: 'static {
    /// Request headers.
    type Request: 'static;

    /// Request body chunks.
    type RequestBody: 'static;

    /// Response headers.
    type Response: 'static;

    /// Response body chunks.
    type ResponseBody: 'static;

    /// Errors, which are used both for error frames and for the service itself.
    ///
    /// The type must be constructible from an `io::Error`.
    type Error: From<io::Error> + 'static;

    /// The frame transport, which usually takes `T` as a parameter.
    ///
    /// Its `Item` is a request frame and its `SinkItem` is a response frame.
    type Transport:
        Transport<Item = Frame<Self::Request, Self::RequestBody, Self::Error>,
                  SinkItem = Frame<Self::Response, Self::ResponseBody, Self::Error>>;

    /// A future for initializing a transport from an I/O object.
    ///
    /// The future must fail with `io::Error`, so in simple cases
    /// `Result<Self::Transport, io::Error>` often suffices.
    type BindTransport: IntoFuture<Item = Self::Transport, Error = io::Error>;

    /// Build a transport from the given I/O object, using `self` for any
    /// configuration.
    ///
    /// The returned value is converted into a future that resolves to the
    /// ready-to-use transport.
    fn bind_transport(&self, io: T) -> Self::BindTransport;
}
55 
56 impl<P, T, B> BindServer<super::StreamingPipeline<B>, T> for P where
57     P: ServerProto<T>,
58     T: 'static,
59     B: Stream<Item = P::ResponseBody, Error = P::Error>,
60 {
61     type ServiceRequest = Message<P::Request, Body<P::RequestBody, P::Error>>;
62     type ServiceResponse = Message<P::Response, B>;
63     type ServiceError = P::Error;
64 
bind_server<S>(&self, handle: &Handle, io: T, service: S) where S: Service<Request = Self::ServiceRequest, Response = Self::ServiceResponse, Error = Self::ServiceError> + 'static65     fn bind_server<S>(&self, handle: &Handle, io: T, service: S)
66         where S: Service<Request = Self::ServiceRequest,
67                          Response = Self::ServiceResponse,
68                          Error = Self::ServiceError> + 'static
69     {
70         let task = self.bind_transport(io).into_future().and_then(|transport| {
71             let dispatch: Dispatch<S, T, P> = Dispatch {
72                 service: service,
73                 transport: transport,
74                 in_flight: VecDeque::with_capacity(32),
75             };
76             Pipeline::new(dispatch)
77         });
78 
79         // Spawn the pipeline dispatcher
80         handle.spawn(task.map_err(|_| ()))
81     }
82 }
83 
/// Per-connection state tying a `Service` to a protocol transport.
///
/// Requests handed to `dispatch` are passed to the service, and the resulting
/// futures are queued in `in_flight` in the order the requests arrived.
struct Dispatch<S, T, P> where
    T: 'static, P: ServerProto<T>, S: Service
{
    // The service handling the connection
    service: S,
    // The frame transport produced by `ServerProto::bind_transport`
    transport: P::Transport,
    // Response futures (or their stored results), oldest request at the front
    in_flight: VecDeque<InFlight<S::Future>>,
}
92 
/// The state of one queued response future.
enum InFlight<F: Future> {
    // The future has not produced a result yet and still needs polling
    Active(F),
    // The future finished; its item or error is held until it is taken
    Done(Result<F::Item, F::Error>),
}
97 
98 impl<P, T, B, S> super::advanced::Dispatch for Dispatch<S, T, P> where
99     P: ServerProto<T>,
100     T: 'static,
101     B: Stream<Item = P::ResponseBody, Error = P::Error>,
102     S: Service<Request = Message<P::Request, Body<P::RequestBody, P::Error>>,
103                Response = Message<P::Response, B>,
104                Error = P::Error>,
105 {
106     type Io = T;
107     type In = P::Response;
108     type BodyIn = P::ResponseBody;
109     type Out = P::Request;
110     type BodyOut = P::RequestBody;
111     type Error = P::Error;
112     type Stream = B;
113     type Transport = P::Transport;
114 
transport(&mut self) -> &mut P::Transport115     fn transport(&mut self) -> &mut P::Transport {
116         &mut self.transport
117     }
118 
dispatch(&mut self, request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>) -> io::Result<()>119     fn dispatch(&mut self,
120                 request: PipelineMessage<Self::Out, Body<Self::BodyOut, Self::Error>, Self::Error>)
121                 -> io::Result<()>
122     {
123         if let Ok(request) = request {
124             let response = self.service.call(request);
125             self.in_flight.push_back(InFlight::Active(response));
126         }
127 
128         // TODO: Should the error be handled differently?
129 
130         Ok(())
131     }
132 
poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error>133     fn poll(&mut self) -> Poll<Option<PipelineMessage<Self::In, Self::Stream, Self::Error>>, io::Error> {
134         for slot in self.in_flight.iter_mut() {
135             slot.poll();
136         }
137 
138         match self.in_flight.front() {
139             Some(&InFlight::Done(_)) => {}
140             _ => return Ok(Async::NotReady)
141         }
142 
143         match self.in_flight.pop_front() {
144             Some(InFlight::Done(res)) => Ok(Async::Ready(Some(res))),
145             _ => panic!(),
146         }
147     }
148 
has_in_flight(&self) -> bool149     fn has_in_flight(&self) -> bool {
150         !self.in_flight.is_empty()
151     }
152 }
153 
154 impl<F: Future> InFlight<F> {
poll(&mut self)155     fn poll(&mut self) {
156         let res = match *self {
157             InFlight::Active(ref mut f) => {
158                 match f.poll() {
159                     Ok(Async::Ready(e)) => Ok(e),
160                     Err(e) => Err(e),
161                     Ok(Async::NotReady) => return,
162                 }
163             }
164             _ => return,
165         };
166         *self = InFlight::Done(res);
167     }
168 }
169