1 use std::error::Error as StdError;
2 
3 use bytes::{Buf, Bytes};
4 use http::Request;
5 use tokio::io::{AsyncRead, AsyncWrite};
6 use tracing::{debug, trace};
7 
8 use super::{Http1Transaction, Wants};
9 use crate::body::{Body, DecodedLength, HttpBody};
10 use crate::common::{task, Future, Pin, Poll, Unpin};
11 use crate::proto::{
12     BodyLength, Conn, Dispatched, MessageHead, RequestHead,
13 };
14 use crate::upgrade::OnUpgrade;
15 
16 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
17     conn: Conn<I, Bs::Data, T>,
18     dispatch: D,
19     body_tx: Option<crate::body::Sender>,
SS20     body_rx: Pin<Box<Option<Bs>>>,
21     is_closing: bool,
22 }
23 
24 pub(crate) trait Dispatch {
25     type PollItem;
26     type PollBody;
27     type PollError;
28     type RecvItem;
29     fn poll_msg(
30         self: Pin<&mut Self>,
31         cx: &mut task::Context<'_>,
32     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
33     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
34     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
35     fn should_poll(&self) -> bool;
36 }
37 
38 cfg_server! {
39     use crate::service::HttpService;
40 
41     pub(crate) struct Server<S: HttpService<B>, B> {
42         in_flight: Pin<Box<Option<S::Future>>>,
43         pub(crate) service: S,
44     }
main()45 }
46 
47 cfg_client! {
48     pin_project_lite::pin_project! {
49         pub(crate) struct Client<B> {
50             callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
51             #[pin]
52             rx: ClientRx<B>,
53             rx_closed: bool,
54         }
55     }
56 
57     type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
58 }
59 
60 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
61 where
62     D: Dispatch<
63         PollItem = MessageHead<T::Outgoing>,
64         PollBody = Bs,
65         RecvItem = MessageHead<T::Incoming>,
66     > + Unpin,
67     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
68     I: AsyncRead + AsyncWrite + Unpin,
69     T: Http1Transaction + Unpin,
70     Bs: HttpBody + 'static,
71     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
72 {
73     pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
74         Dispatcher {
75             conn,
76             dispatch,
77             body_tx: None,
78             body_rx: Box::pin(None),
79             is_closing: false,
80         }
81     }
82 
83     #[cfg(feature = "server")]
84     pub(crate) fn disable_keep_alive(&mut self) {
85         self.conn.disable_keep_alive();
86         if self.conn.is_write_closed() {
87             self.close();
88         }
89     }
90 
91     pub(crate) fn into_inner(self) -> (I, Bytes, D) {
92         let (io, buf) = self.conn.into_inner();
93         (io, buf, self.dispatch)
94     }
95 
96     /// Run this dispatcher until HTTP says this connection is done,
97     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
98     ///
99     /// This is useful for old-style HTTP upgrades, but ignores
100     /// newer-style upgrade API.
101     pub(crate) fn poll_without_shutdown(
102         &mut self,
103         cx: &mut task::Context<'_>,
104     ) -> Poll<crate::Result<()>>
105     where
106         Self: Unpin,
107     {
108         Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
109             if let Dispatched::Upgrade(pending) = ds {
110                 pending.manual();
111             }
112         })
113     }
114 
115     fn poll_catch(
116         &mut self,
117         cx: &mut task::Context<'_>,
118         should_shutdown: bool,
119     ) -> Poll<crate::Result<Dispatched>> {
120         Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
121             // An error means we're shutting down either way.
122             // We just try to give the error to the user,
123             // and close the connection with an Ok. If we
124             // cannot give it to the user, then return the Err.
125             self.dispatch.recv_msg(Err(e))?;
126             Ok(Dispatched::Shutdown)
127         }))
128     }
129 
130     fn poll_inner(
131         &mut self,
132         cx: &mut task::Context<'_>,
133         should_shutdown: bool,
134     ) -> Poll<crate::Result<Dispatched>> {
135         T::update_date();
136 
137         ready!(self.poll_loop(cx))?;
138 
139         if self.is_done() {
140             if let Some(pending) = self.conn.pending_upgrade() {
141                 self.conn.take_error()?;
142                 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
143             } else if should_shutdown {
144                 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
145             }
146             self.conn.take_error()?;
147             Poll::Ready(Ok(Dispatched::Shutdown))
148         } else {
149             Poll::Pending
150         }
151     }
152 
153     fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
154         // Limit the looping on this connection, in case it is ready far too
155         // often, so that other futures don't starve.
156         //
157         // 16 was chosen arbitrarily, as that is number of pipelined requests
158         // benchmarks often use. Perhaps it should be a config option instead.
159         for _ in 0..16 {
160             let _ = self.poll_read(cx)?;
161             let _ = self.poll_write(cx)?;
162             let _ = self.poll_flush(cx)?;
163 
164             // This could happen if reading paused before blocking on IO,
165             // such as getting to the end of a framed message, but then
166             // writing/flushing set the state back to Init. In that case,
167             // if the read buffer still had bytes, we'd want to try poll_read
168             // again, or else we wouldn't ever be woken up again.
169             //
170             // Using this instead of task::current() and notify() inside
171             // the Conn is noticeably faster in pipelined benchmarks.
172             if !self.conn.wants_read_again() {
173                 //break;
174                 return Poll::Ready(Ok(()));
175             }
176         }
177 
178         trace!("poll_loop yielding (self = {:p})", self);
179 
180         task::yield_now(cx).map(|never| match never {})
181     }
182 
183     fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
184         loop {
185             if self.is_closing {
186                 return Poll::Ready(Ok(()));
187             } else if self.conn.can_read_head() {
188                 ready!(self.poll_read_head(cx))?;
189             } else if let Some(mut body) = self.body_tx.take() {
190                 if self.conn.can_read_body() {
191                     match body.poll_ready(cx) {
192                         Poll::Ready(Ok(())) => (),
193                         Poll::Pending => {
194                             self.body_tx = Some(body);
195                             return Poll::Pending;
196                         }
197                         Poll::Ready(Err(_canceled)) => {
198                             // user doesn't care about the body
199                             // so we should stop reading
200                             trace!("body receiver dropped before eof, draining or closing");
201                             self.conn.poll_drain_or_close_read(cx);
202                             continue;
203                         }
204                     }
205                     match self.conn.poll_read_body(cx) {
206                         Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
207                             Ok(()) => {
208                                 self.body_tx = Some(body);
209                             }
210                             Err(_canceled) => {
211                                 if self.conn.can_read_body() {
212                                     trace!("body receiver dropped before eof, closing");
213                                     self.conn.close_read();
214                                 }
215                             }
216                         },
217                         Poll::Ready(None) => {
218                             // just drop, the body will close automatically
219                         }
220                         Poll::Pending => {
221                             self.body_tx = Some(body);
222                             return Poll::Pending;
223                         }
224                         Poll::Ready(Some(Err(e))) => {
225                             body.send_error(crate::Error::new_body(e));
226                         }
227                     }
228                 } else {
229                     // just drop, the body will close automatically
230                 }
231             } else {
232                 return self.conn.poll_read_keep_alive(cx);
233             }
234         }
235     }
236 
237     fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
238         // can dispatch receive, or does it still care about, an incoming message?
239         match ready!(self.dispatch.poll_ready(cx)) {
240             Ok(()) => (),
241             Err(()) => {
242                 trace!("dispatch no longer receiving messages");
243                 self.close();
244                 return Poll::Ready(Ok(()));
245             }
246         }
247         // dispatch is ready for a message, try to read one
248         match ready!(self.conn.poll_read_head(cx)) {
249             Some(Ok((mut head, body_len, wants))) => {
250                 let body = match body_len {
251                     DecodedLength::ZERO => Body::empty(),
252                     other => {
253                         let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
254                         self.body_tx = Some(tx);
255                         rx
256                     }
257                 };
258                 if wants.contains(Wants::UPGRADE) {
259                     let upgrade = self.conn.on_upgrade();
260                     debug_assert!(!upgrade.is_none(), "empty upgrade");
261                     debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
262                     head.extensions.insert(upgrade);
263                 }
264                 self.dispatch.recv_msg(Ok((head, body)))?;
265                 Poll::Ready(Ok(()))
266             }
267             Some(Err(err)) => {
268                 debug!("read_head error: {}", err);
269                 self.dispatch.recv_msg(Err(err))?;
270                 // if here, the dispatcher gave the user the error
271                 // somewhere else. we still need to shutdown, but
272                 // not as a second error.
273                 self.close();
274                 Poll::Ready(Ok(()))
275             }
276             None => {
277                 // read eof, the write side will have been closed too unless
278                 // allow_read_close was set to true, in which case just do
279                 // nothing...
280                 debug_assert!(self.conn.is_read_closed());
281                 if self.conn.is_write_closed() {
282                     self.close();
283                 }
284                 Poll::Ready(Ok(()))
285             }
286         }
287     }
288 
289     fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
290         loop {
291             if self.is_closing {
292                 return Poll::Ready(Ok(()));
293             } else if self.body_rx.is_none()
294                 && self.conn.can_write_head()
295                 && self.dispatch.should_poll()
296             {
297                 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
298                     let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
299 
300                     // Check if the body knows its full data immediately.
301                     //
302                     // If so, we can skip a bit of bookkeeping that streaming
303                     // bodies need to do.
304                     if let Some(full) = crate::body::take_full_data(&mut body) {
305                         self.conn.write_full_msg(head, full);
306                         return Poll::Ready(Ok(()));
307                     }
308 
309                     let body_type = if body.is_end_stream() {
310                         self.body_rx.set(None);
311                         None
312                     } else {
313                         let btype = body
314                             .size_hint()
315                             .exact()
316                             .map(BodyLength::Known)
317                             .or_else(|| Some(BodyLength::Unknown));
318                         self.body_rx.set(Some(body));
319                         btype
320                     };
321                     self.conn.write_head(head, body_type);
322                 } else {
323                     self.close();
324                     return Poll::Ready(Ok(()));
325                 }
326             } else if !self.conn.can_buffer_body() {
327                 ready!(self.poll_flush(cx))?;
328             } else {
329                 // A new scope is needed :(
330                 if let (Some(mut body), clear_body) =
331                     OptGuard::new(self.body_rx.as_mut()).guard_mut()
332                 {
333                     debug_assert!(!*clear_body, "opt guard defaults to keeping body");
334                     if !self.conn.can_write_body() {
335                         trace!(
336                             "no more write body allowed, user body is_end_stream = {}",
337                             body.is_end_stream(),
338                         );
339                         *clear_body = true;
340                         continue;
341                     }
342 
343                     let item = ready!(body.as_mut().poll_data(cx));
344                     if let Some(item) = item {
345                         let chunk = item.map_err(|e| {
346                             *clear_body = true;
347                             crate::Error::new_user_body(e)
348                         })?;
349                         let eos = body.is_end_stream();
350                         if eos {
351                             *clear_body = true;
352                             if chunk.remaining() == 0 {
353                                 trace!("discarding empty chunk");
354                                 self.conn.end_body()?;
355                             } else {
356                                 self.conn.write_body_and_end(chunk);
357                             }
358                         } else {
359                             if chunk.remaining() == 0 {
360                                 trace!("discarding empty chunk");
361                                 continue;
362                             }
363                             self.conn.write_body(chunk);
364                         }
365                     } else {
366                         *clear_body = true;
367                         self.conn.end_body()?;
368                     }
369                 } else {
370                     return Poll::Pending;
371                 }
372             }
373         }
374     }
375 
376     fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
377         self.conn.poll_flush(cx).map_err(|err| {
378             debug!("error writing: {}", err);
379             crate::Error::new_body_write(err)
380         })
381     }
382 
383     fn close(&mut self) {
384         self.is_closing = true;
385         self.conn.close_read();
386         self.conn.close_write();
387     }
388 
389     fn is_done(&self) -> bool {
390         if self.is_closing {
391             return true;
392         }
393 
394         let read_done = self.conn.is_read_closed();
395 
396         if !T::should_read_first() && read_done {
397             // a client that cannot read may was well be done.
398             true
399         } else {
400             let write_done = self.conn.is_write_closed()
401                 || (!self.dispatch.should_poll() && self.body_rx.is_none());
402             read_done && write_done
403         }
404     }
405 }
406 
407 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
408 where
409     D: Dispatch<
410         PollItem = MessageHead<T::Outgoing>,
411         PollBody = Bs,
412         RecvItem = MessageHead<T::Incoming>,
413     > + Unpin,
414     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
415     I: AsyncRead + AsyncWrite + Unpin,
416     T: Http1Transaction + Unpin,
417     Bs: HttpBody + 'static,
418     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
419 {
420     type Output = crate::Result<Dispatched>;
421 
422     #[inline]
423     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
424         self.poll_catch(cx, true)
425     }
426 }
427 
428 // ===== impl OptGuard =====
429 
430 /// A drop guard to allow a mutable borrow of an Option while being able to
431 /// set whether the `Option` should be cleared on drop.
432 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
433 
434 impl<'a, T> OptGuard<'a, T> {
435     fn new(pin: Pin<&'a mut Option<T>>) -> Self {
436         OptGuard(pin, false)
437     }
438 
439     fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
440         (self.0.as_mut().as_pin_mut(), &mut self.1)
441     }
442 }
443 
444 impl<'a, T> Drop for OptGuard<'a, T> {
445     fn drop(&mut self) {
446         if self.1 {
447             self.0.set(None);
448         }
449     }
450 }
451 
452 // ===== impl Server =====
453 
454 cfg_server! {
455     impl<S, B> Server<S, B>
456     where
457         S: HttpService<B>,
458     {
459         pub(crate) fn new(service: S) -> Server<S, B> {
460             Server {
461                 in_flight: Box::pin(None),
462                 service,
463             }
464         }
465 
466         pub(crate) fn into_service(self) -> S {
467             self.service
468         }
469     }
470 
471     // Service is never pinned
472     impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
473 
474     impl<S, Bs> Dispatch for Server<S, Body>
475     where
476         S: HttpService<Body, ResBody = Bs>,
477         S::Error: Into<Box<dyn StdError + Send + Sync>>,
478         Bs: HttpBody,
479     {
480         type PollItem = MessageHead<http::StatusCode>;
481         type PollBody = Bs;
482         type PollError = S::Error;
483         type RecvItem = RequestHead;
484 
485         fn poll_msg(
486             mut self: Pin<&mut Self>,
487             cx: &mut task::Context<'_>,
488         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
489             let mut this = self.as_mut();
490             let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
491                 let resp = ready!(fut.as_mut().poll(cx)?);
492                 let (parts, body) = resp.into_parts();
493                 let head = MessageHead {
494                     version: parts.version,
495                     subject: parts.status,
496                     headers: parts.headers,
497                     extensions: parts.extensions,
498                 };
499                 Poll::Ready(Some(Ok((head, body))))
500             } else {
501                 unreachable!("poll_msg shouldn't be called if no inflight");
502             };
503 
504             // Since in_flight finished, remove it
505             this.in_flight.set(None);
506             ret
507         }
508 
509         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
510             let (msg, body) = msg?;
511             let mut req = Request::new(body);
512             *req.method_mut() = msg.subject.0;
513             *req.uri_mut() = msg.subject.1;
514             *req.headers_mut() = msg.headers;
515             *req.version_mut() = msg.version;
516             *req.extensions_mut() = msg.extensions;
517             let fut = self.service.call(req);
518             self.in_flight.set(Some(fut));
519             Ok(())
520         }
521 
522         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
523             if self.in_flight.is_some() {
524                 Poll::Pending
525             } else {
526                 self.service.poll_ready(cx).map_err(|_e| {
527                     // FIXME: return error value.
528                     trace!("service closed");
529                 })
530             }
531         }
532 
533         fn should_poll(&self) -> bool {
534             self.in_flight.is_some()
535         }
536     }
537 }
538 
539 // ===== impl Client =====
540 
541 cfg_client! {
542     impl<B> Client<B> {
543         pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
544             Client {
545                 callback: None,
546                 rx,
547                 rx_closed: false,
548             }
549         }
550     }
551 
552     impl<B> Dispatch for Client<B>
553     where
554         B: HttpBody,
555     {
556         type PollItem = RequestHead;
557         type PollBody = B;
558         type PollError = crate::common::Never;
559         type RecvItem = crate::proto::ResponseHead;
560 
561         fn poll_msg(
562             mut self: Pin<&mut Self>,
563             cx: &mut task::Context<'_>,
564         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
565             let mut this = self.as_mut();
566             debug_assert!(!this.rx_closed);
567             match this.rx.poll_recv(cx) {
568                 Poll::Ready(Some((req, mut cb))) => {
569                     // check that future hasn't been canceled already
570                     match cb.poll_canceled(cx) {
571                         Poll::Ready(()) => {
572                             trace!("request canceled");
573                             Poll::Ready(None)
574                         }
575                         Poll::Pending => {
576                             let (parts, body) = req.into_parts();
577                             let head = RequestHead {
578                                 version: parts.version,
579                                 subject: crate::proto::RequestLine(parts.method, parts.uri),
580                                 headers: parts.headers,
581                                 extensions: parts.extensions,
582                             };
583                             this.callback = Some(cb);
584                             Poll::Ready(Some(Ok((head, body))))
585                         }
586                     }
587                 }
588                 Poll::Ready(None) => {
589                     // user has dropped sender handle
590                     trace!("client tx closed");
591                     this.rx_closed = true;
592                     Poll::Ready(None)
593                 }
594                 Poll::Pending => Poll::Pending,
595             }
596         }
597 
598         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
599             match msg {
600                 Ok((msg, body)) => {
601                     if let Some(cb) = self.callback.take() {
602                         let res = msg.into_response(body);
603                         cb.send(Ok(res));
604                         Ok(())
605                     } else {
606                         // Getting here is likely a bug! An error should have happened
607                         // in Conn::require_empty_read() before ever parsing a
608                         // full message!
609                         Err(crate::Error::new_unexpected_message())
610                     }
611                 }
612                 Err(err) => {
613                     if let Some(cb) = self.callback.take() {
614                         cb.send(Err((err, None)));
615                         Ok(())
616                     } else if !self.rx_closed {
617                         self.rx.close();
618                         if let Some((req, cb)) = self.rx.try_recv() {
619                             trace!("canceling queued request with connection error: {}", err);
620                             // in this case, the message was never even started, so it's safe to tell
621                             // the user that the request was completely canceled
622                             cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
623                             Ok(())
624                         } else {
625                             Err(err)
626                         }
627                     } else {
628                         Err(err)
629                     }
630                 }
631             }
632         }
633 
634         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
635             match self.callback {
636                 Some(ref mut cb) => match cb.poll_canceled(cx) {
637                     Poll::Ready(()) => {
638                         trace!("callback receiver has dropped");
639                         Poll::Ready(Err(()))
640                     }
641                     Poll::Pending => Poll::Ready(Ok(())),
642                 },
643                 None => Poll::Ready(Err(())),
644             }
645         }
646 
647         fn should_poll(&self) -> bool {
648             self.callback.is_none()
649         }
650     }
651 }
652 
653 #[cfg(test)]
654 mod tests {
655     use super::*;
656     use crate::proto::h1::ClientTransaction;
657     use std::time::Duration;
658 
659     #[test]
660     fn client_read_bytes_before_writing_request() {
661         let _ = pretty_env_logger::try_init();
662 
663         tokio_test::task::spawn(()).enter(|cx, _| {
664             let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
665 
666             // Block at 0 for now, but we will release this response before
667             // the request is ready to write later...
668             let (mut tx, rx) = crate::client::dispatch::channel();
669             let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
670             let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
671 
672             // First poll is needed to allow tx to send...
673             assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
674 
675             // Unblock our IO, which has a response before we've sent request!
676             //
677             handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
678 
679             let mut res_rx = tx
680                 .try_send(crate::Request::new(crate::Body::empty()))
681                 .unwrap();
682 
683             tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
684             let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
685                 .expect_err("callback should send error");
686 
687             match (err.0.kind(), err.1) {
688                 (&crate::error::Kind::Canceled, Some(_)) => (),
689                 other => panic!("expected Canceled, got {:?}", other),
690             }
691         });
692     }
693 
694     #[tokio::test]
695     async fn client_flushing_is_not_ready_for_next_request() {
696         let _ = pretty_env_logger::try_init();
697 
698         let (io, _handle) = tokio_test::io::Builder::new()
699             .write(b"POST / HTTP/1.1\r\ncontent-length: 4\r\n\r\n")
700             .read(b"HTTP/1.1 200 OK\r\ncontent-length: 0\r\n\r\n")
701             .wait(std::time::Duration::from_secs(2))
702             .build_with_handle();
703 
704         let (mut tx, rx) = crate::client::dispatch::channel();
705         let mut conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
706         conn.set_write_strategy_queue();
707 
708         let dispatcher = Dispatcher::new(Client::new(rx), conn);
709         let _dispatcher = tokio::spawn(async move { dispatcher.await });
710 
711         let req = crate::Request::builder()
712             .method("POST")
713             .body(crate::Body::from("reee"))
714             .unwrap();
715 
716         let res = tx.try_send(req).unwrap().await.expect("response");
717         drop(res);
718 
719         assert!(!tx.is_ready());
720     }
721 
722     #[tokio::test]
723     async fn body_empty_chunks_ignored() {
724         let _ = pretty_env_logger::try_init();
725 
726         let io = tokio_test::io::Builder::new()
727             // no reading or writing, just be blocked for the test...
728             .wait(Duration::from_secs(5))
729             .build();
730 
731         let (mut tx, rx) = crate::client::dispatch::channel();
732         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
733         let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
734 
735         // First poll is needed to allow tx to send...
736         assert!(dispatcher.poll().is_pending());
737 
738         let body = {
739             let (mut tx, body) = crate::Body::channel();
740             tx.try_send_data("".into()).unwrap();
741             body
742         };
743 
744         let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
745 
746         // Ensure conn.write_body wasn't called with the empty chunk.
747         // If it is, it will trigger an assertion.
748         assert!(dispatcher.poll().is_pending());
749     }
750 }
751