1 use std::error::Error as StdError;
2 
3 use bytes::{Buf, Bytes};
4 use futures::{Async, Future, Poll, Stream};
5 use http::{Request, Response, StatusCode};
6 use tokio_io::{AsyncRead, AsyncWrite};
7 
8 use body::{Body, Payload};
9 use body::internal::FullDataArg;
10 use common::{Never, YieldNow};
11 use proto::{BodyLength, DecodedLength, Conn, Dispatched, MessageHead, RequestHead, RequestLine, ResponseHead};
12 use super::Http1Transaction;
13 use service::Service;
14 
/// Drives a single HTTP/1 connection: incoming messages read from `conn`
/// are handed to `dispatch`, and the messages `dispatch` produces are
/// written back to `conn`.
pub(crate) struct Dispatcher<D, Bs: Payload, I, T> {
    /// Connection that message heads and body chunks are read from and
    /// written to.
    conn: Conn<I, Bs::Data, T>,
    /// Receives incoming messages and supplies outgoing ones.
    dispatch: D,
    /// Sender for the body of the incoming message currently being read,
    /// present only while that message has a body being forwarded.
    body_tx: Option<::body::Sender>,
    /// Outgoing body that still has data to write, if any.
    body_rx: Option<Bs>,
    /// Set by `close()`; once true, `poll_read` and `poll_write` return
    /// immediately and `is_done` reports true.
    is_closing: bool,
    /// If the poll loop reaches its max spin count, it will yield by notifying
    /// the task immediately. This will cache that `Task`, since it usually is
    /// the same one.
    yield_now: YieldNow,
}
26 
/// The application-facing side of a `Dispatcher`: the source of outgoing
/// messages and the sink for incoming ones.
pub(crate) trait Dispatch {
    /// Head of an outgoing message.
    type PollItem;
    /// Body of an outgoing message.
    type PollBody;
    /// Error that polling for an outgoing message can fail with.
    type PollError;
    /// Head of an incoming message.
    type RecvItem;
    /// Polls for the next outgoing message. When this resolves to `None`,
    /// the dispatcher closes the connection.
    fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>;
    /// Delivers an incoming message, or an error hit while driving the
    /// connection. An `Err` returned from here is propagated by the
    /// dispatcher.
    fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>;
    /// Reports whether an incoming message can be accepted right now. The
    /// dispatcher only reads a message head once this is `Ready`, and
    /// closes the connection on `Err(())`.
    fn poll_ready(&mut self) -> Poll<(), ()>;
    /// Whether the dispatcher should call `poll_msg` to look for an
    /// outgoing message.
    fn should_poll(&self) -> bool;
}
37 
/// Server-side `Dispatch` implementation: each incoming request is passed
/// to a `Service`, and the response its future resolves to becomes the
/// outgoing message.
pub struct Server<S: Service> {
    /// Future returned by `service.call` for the request being handled,
    /// if there is one.
    in_flight: Option<S::Future>,
    /// The service that incoming requests are passed to.
    pub(crate) service: S,
}
42 
/// Client-side `Dispatch` implementation: requests taken from a channel are
/// the outgoing messages, and each response (or error) is delivered through
/// the callback that arrived with its request.
pub struct Client<B> {
    /// Callback of the request most recently handed out by `poll_msg`
    /// that has not been answered yet.
    callback: Option<::client::dispatch::Callback<Request<B>, Response<Body>>>,
    /// Channel of queued requests, each paired with its callback.
    rx: ClientRx<B>,
}
47 
/// Receiving end of the client dispatch channel, carrying `Request<B>`
/// values that are answered with a `Response<Body>`.
type ClientRx<B> = ::client::dispatch::Receiver<Request<B>, Response<Body>>;
49 
50 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
51 where
52     D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
53     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
54     I: AsyncRead + AsyncWrite,
55     T: Http1Transaction,
56     Bs: Payload,
57 {
new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self58     pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
59         Dispatcher {
60             conn: conn,
61             dispatch: dispatch,
62             body_tx: None,
63             body_rx: None,
64             is_closing: false,
65             yield_now: YieldNow::new(),
66         }
67     }
68 
disable_keep_alive(&mut self)69     pub fn disable_keep_alive(&mut self) {
70         self.conn.disable_keep_alive()
71     }
72 
into_inner(self) -> (I, Bytes, D)73     pub fn into_inner(self) -> (I, Bytes, D) {
74         let (io, buf) = self.conn.into_inner();
75         (io, buf, self.dispatch)
76     }
77 
78     /// Run this dispatcher until HTTP says this connection is done,
79     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
80     ///
81     /// This is useful for old-style HTTP upgrades, but ignores
82     /// newer-style upgrade API.
poll_without_shutdown(&mut self) -> Poll<(), ::Error>83     pub fn poll_without_shutdown(&mut self) -> Poll<(), ::Error> {
84         self.poll_catch(false)
85             .map(|x| {
86                 x.map(|ds| if let Dispatched::Upgrade(pending) = ds {
87                     pending.manual();
88                 })
89             })
90     }
91 
poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error>92     fn poll_catch(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
93         self.poll_inner(should_shutdown).or_else(|e| {
94             // An error means we're shutting down either way.
95             // We just try to give the error to the user,
96             // and close the connection with an Ok. If we
97             // cannot give it to the user, then return the Err.
98             self.dispatch.recv_msg(Err(e))?;
99             Ok(Async::Ready(Dispatched::Shutdown))
100         })
101     }
102 
poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error>103     fn poll_inner(&mut self, should_shutdown: bool) -> Poll<Dispatched, ::Error> {
104         T::update_date();
105 
106         try_ready!(self.poll_loop());
107 
108         if self.is_done() {
109             if let Some(pending) = self.conn.pending_upgrade() {
110                 self.conn.take_error()?;
111                 return Ok(Async::Ready(Dispatched::Upgrade(pending)));
112             } else if should_shutdown {
113                 try_ready!(self.conn.shutdown().map_err(::Error::new_shutdown));
114             }
115             self.conn.take_error()?;
116             Ok(Async::Ready(Dispatched::Shutdown))
117         } else {
118             Ok(Async::NotReady)
119         }
120     }
121 
poll_loop(&mut self) -> Poll<(), ::Error>122     fn poll_loop(&mut self) -> Poll<(), ::Error> {
123         // Limit the looping on this connection, in case it is ready far too
124         // often, so that other futures don't starve.
125         //
126         // 16 was chosen arbitrarily, as that is number of pipelined requests
127         // benchmarks often use. Perhaps it should be a config option instead.
128         for _ in 0..16 {
129             self.poll_read()?;
130             self.poll_write()?;
131             self.poll_flush()?;
132 
133             // This could happen if reading paused before blocking on IO,
134             // such as getting to the end of a framed message, but then
135             // writing/flushing set the state back to Init. In that case,
136             // if the read buffer still had bytes, we'd want to try poll_read
137             // again, or else we wouldn't ever be woken up again.
138             //
139             // Using this instead of task::current() and notify() inside
140             // the Conn is noticeably faster in pipelined benchmarks.
141             if !self.conn.wants_read_again() {
142                 //break;
143                 return Ok(Async::Ready(()));
144             }
145         }
146 
147         trace!("poll_loop yielding (self = {:p})", self);
148 
149         match self.yield_now.poll_yield() {
150             Ok(Async::NotReady) => Ok(Async::NotReady),
151             // maybe with `!` this can be cleaner...
152             // but for now, just doing this to eliminate branches
153             Ok(Async::Ready(never)) |
154             Err(never) => match never {}
155         }
156     }
157 
    /// Drives the read half of the connection.
    ///
    /// Loops, doing one of the following per iteration:
    /// - returns `Ready` immediately once `is_closing` is set;
    /// - reads a message head via `poll_read_head` while the connection
    ///   can read one;
    /// - otherwise, if a body sender is stored in `body_tx`, forwards the
    ///   next body chunk from the connection to it;
    /// - otherwise returns whatever `Conn::read_keep_alive` reports.
    ///
    /// `body_tx` is taken out of `self` while a chunk is handled. It is put
    /// back only on the paths that will use it again (sender or connection
    /// not ready yet, or a chunk was delivered); every other path lets the
    /// sender drop.
    fn poll_read(&mut self) -> Poll<(), ::Error> {
        loop {
            if self.is_closing {
                return Ok(Async::Ready(()));
            } else if self.conn.can_read_head() {
                try_ready!(self.poll_read_head());
            } else if let Some(mut body) = self.body_tx.take() {
                if self.conn.can_read_body() {
                    // Only read a chunk once the sender can accept one.
                    match body.poll_ready() {
                        Ok(Async::Ready(())) => (),
                        Ok(Async::NotReady) => {
                            // Keep the sender for the next poll.
                            self.body_tx = Some(body);
                            return Ok(Async::NotReady);
                        },
                        Err(_canceled) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, closing");
                            self.conn.close_read();
                            return Ok(Async::Ready(()));
                        }
                    }
                    match self.conn.read_body() {
                        Ok(Async::Ready(Some(chunk))) => {
                            match body.send_data(chunk) {
                                Ok(()) => {
                                    // Chunk delivered; keep the sender for
                                    // the next loop iteration.
                                    self.body_tx = Some(body);
                                },
                                Err(_canceled) => {
                                    // Receiver is gone; stop reading only if
                                    // the connection still expects body data.
                                    if self.conn.can_read_body() {
                                        trace!("body receiver dropped before eof, closing");
                                        self.conn.close_read();
                                    }
                                }
                            }
                        },
                        Ok(Async::Ready(None)) => {
                            // just drop, the body will close automatically
                        },
                        Ok(Async::NotReady) => {
                            // Keep the sender for the next poll.
                            self.body_tx = Some(body);
                            return Ok(Async::NotReady);
                        }
                        Err(e) => {
                            // Report the read failure through the body; the
                            // sender is not stored back afterwards.
                            body.send_error(::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.read_keep_alive();
            }
        }
    }
213 
poll_read_head(&mut self) -> Poll<(), ::Error>214     fn poll_read_head(&mut self) -> Poll<(), ::Error> {
215         // can dispatch receive, or does it still care about, an incoming message?
216         match self.dispatch.poll_ready() {
217             Ok(Async::Ready(())) => (),
218             Ok(Async::NotReady) => return Ok(Async::NotReady), // service might not be ready
219             Err(()) => {
220                 trace!("dispatch no longer receiving messages");
221                 self.close();
222                 return Ok(Async::Ready(()));
223             }
224         }
225         // dispatch is ready for a message, try to read one
226         match self.conn.read_head() {
227             Ok(Async::Ready(Some((head, body_len, wants_upgrade)))) => {
228                 let mut body = match body_len {
229                     DecodedLength::ZERO => Body::empty(),
230                     other => {
231                         let (tx, rx) = Body::new_channel(other.into_opt());
232                         self.body_tx = Some(tx);
233                         rx
234                     },
235                 };
236                 if wants_upgrade {
237                     body.set_on_upgrade(self.conn.on_upgrade());
238                 }
239                 self.dispatch.recv_msg(Ok((head, body)))?;
240                 Ok(Async::Ready(()))
241             }
242             Ok(Async::Ready(None)) => {
243                 // read eof, conn will start to shutdown automatically
244                 Ok(Async::Ready(()))
245             }
246             Ok(Async::NotReady) => Ok(Async::NotReady),
247             Err(err) => {
248                 debug!("read_head error: {}", err);
249                 self.dispatch.recv_msg(Err(err))?;
250                 // if here, the dispatcher gave the user the error
251                 // somewhere else. we still need to shutdown, but
252                 // not as a second error.
253                 Ok(Async::Ready(()))
254             }
255         }
256     }
257 
    /// Drives the write half of the connection.
    ///
    /// Loops, doing one of the following per iteration:
    /// - returns `Ready` immediately once `is_closing` is set;
    /// - when no outgoing body is pending, the connection can write a head
    ///   and the dispatch wants polling, polls the dispatch for the next
    ///   message and writes its head (or the whole message at once when the
    ///   body's full data is available up front);
    /// - flushes when the connection cannot buffer more body data;
    /// - otherwise writes the next chunk of the pending body in `body_rx`;
    /// - returns `NotReady` when none of the above applies.
    ///
    /// `body_rx` is taken out of `self` while a chunk is handled and is
    /// stored back only while the body has more data to give.
    fn poll_write(&mut self) -> Poll<(), ::Error> {
        loop {
            if self.is_closing {
                return Ok(Async::Ready(()));
            } else if self.body_rx.is_none() && self.conn.can_write_head() && self.dispatch.should_poll() {
                if let Some((head, mut body)) = try_ready!(self.dispatch.poll_msg().map_err(::Error::new_user_service)) {
                    // Check if the body knows its full data immediately.
                    //
                    // If so, we can skip a bit of bookkeeping that streaming
                    // bodies need to do.
                    if let Some(full) = body.__hyper_full_data(FullDataArg(())).0 {
                        self.conn.write_full_msg(head, full);
                        return Ok(Async::Ready(()));
                    }
                    let body_type = if body.is_end_stream() {
                        // Body is already finished: nothing to stream, and
                        // no body length is passed along with the head.
                        self.body_rx = None;
                        None
                    } else {
                        // Use the body's content length when it reports
                        // one, otherwise mark the length as unknown.
                        let btype = body.content_length()
                            .map(BodyLength::Known)
                            .or_else(|| Some(BodyLength::Unknown));
                        self.body_rx = Some(body);
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    // The dispatch has no more messages to send.
                    self.close();
                    return Ok(Async::Ready(()));
                }
            } else if !self.conn.can_buffer_body() {
                try_ready!(self.poll_flush());
            } else if let Some(mut body) = self.body_rx.take() {
                if !self.conn.can_write_body() {
                    trace!(
                        "no more write body allowed, user body is_end_stream = {}",
                        body.is_end_stream(),
                    );
                    // `body` is not stored back, so it is dropped here.
                    continue;
                }
                match body.poll_data().map_err(::Error::new_user_body)? {
                    Async::Ready(Some(chunk)) => {
                        let eos = body.is_end_stream();
                        if eos {
                            // Last chunk: `body` is not stored back.
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                self.conn.end_body();
                            } else {
                                self.conn.write_body_and_end(chunk);
                            }
                        } else {
                            // More data may follow; keep the body for the
                            // next loop iteration.
                            self.body_rx = Some(body);
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                continue;
                            }
                            self.conn.write_body(chunk);
                        }
                    },
                    Async::Ready(None) => {
                        self.conn.end_body();
                    },
                    Async::NotReady => {
                        // Keep the body for the next poll.
                        self.body_rx = Some(body);
                        return Ok(Async::NotReady);
                    }
                }
            } else {
                return Ok(Async::NotReady);
            }
        }
    }
329 
poll_flush(&mut self) -> Poll<(), ::Error>330     fn poll_flush(&mut self) -> Poll<(), ::Error> {
331         self.conn.flush().map_err(|err| {
332             debug!("error writing: {}", err);
333             ::Error::new_body_write(err)
334         })
335     }
336 
close(&mut self)337     fn close(&mut self) {
338         self.is_closing = true;
339         self.conn.close_read();
340         self.conn.close_write();
341     }
342 
is_done(&self) -> bool343     fn is_done(&self) -> bool {
344         if self.is_closing {
345             return true;
346         }
347 
348         let read_done = self.conn.is_read_closed();
349 
350         if !T::should_read_first() && read_done {
351             // a client that cannot read may was well be done.
352             true
353         } else {
354             let write_done = self.conn.is_write_closed() ||
355                 (!self.dispatch.should_poll() && self.body_rx.is_none());
356             read_done && write_done
357         }
358     }
359 }
360 
361 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
362 where
363     D: Dispatch<PollItem=MessageHead<T::Outgoing>, PollBody=Bs, RecvItem=MessageHead<T::Incoming>>,
364     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
365     I: AsyncRead + AsyncWrite,
366     T: Http1Transaction,
367     Bs: Payload,
368 {
369     type Item = Dispatched;
370     type Error = ::Error;
371 
372     #[inline]
poll(&mut self) -> Poll<Self::Item, Self::Error>373     fn poll(&mut self) -> Poll<Self::Item, Self::Error> {
374         self.poll_catch(true)
375     }
376 }
377 
378 // ===== impl Server =====
379 
380 impl<S> Server<S> where S: Service {
new(service: S) -> Server<S>381     pub fn new(service: S) -> Server<S> {
382         Server {
383             in_flight: None,
384             service: service,
385         }
386     }
into_service(self) -> S387     pub fn into_service(self) -> S {
388         self.service
389     }
390 }
391 
392 impl<S, Bs> Dispatch for Server<S>
393 where
394     S: Service<ReqBody=Body, ResBody=Bs>,
395     S::Error: Into<Box<dyn StdError + Send + Sync>>,
396     Bs: Payload,
397 {
398     type PollItem = MessageHead<StatusCode>;
399     type PollBody = Bs;
400     type PollError = S::Error;
401     type RecvItem = RequestHead;
402 
poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError>403     fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Self::PollError> {
404         if let Some(mut fut) = self.in_flight.take() {
405             let resp = match fut.poll()? {
406                 Async::Ready(res) => res,
407                 Async::NotReady => {
408                     self.in_flight = Some(fut);
409                     return Ok(Async::NotReady);
410                 }
411             };
412             let (parts, body) = resp.into_parts();
413             let head = MessageHead {
414                 version: parts.version,
415                 subject: parts.status,
416                 headers: parts.headers,
417             };
418             Ok(Async::Ready(Some((head, body))))
419         } else {
420             unreachable!("poll_msg shouldn't be called if no inflight");
421         }
422     }
423 
recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>424     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
425         let (msg, body) = msg?;
426         let mut req = Request::new(body);
427         *req.method_mut() = msg.subject.0;
428         *req.uri_mut() = msg.subject.1;
429         *req.headers_mut() = msg.headers;
430         *req.version_mut() = msg.version;
431         self.in_flight = Some(self.service.call(req));
432         Ok(())
433     }
434 
poll_ready(&mut self) -> Poll<(), ()>435     fn poll_ready(&mut self) -> Poll<(), ()> {
436         if self.in_flight.is_some() {
437             Ok(Async::NotReady)
438         } else {
439             self.service.poll_ready()
440                 .map_err(|_e| {
441                     // FIXME: return error value.
442                     trace!("service closed");
443                 })
444         }
445     }
446 
should_poll(&self) -> bool447     fn should_poll(&self) -> bool {
448         self.in_flight.is_some()
449     }
450 }
451 
452 // ===== impl Client =====
453 
454 
455 impl<B> Client<B> {
new(rx: ClientRx<B>) -> Client<B>456     pub fn new(rx: ClientRx<B>) -> Client<B> {
457         Client {
458             callback: None,
459             rx: rx,
460         }
461     }
462 }
463 
464 impl<B> Dispatch for Client<B>
465 where
466     B: Payload,
467 {
468     type PollItem = RequestHead;
469     type PollBody = B;
470     type PollError = Never;
471     type RecvItem = ResponseHead;
472 
poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never>473     fn poll_msg(&mut self) -> Poll<Option<(Self::PollItem, Self::PollBody)>, Never> {
474         match self.rx.poll() {
475             Ok(Async::Ready(Some((req, mut cb)))) => {
476                 // check that future hasn't been canceled already
477                 match cb.poll_cancel().expect("poll_cancel cannot error") {
478                     Async::Ready(()) => {
479                         trace!("request canceled");
480                         Ok(Async::Ready(None))
481                     },
482                     Async::NotReady => {
483                         let (parts, body) = req.into_parts();
484                         let head = RequestHead {
485                             version: parts.version,
486                             subject: RequestLine(parts.method, parts.uri),
487                             headers: parts.headers,
488                         };
489                         self.callback = Some(cb);
490                         Ok(Async::Ready(Some((head, body))))
491                     }
492                 }
493             },
494             Ok(Async::Ready(None)) => {
495                 trace!("client tx closed");
496                 // user has dropped sender handle
497                 Ok(Async::Ready(None))
498             },
499             Ok(Async::NotReady) => Ok(Async::NotReady),
500             Err(never) => match never {},
501         }
502     }
503 
recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()>504     fn recv_msg(&mut self, msg: ::Result<(Self::RecvItem, Body)>) -> ::Result<()> {
505         match msg {
506             Ok((msg, body)) => {
507                 if let Some(cb) = self.callback.take() {
508                     let mut res = Response::new(body);
509                     *res.status_mut() = msg.subject;
510                     *res.headers_mut() = msg.headers;
511                     *res.version_mut() = msg.version;
512                     let _ = cb.send(Ok(res));
513                     Ok(())
514                 } else {
515                     // Getting here is likely a bug! An error should have happened
516                     // in Conn::require_empty_read() before ever parsing a
517                     // full message!
518                     Err(::Error::new_unexpected_message())
519                 }
520             },
521             Err(err) => {
522                 if let Some(cb) = self.callback.take() {
523                     let _ = cb.send(Err((err, None)));
524                     Ok(())
525                 } else if let Ok(Async::Ready(Some((req, cb)))) = self.rx.poll() {
526                     trace!("canceling queued request with connection error: {}", err);
527                     // in this case, the message was never even started, so it's safe to tell
528                     // the user that the request was completely canceled
529                     let _ = cb.send(Err((::Error::new_canceled().with(err), Some(req))));
530                     Ok(())
531                 } else {
532                     Err(err)
533                 }
534             }
535         }
536     }
537 
poll_ready(&mut self) -> Poll<(), ()>538     fn poll_ready(&mut self) -> Poll<(), ()> {
539         match self.callback {
540             Some(ref mut cb) => match cb.poll_cancel() {
541                 Ok(Async::Ready(())) => {
542                     trace!("callback receiver has dropped");
543                     Err(())
544                 },
545                 Ok(Async::NotReady) => Ok(Async::Ready(())),
546                 Err(_) => unreachable!("oneshot poll_cancel cannot error"),
547             },
548             None => Err(()),
549         }
550     }
551 
should_poll(&self) -> bool552     fn should_poll(&self) -> bool {
553         self.callback.is_none()
554     }
555 }
556 
// Unit tests that drive a client-side `Dispatcher` over a mock IO object.
#[cfg(test)]
mod tests {
    extern crate pretty_env_logger;

    use super::*;
    use mock::AsyncIo;
    use proto::h1::ClientTransaction;

    /// A response that is already readable before the request was written
    /// must close the dispatcher and reach the caller as a `Canceled`
    /// error that hands the unsent request back.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();
        ::futures::lazy(|| {
            // Block at 0 for now, but we will release this response before
            // the request is ready to write later...
            let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
            let (mut tx, rx) = ::client::dispatch::channel();
            let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(dispatcher.poll().expect("nothing is ready").is_not_ready());
            // Unblock our IO, which has a response before we've sent request!
            dispatcher.conn.io_mut().block_in(100);

            let res_rx = tx.try_send(::Request::new(::Body::empty())).unwrap();

            let a1 = dispatcher.poll().expect("error should be sent on channel");
            assert!(a1.is_ready(), "dispatcher should be closed");
            let err = res_rx.wait()
                .expect("callback poll")
                .expect_err("callback response");

            // The error must be a cancellation that carries the request.
            match (err.0.kind(), err.1) {
                (&::error::Kind::Canceled, Some(_)) => (),
                other => panic!("expected Canceled, got {:?}", other),
            }
            Ok::<(), ()>(())
        }).wait().unwrap();
    }

    /// A streamed request body that yields an empty chunk must not make
    /// polling the dispatcher fail or panic.
    #[test]
    fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();
        ::futures::lazy(|| {
            let io = AsyncIo::new_buf(vec![], 0);
            let (mut tx, rx) = ::client::dispatch::channel();
            let conn = Conn::<_, ::Chunk, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(dispatcher.poll().expect("nothing is ready").is_not_ready());

            // Body whose only chunk is the empty string.
            let body = ::Body::wrap_stream(::futures::stream::once(Ok::<_, ::Error>("")));

            let _res_rx = tx.try_send(::Request::new(body)).unwrap();

            dispatcher.poll().expect("empty body shouldn't panic");
            Ok::<(), ()>(())
        }).wait().unwrap();
    }
}
618