1 use std::error::Error as StdError;
2 
3 use bytes::{Buf, Bytes};
4 use http::Request;
5 use tokio::io::{AsyncRead, AsyncWrite};
6 
7 use super::{Http1Transaction, Wants};
8 use crate::body::{Body, DecodedLength, HttpBody};
9 use crate::common::{task, Future, Pin, Poll, Unpin};
10 use crate::proto::{
11     BodyLength, Conn, Dispatched, MessageHead, RequestHead,
12 };
13 use crate::upgrade::OnUpgrade;
14 
/// Drives a single HTTP/1 connection: incoming messages read from `conn`
/// are handed to `dispatch`, and the messages `dispatch` produces are
/// written back out through `conn`.
pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
    /// The connection that every read, write and flush goes through.
    conn: Conn<I, Bs::Data, T>,
    /// Source of outgoing messages and sink for incoming ones (see `Dispatch`).
    dispatch: D,
    /// Sender half of the channel that feeds an incoming message body to
    /// the user. Installed by `poll_read_head` when the message has a body
    /// and consumed by `poll_read`.
    body_tx: Option<crate::body::Sender>,
    /// The outgoing message body currently being streamed by `poll_write`,
    /// if any.
    body_rx: Pin<Box<Option<Bs>>>,
    /// Set by `close()`. Once true, `poll_read` and `poll_write` return
    /// immediately and `is_done` reports true.
    is_closing: bool,
}
22 
/// The message-level counterpart of a `Dispatcher`: it produces the
/// messages to write and consumes the messages that were read.
pub(crate) trait Dispatch {
    /// Head of an outgoing message.
    type PollItem;
    /// Body of an outgoing message.
    type PollBody;
    /// Error that producing an outgoing message can fail with.
    type PollError;
    /// Head of an incoming message.
    type RecvItem;
    /// Polls for the next outgoing message (head plus body).
    ///
    /// The dispatcher only calls this while `should_poll` returns true.
    /// When it yields `None` the dispatcher closes the connection.
    fn poll_msg(
        self: Pin<&mut Self>,
        cx: &mut task::Context<'_>,
    ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
    /// Delivers an incoming message, or the error that ended the
    /// connection.
    ///
    /// Returning `Err` from here makes the dispatcher surface that error
    /// from its own poll.
    fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
    /// Polled before the dispatcher tries to read a message head.
    ///
    /// `Err(())` means no further incoming messages are wanted; the
    /// dispatcher reacts by closing the connection.
    fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
    /// Whether `poll_msg` should currently be polled for an outgoing message.
    fn should_poll(&self) -> bool;
}
36 
cfg_server! {
    use crate::service::HttpService;

    // Server-side `Dispatch`: every received request is passed to
    // `service`, and the future it returns becomes the next response.
    pub(crate) struct Server<S: HttpService<B>, B> {
        // Future returned by the latest `service.call`. Stored by
        // `recv_msg` and reset to `None` by `poll_msg` once it has produced
        // a response.
        in_flight: Pin<Box<Option<S::Future>>>,
        // The user's service; handed back to the caller by `into_service`.
        pub(crate) service: S,
    }
}
45 
cfg_client! {
    pin_project_lite::pin_project! {
        // Client-side `Dispatch`: requests arrive over `rx`, each paired
        // with a callback through which its response (or error) is returned.
        pub(crate) struct Client<B> {
            // Callback of the request that has been taken from `rx` and is
            // awaiting its response. Stored by `poll_msg`, consumed by
            // `recv_msg`.
            callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
            // Queue of (request, callback) pairs sent by the user.
            #[pin]
            rx: ClientRx<B>,
            // Set once `rx` has reported that the sending side is gone.
            rx_closed: bool,
        }
    }

    // Receiving half of the client dispatch channel, as used by `Client`.
    type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
}
58 
59 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
60 where
61     D: Dispatch<
62         PollItem = MessageHead<T::Outgoing>,
63         PollBody = Bs,
64         RecvItem = MessageHead<T::Incoming>,
65     > + Unpin,
66     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
67     I: AsyncRead + AsyncWrite + Unpin,
68     T: Http1Transaction + Unpin,
69     Bs: HttpBody + 'static,
70     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
71 {
new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self72     pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
73         Dispatcher {
74             conn,
75             dispatch,
76             body_tx: None,
77             body_rx: Box::pin(None),
78             is_closing: false,
79         }
80     }
81 
82     #[cfg(feature = "server")]
disable_keep_alive(&mut self)83     pub(crate) fn disable_keep_alive(&mut self) {
84         self.conn.disable_keep_alive();
85         if self.conn.is_write_closed() {
86             self.close();
87         }
88     }
89 
into_inner(self) -> (I, Bytes, D)90     pub(crate) fn into_inner(self) -> (I, Bytes, D) {
91         let (io, buf) = self.conn.into_inner();
92         (io, buf, self.dispatch)
93     }
94 
95     /// Run this dispatcher until HTTP says this connection is done,
96     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
97     ///
98     /// This is useful for old-style HTTP upgrades, but ignores
99     /// newer-style upgrade API.
poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,100     pub(crate) fn poll_without_shutdown(
101         &mut self,
102         cx: &mut task::Context<'_>,
103     ) -> Poll<crate::Result<()>>
104     where
105         Self: Unpin,
106     {
107         Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
108             if let Dispatched::Upgrade(pending) = ds {
109                 pending.manual();
110             }
111         })
112     }
113 
poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>114     fn poll_catch(
115         &mut self,
116         cx: &mut task::Context<'_>,
117         should_shutdown: bool,
118     ) -> Poll<crate::Result<Dispatched>> {
119         Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
120             // An error means we're shutting down either way.
121             // We just try to give the error to the user,
122             // and close the connection with an Ok. If we
123             // cannot give it to the user, then return the Err.
124             self.dispatch.recv_msg(Err(e))?;
125             Ok(Dispatched::Shutdown)
126         }))
127     }
128 
poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>129     fn poll_inner(
130         &mut self,
131         cx: &mut task::Context<'_>,
132         should_shutdown: bool,
133     ) -> Poll<crate::Result<Dispatched>> {
134         T::update_date();
135 
136         ready!(self.poll_loop(cx))?;
137 
138         if self.is_done() {
139             if let Some(pending) = self.conn.pending_upgrade() {
140                 self.conn.take_error()?;
141                 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
142             } else if should_shutdown {
143                 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
144             }
145             self.conn.take_error()?;
146             Poll::Ready(Ok(Dispatched::Shutdown))
147         } else {
148             Poll::Pending
149         }
150     }
151 
poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>152     fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
153         // Limit the looping on this connection, in case it is ready far too
154         // often, so that other futures don't starve.
155         //
156         // 16 was chosen arbitrarily, as that is number of pipelined requests
157         // benchmarks often use. Perhaps it should be a config option instead.
158         for _ in 0..16 {
159             let _ = self.poll_read(cx)?;
160             let _ = self.poll_write(cx)?;
161             let _ = self.poll_flush(cx)?;
162 
163             // This could happen if reading paused before blocking on IO,
164             // such as getting to the end of a framed message, but then
165             // writing/flushing set the state back to Init. In that case,
166             // if the read buffer still had bytes, we'd want to try poll_read
167             // again, or else we wouldn't ever be woken up again.
168             //
169             // Using this instead of task::current() and notify() inside
170             // the Conn is noticeably faster in pipelined benchmarks.
171             if !self.conn.wants_read_again() {
172                 //break;
173                 return Poll::Ready(Ok(()));
174             }
175         }
176 
177         trace!("poll_loop yielding (self = {:p})", self);
178 
179         task::yield_now(cx).map(|never| match never {})
180     }
181 
    /// Drives the read half of the connection.
    ///
    /// Each loop iteration does exactly one of the following, in priority
    /// order: stop because the dispatcher is closing, read a message head,
    /// forward one step of the incoming body to `body_tx`, or fall back to
    /// the connection's keep-alive read.
    fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.conn.can_read_head() {
                ready!(self.poll_read_head(cx))?;
            } else if let Some(mut body) = self.body_tx.take() {
                // `body_tx` has been taken out: every path below that wants
                // to keep streaming must put `body` back, and every path
                // that does not simply lets it drop.
                if self.conn.can_read_body() {
                    // Make sure the receiving side can accept a chunk
                    // before pulling one off the connection.
                    match body.poll_ready(cx) {
                        Poll::Ready(Ok(())) => (),
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_canceled)) => {
                            // user doesn't care about the body
                            // so we should stop reading
                            trace!("body receiver dropped before eof, draining or closing");
                            self.conn.poll_drain_or_close_read(cx);
                            continue;
                        }
                    }
                    match self.conn.poll_read_body(cx) {
                        Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
                            Ok(()) => {
                                self.body_tx = Some(body);
                            }
                            Err(_canceled) => {
                                // The receiver went away between the
                                // readiness check and the send.
                                if self.conn.can_read_body() {
                                    trace!("body receiver dropped before eof, closing");
                                    self.conn.close_read();
                                }
                            }
                        },
                        Poll::Ready(None) => {
                            // just drop, the body will close automatically
                        }
                        Poll::Pending => {
                            self.body_tx = Some(body);
                            return Poll::Pending;
                        }
                        Poll::Ready(Some(Err(e))) => {
                            // Forward the read error to the body receiver;
                            // the sender is consumed and not restored.
                            body.send_error(crate::Error::new_body(e));
                        }
                    }
                } else {
                    // just drop, the body will close automatically
                }
            } else {
                return self.conn.poll_read_keep_alive(cx);
            }
        }
    }
235 
poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>236     fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
237         // can dispatch receive, or does it still care about, an incoming message?
238         match ready!(self.dispatch.poll_ready(cx)) {
239             Ok(()) => (),
240             Err(()) => {
241                 trace!("dispatch no longer receiving messages");
242                 self.close();
243                 return Poll::Ready(Ok(()));
244             }
245         }
246         // dispatch is ready for a message, try to read one
247         match ready!(self.conn.poll_read_head(cx)) {
248             Some(Ok((mut head, body_len, wants))) => {
249                 let body = match body_len {
250                     DecodedLength::ZERO => Body::empty(),
251                     other => {
252                         let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
253                         self.body_tx = Some(tx);
254                         rx
255                     }
256                 };
257                 if wants.contains(Wants::UPGRADE) {
258                     let upgrade = self.conn.on_upgrade();
259                     debug_assert!(!upgrade.is_none(), "empty upgrade");
260                     debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
261                     head.extensions.insert(upgrade);
262                 }
263                 self.dispatch.recv_msg(Ok((head, body)))?;
264                 Poll::Ready(Ok(()))
265             }
266             Some(Err(err)) => {
267                 debug!("read_head error: {}", err);
268                 self.dispatch.recv_msg(Err(err))?;
269                 // if here, the dispatcher gave the user the error
270                 // somewhere else. we still need to shutdown, but
271                 // not as a second error.
272                 self.close();
273                 Poll::Ready(Ok(()))
274             }
275             None => {
276                 // read eof, the write side will have been closed too unless
277                 // allow_read_close was set to true, in which case just do
278                 // nothing...
279                 debug_assert!(self.conn.is_read_closed());
280                 if self.conn.is_write_closed() {
281                     self.close();
282                 }
283                 Poll::Ready(Ok(()))
284             }
285         }
286     }
287 
    /// Drives the write half of the connection.
    ///
    /// Each loop iteration does exactly one of the following, in priority
    /// order: stop because the dispatcher is closing, fetch the next
    /// outgoing message from `dispatch` and write its head, flush because
    /// the connection cannot buffer more body, or write the next piece of
    /// the body held in `body_rx`.
    fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
        loop {
            if self.is_closing {
                return Poll::Ready(Ok(()));
            } else if self.body_rx.is_none()
                && self.conn.can_write_head()
                && self.dispatch.should_poll()
            {
                // No body is being streamed and a new head may be written:
                // ask dispatch for the next message.
                if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
                    let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;

                    // Check if the body knows its full data immediately.
                    //
                    // If so, we can skip a bit of bookkeeping that streaming
                    // bodies need to do.
                    if let Some(full) = crate::body::take_full_data(&mut body) {
                        self.conn.write_full_msg(head, full);
                        return Poll::Ready(Ok(()));
                    }

                    // `None` when the body is already at end-of-stream;
                    // otherwise the exact size if the body reports one, or
                    // `Unknown`. Only the latter case keeps the body around
                    // for streaming.
                    let body_type = if body.is_end_stream() {
                        self.body_rx.set(None);
                        None
                    } else {
                        let btype = body
                            .size_hint()
                            .exact()
                            .map(BodyLength::Known)
                            .or_else(|| Some(BodyLength::Unknown));
                        self.body_rx.set(Some(body));
                        btype
                    };
                    self.conn.write_head(head, body_type);
                } else {
                    // Dispatch has no more messages to send.
                    self.close();
                    return Poll::Ready(Ok(()));
                }
            } else if !self.conn.can_buffer_body() {
                ready!(self.poll_flush(cx))?;
            } else {
                // A new scope is needed :(
                //
                // The guard resets `body_rx` to `None` when it is dropped
                // with `clear_body` set, which includes the early exits via
                // `continue` and `?` below.
                if let (Some(mut body), clear_body) =
                    OptGuard::new(self.body_rx.as_mut()).guard_mut()
                {
                    debug_assert!(!*clear_body, "opt guard defaults to keeping body");
                    if !self.conn.can_write_body() {
                        trace!(
                            "no more write body allowed, user body is_end_stream = {}",
                            body.is_end_stream(),
                        );
                        *clear_body = true;
                        continue;
                    }

                    let item = ready!(body.as_mut().poll_data(cx));
                    if let Some(item) = item {
                        let chunk = item.map_err(|e| {
                            *clear_body = true;
                            crate::Error::new_user_body(e)
                        })?;
                        let eos = body.is_end_stream();
                        if eos {
                            // Final chunk: end the body in the same step.
                            *clear_body = true;
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                self.conn.end_body()?;
                            } else {
                                self.conn.write_body_and_end(chunk);
                            }
                        } else {
                            if chunk.remaining() == 0 {
                                trace!("discarding empty chunk");
                                continue;
                            }
                            self.conn.write_body(chunk);
                        }
                    } else {
                        // The body stream finished without a final chunk.
                        *clear_body = true;
                        self.conn.end_body()?;
                    }
                } else {
                    // No body to write and no new head can be fetched.
                    return Poll::Pending;
                }
            }
        }
    }
374 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>375     fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
376         self.conn.poll_flush(cx).map_err(|err| {
377             debug!("error writing: {}", err);
378             crate::Error::new_body_write(err)
379         })
380     }
381 
    /// Marks the dispatcher as closing and closes both halves of the
    /// connection.
    ///
    /// After this, `poll_read` and `poll_write` return immediately and
    /// `is_done` reports true.
    fn close(&mut self) {
        self.is_closing = true;
        self.conn.close_read();
        self.conn.close_write();
    }
387 
is_done(&self) -> bool388     fn is_done(&self) -> bool {
389         if self.is_closing {
390             return true;
391         }
392 
393         let read_done = self.conn.is_read_closed();
394 
395         if !T::should_read_first() && read_done {
396             // a client that cannot read may was well be done.
397             true
398         } else {
399             let write_done = self.conn.is_write_closed()
400                 || (!self.dispatch.should_poll() && self.body_rx.is_none());
401             read_done && write_done
402         }
403     }
404 }
405 
406 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
407 where
408     D: Dispatch<
409         PollItem = MessageHead<T::Outgoing>,
410         PollBody = Bs,
411         RecvItem = MessageHead<T::Incoming>,
412     > + Unpin,
413     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
414     I: AsyncRead + AsyncWrite + Unpin,
415     T: Http1Transaction + Unpin,
416     Bs: HttpBody + 'static,
417     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
418 {
419     type Output = crate::Result<Dispatched>;
420 
421     #[inline]
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>422     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
423         self.poll_catch(cx, true)
424     }
425 }
426 
427 // ===== impl OptGuard =====
428 
/// Scope guard over a pinned `Option`.
///
/// It lends out mutable access to the contained value together with a
/// flag; if the flag has been switched on by the time the guard is dropped,
/// the `Option` is reset to `None`.
struct OptGuard<'a, T> {
    slot: Pin<&'a mut Option<T>>,
    clear_on_drop: bool,
}

impl<'a, T> OptGuard<'a, T> {
    /// Wraps `pin`; by default the `Option` is left untouched on drop.
    fn new(pin: Pin<&'a mut Option<T>>) -> Self {
        Self {
            slot: pin,
            clear_on_drop: false,
        }
    }

    /// Returns the pinned inner value (if any) and the clear-on-drop flag.
    fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
        let Self {
            slot,
            clear_on_drop,
        } = self;
        (slot.as_mut().as_pin_mut(), clear_on_drop)
    }
}

impl<'a, T> Drop for OptGuard<'a, T> {
    fn drop(&mut self) {
        if !self.clear_on_drop {
            return;
        }
        self.slot.set(None);
    }
}
450 
451 // ===== impl Server =====
452 
453 cfg_server! {
454     impl<S, B> Server<S, B>
455     where
456         S: HttpService<B>,
457     {
458         pub(crate) fn new(service: S) -> Server<S, B> {
459             Server {
460                 in_flight: Box::pin(None),
461                 service,
462             }
463         }
464 
465         pub(crate) fn into_service(self) -> S {
466             self.service
467         }
468     }
469 
    // Service is never pinned. The only pinned state is the in-flight
    // future, and that lives in its own `Pin<Box<_>>` allocation, so moving
    // a `Server` does not move it.
    impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
472 
473     impl<S, Bs> Dispatch for Server<S, Body>
474     where
475         S: HttpService<Body, ResBody = Bs>,
476         S::Error: Into<Box<dyn StdError + Send + Sync>>,
477         Bs: HttpBody,
478     {
479         type PollItem = MessageHead<http::StatusCode>;
480         type PollBody = Bs;
481         type PollError = S::Error;
482         type RecvItem = RequestHead;
483 
484         fn poll_msg(
485             mut self: Pin<&mut Self>,
486             cx: &mut task::Context<'_>,
487         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
488             let mut this = self.as_mut();
489             let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
490                 let resp = ready!(fut.as_mut().poll(cx)?);
491                 let (parts, body) = resp.into_parts();
492                 let head = MessageHead {
493                     version: parts.version,
494                     subject: parts.status,
495                     headers: parts.headers,
496                     extensions: parts.extensions,
497                 };
498                 Poll::Ready(Some(Ok((head, body))))
499             } else {
500                 unreachable!("poll_msg shouldn't be called if no inflight");
501             };
502 
503             // Since in_flight finished, remove it
504             this.in_flight.set(None);
505             ret
506         }
507 
508         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
509             let (msg, body) = msg?;
510             let mut req = Request::new(body);
511             *req.method_mut() = msg.subject.0;
512             *req.uri_mut() = msg.subject.1;
513             *req.headers_mut() = msg.headers;
514             *req.version_mut() = msg.version;
515             *req.extensions_mut() = msg.extensions;
516             let fut = self.service.call(req);
517             self.in_flight.set(Some(fut));
518             Ok(())
519         }
520 
521         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
522             if self.in_flight.is_some() {
523                 Poll::Pending
524             } else {
525                 self.service.poll_ready(cx).map_err(|_e| {
526                     // FIXME: return error value.
527                     trace!("service closed");
528                 })
529             }
530         }
531 
532         fn should_poll(&self) -> bool {
533             self.in_flight.is_some()
534         }
535     }
536 }
537 
538 // ===== impl Client =====
539 
540 cfg_client! {
541     impl<B> Client<B> {
542         pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
543             Client {
544                 callback: None,
545                 rx,
546                 rx_closed: false,
547             }
548         }
549     }
550 
551     impl<B> Dispatch for Client<B>
552     where
553         B: HttpBody,
554     {
555         type PollItem = RequestHead;
556         type PollBody = B;
557         type PollError = crate::common::Never;
558         type RecvItem = crate::proto::ResponseHead;
559 
560         fn poll_msg(
561             mut self: Pin<&mut Self>,
562             cx: &mut task::Context<'_>,
563         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
564             let mut this = self.as_mut();
565             debug_assert!(!this.rx_closed);
566             match this.rx.poll_recv(cx) {
567                 Poll::Ready(Some((req, mut cb))) => {
568                     // check that future hasn't been canceled already
569                     match cb.poll_canceled(cx) {
570                         Poll::Ready(()) => {
571                             trace!("request canceled");
572                             Poll::Ready(None)
573                         }
574                         Poll::Pending => {
575                             let (parts, body) = req.into_parts();
576                             let head = RequestHead {
577                                 version: parts.version,
578                                 subject: crate::proto::RequestLine(parts.method, parts.uri),
579                                 headers: parts.headers,
580                                 extensions: parts.extensions,
581                             };
582                             this.callback = Some(cb);
583                             Poll::Ready(Some(Ok((head, body))))
584                         }
585                     }
586                 }
587                 Poll::Ready(None) => {
588                     // user has dropped sender handle
589                     trace!("client tx closed");
590                     this.rx_closed = true;
591                     Poll::Ready(None)
592                 }
593                 Poll::Pending => Poll::Pending,
594             }
595         }
596 
597         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
598             match msg {
599                 Ok((msg, body)) => {
600                     if let Some(cb) = self.callback.take() {
601                         let res = msg.into_response(body);
602                         cb.send(Ok(res));
603                         Ok(())
604                     } else {
605                         // Getting here is likely a bug! An error should have happened
606                         // in Conn::require_empty_read() before ever parsing a
607                         // full message!
608                         Err(crate::Error::new_unexpected_message())
609                     }
610                 }
611                 Err(err) => {
612                     if let Some(cb) = self.callback.take() {
613                         cb.send(Err((err, None)));
614                         Ok(())
615                     } else if !self.rx_closed {
616                         self.rx.close();
617                         if let Some((req, cb)) = self.rx.try_recv() {
618                             trace!("canceling queued request with connection error: {}", err);
619                             // in this case, the message was never even started, so it's safe to tell
620                             // the user that the request was completely canceled
621                             cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
622                             Ok(())
623                         } else {
624                             Err(err)
625                         }
626                     } else {
627                         Err(err)
628                     }
629                 }
630             }
631         }
632 
633         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
634             match self.callback {
635                 Some(ref mut cb) => match cb.poll_canceled(cx) {
636                     Poll::Ready(()) => {
637                         trace!("callback receiver has dropped");
638                         Poll::Ready(Err(()))
639                     }
640                     Poll::Pending => Poll::Ready(Ok(())),
641                 },
642                 None => Poll::Ready(Err(())),
643             }
644         }
645 
646         fn should_poll(&self) -> bool {
647             self.callback.is_none()
648         }
649     }
650 }
651 
#[cfg(test)]
mod tests {
    use super::*;
    use crate::proto::h1::ClientTransaction;
    use std::time::Duration;

    /// A response that arrives before any request has been written must
    /// end the connection, and a request queued in the meantime must be
    /// reported back as canceled, with the request itself returned.
    #[test]
    fn client_read_bytes_before_writing_request() {
        let _ = pretty_env_logger::try_init();

        tokio_test::task::spawn(()).enter(|cx, _| {
            let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();

            // The mock IO starts with nothing scripted, so the first poll
            // blocks. The response bytes are fed in through `handle`
            // further down, before the request is ready to be written.
            let (mut tx, rx) = crate::client::dispatch::channel();
            let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
            let mut dispatcher = Dispatcher::new(Client::new(rx), conn);

            // First poll is needed to allow tx to send...
            assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());

            // Unblock our IO, which has a response before we've sent request!
            handle.read(b"HTTP/1.1 200 OK\r\n\r\n");

            let mut res_rx = tx
                .try_send(crate::Request::new(crate::Body::empty()))
                .unwrap();

            tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
            let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
                .expect_err("callback should send error");

            // The error must be `Canceled` and must carry the unsent request.
            match (err.0.kind(), err.1) {
                (&crate::error::Kind::Canceled, Some(_)) => (),
                other => panic!("expected Canceled, got {:?}", other),
            }
        });
    }

    /// An empty data chunk from the request body must be discarded rather
    /// than passed on to the connection.
    #[tokio::test]
    async fn body_empty_chunks_ignored() {
        let _ = pretty_env_logger::try_init();

        let io = tokio_test::io::Builder::new()
            // no reading or writing, just be blocked for the test...
            .wait(Duration::from_secs(5))
            .build();

        let (mut tx, rx) = crate::client::dispatch::channel();
        let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
        let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));

        // First poll is needed to allow tx to send...
        assert!(dispatcher.poll().is_pending());

        // A channel body whose only queued chunk is empty; the sender is
        // dropped at the end of this block.
        let body = {
            let (mut tx, body) = crate::Body::channel();
            tx.try_send_data("".into()).unwrap();
            body
        };

        let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();

        // Ensure conn.write_body wasn't called with the empty chunk.
        // If it is, it will trigger an assertion.
        assert!(dispatcher.poll().is_pending());
    }
}
723