1 use std::error::Error as StdError;
2 
3 use bytes::{Buf, Bytes};
4 use http::Request;
5 use tokio::io::{AsyncRead, AsyncWrite};
6 
7 use super::{Http1Transaction, Wants};
8 use crate::body::{Body, DecodedLength, HttpBody};
9 use crate::common::{task, Future, Pin, Poll, Unpin};
10 use crate::proto::{
11     BodyLength, Conn, Dispatched, MessageHead, RequestHead,
12 };
13 use crate::upgrade::OnUpgrade;
14 
15 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
16     conn: Conn<I, Bs::Data, T>,
17     dispatch: D,
18     body_tx: Option<crate::body::Sender>,
19     body_rx: Pin<Box<Option<Bs>>>,
20     is_closing: bool,
21 }
22 
23 pub(crate) trait Dispatch {
24     type PollItem;
25     type PollBody;
26     type PollError;
27     type RecvItem;
poll_msg( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>28     fn poll_msg(
29         self: Pin<&mut Self>,
30         cx: &mut task::Context<'_>,
31     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>32     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>33     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
should_poll(&self) -> bool34     fn should_poll(&self) -> bool;
35 }
36 
37 cfg_server! {
38     use crate::service::HttpService;
39 
40     pub(crate) struct Server<S: HttpService<B>, B> {
41         in_flight: Pin<Box<Option<S::Future>>>,
42         pub(crate) service: S,
43     }
44 }
45 
46 cfg_client! {
47     pub(crate) struct Client<B> {
48         callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>,
49         rx: ClientRx<B>,
50         rx_closed: bool,
51     }
52 
53     type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>;
54 }
55 
56 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
57 where
58     D: Dispatch<
59         PollItem = MessageHead<T::Outgoing>,
60         PollBody = Bs,
61         RecvItem = MessageHead<T::Incoming>,
62     > + Unpin,
63     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
64     I: AsyncRead + AsyncWrite + Unpin,
65     T: Http1Transaction + Unpin,
66     Bs: HttpBody + 'static,
67     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
68 {
new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self69     pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
70         Dispatcher {
71             conn,
72             dispatch,
73             body_tx: None,
74             body_rx: Box::pin(None),
75             is_closing: false,
76         }
77     }
78 
79     #[cfg(feature = "server")]
disable_keep_alive(&mut self)80     pub(crate) fn disable_keep_alive(&mut self) {
81         self.conn.disable_keep_alive();
82         if self.conn.is_write_closed() {
83             self.close();
84         }
85     }
86 
into_inner(self) -> (I, Bytes, D)87     pub(crate) fn into_inner(self) -> (I, Bytes, D) {
88         let (io, buf) = self.conn.into_inner();
89         (io, buf, self.dispatch)
90     }
91 
92     /// Run this dispatcher until HTTP says this connection is done,
93     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
94     ///
95     /// This is useful for old-style HTTP upgrades, but ignores
96     /// newer-style upgrade API.
poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,97     pub(crate) fn poll_without_shutdown(
98         &mut self,
99         cx: &mut task::Context<'_>,
100     ) -> Poll<crate::Result<()>>
101     where
102         Self: Unpin,
103     {
104         Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
105             if let Dispatched::Upgrade(pending) = ds {
106                 pending.manual();
107             }
108         })
109     }
110 
poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>111     fn poll_catch(
112         &mut self,
113         cx: &mut task::Context<'_>,
114         should_shutdown: bool,
115     ) -> Poll<crate::Result<Dispatched>> {
116         Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
117             // An error means we're shutting down either way.
118             // We just try to give the error to the user,
119             // and close the connection with an Ok. If we
120             // cannot give it to the user, then return the Err.
121             self.dispatch.recv_msg(Err(e))?;
122             Ok(Dispatched::Shutdown)
123         }))
124     }
125 
poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>126     fn poll_inner(
127         &mut self,
128         cx: &mut task::Context<'_>,
129         should_shutdown: bool,
130     ) -> Poll<crate::Result<Dispatched>> {
131         T::update_date();
132 
133         ready!(self.poll_loop(cx))?;
134 
135         if self.is_done() {
136             if let Some(pending) = self.conn.pending_upgrade() {
137                 self.conn.take_error()?;
138                 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
139             } else if should_shutdown {
140                 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
141             }
142             self.conn.take_error()?;
143             Poll::Ready(Ok(Dispatched::Shutdown))
144         } else {
145             Poll::Pending
146         }
147     }
148 
poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>149     fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
150         // Limit the looping on this connection, in case it is ready far too
151         // often, so that other futures don't starve.
152         //
153         // 16 was chosen arbitrarily, as that is number of pipelined requests
154         // benchmarks often use. Perhaps it should be a config option instead.
155         for _ in 0..16 {
156             let _ = self.poll_read(cx)?;
157             let _ = self.poll_write(cx)?;
158             let _ = self.poll_flush(cx)?;
159 
160             // This could happen if reading paused before blocking on IO,
161             // such as getting to the end of a framed message, but then
162             // writing/flushing set the state back to Init. In that case,
163             // if the read buffer still had bytes, we'd want to try poll_read
164             // again, or else we wouldn't ever be woken up again.
165             //
166             // Using this instead of task::current() and notify() inside
167             // the Conn is noticeably faster in pipelined benchmarks.
168             if !self.conn.wants_read_again() {
169                 //break;
170                 return Poll::Ready(Ok(()));
171             }
172         }
173 
174         trace!("poll_loop yielding (self = {:p})", self);
175 
176         task::yield_now(cx).map(|never| match never {})
177     }
178 
poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>179     fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
180         loop {
181             if self.is_closing {
182                 return Poll::Ready(Ok(()));
183             } else if self.conn.can_read_head() {
184                 ready!(self.poll_read_head(cx))?;
185             } else if let Some(mut body) = self.body_tx.take() {
186                 if self.conn.can_read_body() {
187                     match body.poll_ready(cx) {
188                         Poll::Ready(Ok(())) => (),
189                         Poll::Pending => {
190                             self.body_tx = Some(body);
191                             return Poll::Pending;
192                         }
193                         Poll::Ready(Err(_canceled)) => {
194                             // user doesn't care about the body
195                             // so we should stop reading
196                             trace!("body receiver dropped before eof, draining or closing");
197                             self.conn.poll_drain_or_close_read(cx);
198                             continue;
199                         }
200                     }
201                     match self.conn.poll_read_body(cx) {
202                         Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
203                             Ok(()) => {
204                                 self.body_tx = Some(body);
205                             }
206                             Err(_canceled) => {
207                                 if self.conn.can_read_body() {
208                                     trace!("body receiver dropped before eof, closing");
209                                     self.conn.close_read();
210                                 }
211                             }
212                         },
213                         Poll::Ready(None) => {
214                             // just drop, the body will close automatically
215                         }
216                         Poll::Pending => {
217                             self.body_tx = Some(body);
218                             return Poll::Pending;
219                         }
220                         Poll::Ready(Some(Err(e))) => {
221                             body.send_error(crate::Error::new_body(e));
222                         }
223                     }
224                 } else {
225                     // just drop, the body will close automatically
226                 }
227             } else {
228                 return self.conn.poll_read_keep_alive(cx);
229             }
230         }
231     }
232 
poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>233     fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
234         // can dispatch receive, or does it still care about, an incoming message?
235         match ready!(self.dispatch.poll_ready(cx)) {
236             Ok(()) => (),
237             Err(()) => {
238                 trace!("dispatch no longer receiving messages");
239                 self.close();
240                 return Poll::Ready(Ok(()));
241             }
242         }
243         // dispatch is ready for a message, try to read one
244         match ready!(self.conn.poll_read_head(cx)) {
245             Some(Ok((mut head, body_len, wants))) => {
246                 let body = match body_len {
247                     DecodedLength::ZERO => Body::empty(),
248                     other => {
249                         let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
250                         self.body_tx = Some(tx);
251                         rx
252                     }
253                 };
254                 if wants.contains(Wants::UPGRADE) {
255                     let upgrade = self.conn.on_upgrade();
256                     debug_assert!(!upgrade.is_none(), "empty upgrade");
257                     debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set");
258                     head.extensions.insert(upgrade);
259                 }
260                 self.dispatch.recv_msg(Ok((head, body)))?;
261                 Poll::Ready(Ok(()))
262             }
263             Some(Err(err)) => {
264                 debug!("read_head error: {}", err);
265                 self.dispatch.recv_msg(Err(err))?;
266                 // if here, the dispatcher gave the user the error
267                 // somewhere else. we still need to shutdown, but
268                 // not as a second error.
269                 self.close();
270                 Poll::Ready(Ok(()))
271             }
272             None => {
273                 // read eof, the write side will have been closed too unless
274                 // allow_read_close was set to true, in which case just do
275                 // nothing...
276                 debug_assert!(self.conn.is_read_closed());
277                 if self.conn.is_write_closed() {
278                     self.close();
279                 }
280                 Poll::Ready(Ok(()))
281             }
282         }
283     }
284 
poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>285     fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
286         loop {
287             if self.is_closing {
288                 return Poll::Ready(Ok(()));
289             } else if self.body_rx.is_none()
290                 && self.conn.can_write_head()
291                 && self.dispatch.should_poll()
292             {
293                 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) {
294                     let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
295 
296                     // Check if the body knows its full data immediately.
297                     //
298                     // If so, we can skip a bit of bookkeeping that streaming
299                     // bodies need to do.
300                     if let Some(full) = crate::body::take_full_data(&mut body) {
301                         self.conn.write_full_msg(head, full);
302                         return Poll::Ready(Ok(()));
303                     }
304 
305                     let body_type = if body.is_end_stream() {
306                         self.body_rx.set(None);
307                         None
308                     } else {
309                         let btype = body
310                             .size_hint()
311                             .exact()
312                             .map(BodyLength::Known)
313                             .or_else(|| Some(BodyLength::Unknown));
314                         self.body_rx.set(Some(body));
315                         btype
316                     };
317                     self.conn.write_head(head, body_type);
318                 } else {
319                     self.close();
320                     return Poll::Ready(Ok(()));
321                 }
322             } else if !self.conn.can_buffer_body() {
323                 ready!(self.poll_flush(cx))?;
324             } else {
325                 // A new scope is needed :(
326                 if let (Some(mut body), clear_body) =
327                     OptGuard::new(self.body_rx.as_mut()).guard_mut()
328                 {
329                     debug_assert!(!*clear_body, "opt guard defaults to keeping body");
330                     if !self.conn.can_write_body() {
331                         trace!(
332                             "no more write body allowed, user body is_end_stream = {}",
333                             body.is_end_stream(),
334                         );
335                         *clear_body = true;
336                         continue;
337                     }
338 
339                     let item = ready!(body.as_mut().poll_data(cx));
340                     if let Some(item) = item {
341                         let chunk = item.map_err(|e| {
342                             *clear_body = true;
343                             crate::Error::new_user_body(e)
344                         })?;
345                         let eos = body.is_end_stream();
346                         if eos {
347                             *clear_body = true;
348                             if chunk.remaining() == 0 {
349                                 trace!("discarding empty chunk");
350                                 self.conn.end_body()?;
351                             } else {
352                                 self.conn.write_body_and_end(chunk);
353                             }
354                         } else {
355                             if chunk.remaining() == 0 {
356                                 trace!("discarding empty chunk");
357                                 continue;
358                             }
359                             self.conn.write_body(chunk);
360                         }
361                     } else {
362                         *clear_body = true;
363                         self.conn.end_body()?;
364                     }
365                 } else {
366                     return Poll::Pending;
367                 }
368             }
369         }
370     }
371 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>372     fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
373         self.conn.poll_flush(cx).map_err(|err| {
374             debug!("error writing: {}", err);
375             crate::Error::new_body_write(err)
376         })
377     }
378 
close(&mut self)379     fn close(&mut self) {
380         self.is_closing = true;
381         self.conn.close_read();
382         self.conn.close_write();
383     }
384 
is_done(&self) -> bool385     fn is_done(&self) -> bool {
386         if self.is_closing {
387             return true;
388         }
389 
390         let read_done = self.conn.is_read_closed();
391 
392         if !T::should_read_first() && read_done {
393             // a client that cannot read may was well be done.
394             true
395         } else {
396             let write_done = self.conn.is_write_closed()
397                 || (!self.dispatch.should_poll() && self.body_rx.is_none());
398             read_done && write_done
399         }
400     }
401 }
402 
403 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
404 where
405     D: Dispatch<
406         PollItem = MessageHead<T::Outgoing>,
407         PollBody = Bs,
408         RecvItem = MessageHead<T::Incoming>,
409     > + Unpin,
410     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
411     I: AsyncRead + AsyncWrite + Unpin,
412     T: Http1Transaction + Unpin,
413     Bs: HttpBody + 'static,
414     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
415 {
416     type Output = crate::Result<Dispatched>;
417 
418     #[inline]
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>419     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
420         self.poll_catch(cx, true)
421     }
422 }
423 
424 // ===== impl OptGuard =====
425 
426 /// A drop guard to allow a mutable borrow of an Option while being able to
427 /// set whether the `Option` should be cleared on drop.
428 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
429 
430 impl<'a, T> OptGuard<'a, T> {
new(pin: Pin<&'a mut Option<T>>) -> Self431     fn new(pin: Pin<&'a mut Option<T>>) -> Self {
432         OptGuard(pin, false)
433     }
434 
guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)435     fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
436         (self.0.as_mut().as_pin_mut(), &mut self.1)
437     }
438 }
439 
440 impl<'a, T> Drop for OptGuard<'a, T> {
drop(&mut self)441     fn drop(&mut self) {
442         if self.1 {
443             self.0.set(None);
444         }
445     }
446 }
447 
448 // ===== impl Server =====
449 
450 cfg_server! {
451     impl<S, B> Server<S, B>
452     where
453         S: HttpService<B>,
454     {
455         pub(crate) fn new(service: S) -> Server<S, B> {
456             Server {
457                 in_flight: Box::pin(None),
458                 service,
459             }
460         }
461 
462         pub(crate) fn into_service(self) -> S {
463             self.service
464         }
465     }
466 
467     // Service is never pinned
468     impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
469 
470     impl<S, Bs> Dispatch for Server<S, Body>
471     where
472         S: HttpService<Body, ResBody = Bs>,
473         S::Error: Into<Box<dyn StdError + Send + Sync>>,
474         Bs: HttpBody,
475     {
476         type PollItem = MessageHead<http::StatusCode>;
477         type PollBody = Bs;
478         type PollError = S::Error;
479         type RecvItem = RequestHead;
480 
481         fn poll_msg(
482             mut self: Pin<&mut Self>,
483             cx: &mut task::Context<'_>,
484         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
485             let mut this = self.as_mut();
486             let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() {
487                 let resp = ready!(fut.as_mut().poll(cx)?);
488                 let (parts, body) = resp.into_parts();
489                 let head = MessageHead {
490                     version: parts.version,
491                     subject: parts.status,
492                     headers: parts.headers,
493                     extensions: parts.extensions,
494                 };
495                 Poll::Ready(Some(Ok((head, body))))
496             } else {
497                 unreachable!("poll_msg shouldn't be called if no inflight");
498             };
499 
500             // Since in_flight finished, remove it
501             this.in_flight.set(None);
502             ret
503         }
504 
505         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
506             let (msg, body) = msg?;
507             let mut req = Request::new(body);
508             *req.method_mut() = msg.subject.0;
509             *req.uri_mut() = msg.subject.1;
510             *req.headers_mut() = msg.headers;
511             *req.version_mut() = msg.version;
512             *req.extensions_mut() = msg.extensions;
513             let fut = self.service.call(req);
514             self.in_flight.set(Some(fut));
515             Ok(())
516         }
517 
518         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
519             if self.in_flight.is_some() {
520                 Poll::Pending
521             } else {
522                 self.service.poll_ready(cx).map_err(|_e| {
523                     // FIXME: return error value.
524                     trace!("service closed");
525                 })
526             }
527         }
528 
529         fn should_poll(&self) -> bool {
530             self.in_flight.is_some()
531         }
532     }
533 }
534 
535 // ===== impl Client =====
536 
537 cfg_client! {
538     impl<B> Client<B> {
539         pub(crate) fn new(rx: ClientRx<B>) -> Client<B> {
540             Client {
541                 callback: None,
542                 rx,
543                 rx_closed: false,
544             }
545         }
546     }
547 
548     impl<B> Dispatch for Client<B>
549     where
550         B: HttpBody,
551     {
552         type PollItem = RequestHead;
553         type PollBody = B;
554         type PollError = crate::common::Never;
555         type RecvItem = crate::proto::ResponseHead;
556 
557         fn poll_msg(
558             mut self: Pin<&mut Self>,
559             cx: &mut task::Context<'_>,
560         ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> {
561             let mut this = self.as_mut();
562             debug_assert!(!this.rx_closed);
563             match this.rx.poll_recv(cx) {
564                 Poll::Ready(Some((req, mut cb))) => {
565                     // check that future hasn't been canceled already
566                     match cb.poll_canceled(cx) {
567                         Poll::Ready(()) => {
568                             trace!("request canceled");
569                             Poll::Ready(None)
570                         }
571                         Poll::Pending => {
572                             let (parts, body) = req.into_parts();
573                             let head = RequestHead {
574                                 version: parts.version,
575                                 subject: crate::proto::RequestLine(parts.method, parts.uri),
576                                 headers: parts.headers,
577                                 extensions: parts.extensions,
578                             };
579                             this.callback = Some(cb);
580                             Poll::Ready(Some(Ok((head, body))))
581                         }
582                     }
583                 }
584                 Poll::Ready(None) => {
585                     // user has dropped sender handle
586                     trace!("client tx closed");
587                     this.rx_closed = true;
588                     Poll::Ready(None)
589                 }
590                 Poll::Pending => Poll::Pending,
591             }
592         }
593 
594         fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
595             match msg {
596                 Ok((msg, body)) => {
597                     if let Some(cb) = self.callback.take() {
598                         let mut res = http::Response::new(body);
599                         *res.status_mut() = msg.subject;
600                         *res.headers_mut() = msg.headers;
601                         *res.version_mut() = msg.version;
602                         *res.extensions_mut() = msg.extensions;
603                         cb.send(Ok(res));
604                         Ok(())
605                     } else {
606                         // Getting here is likely a bug! An error should have happened
607                         // in Conn::require_empty_read() before ever parsing a
608                         // full message!
609                         Err(crate::Error::new_unexpected_message())
610                     }
611                 }
612                 Err(err) => {
613                     if let Some(cb) = self.callback.take() {
614                         cb.send(Err((err, None)));
615                         Ok(())
616                     } else if !self.rx_closed {
617                         self.rx.close();
618                         if let Some((req, cb)) = self.rx.try_recv() {
619                             trace!("canceling queued request with connection error: {}", err);
620                             // in this case, the message was never even started, so it's safe to tell
621                             // the user that the request was completely canceled
622                             cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
623                             Ok(())
624                         } else {
625                             Err(err)
626                         }
627                     } else {
628                         Err(err)
629                     }
630                 }
631             }
632         }
633 
634         fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
635             match self.callback {
636                 Some(ref mut cb) => match cb.poll_canceled(cx) {
637                     Poll::Ready(()) => {
638                         trace!("callback receiver has dropped");
639                         Poll::Ready(Err(()))
640                     }
641                     Poll::Pending => Poll::Ready(Ok(())),
642                 },
643                 None => Poll::Ready(Err(())),
644             }
645         }
646 
647         fn should_poll(&self) -> bool {
648             self.callback.is_none()
649         }
650     }
651 }
652 
653 #[cfg(test)]
654 mod tests {
655     use super::*;
656     use crate::proto::h1::ClientTransaction;
657     use std::time::Duration;
658 
659     #[test]
client_read_bytes_before_writing_request()660     fn client_read_bytes_before_writing_request() {
661         let _ = pretty_env_logger::try_init();
662 
663         tokio_test::task::spawn(()).enter(|cx, _| {
664             let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
665 
666             // Block at 0 for now, but we will release this response before
667             // the request is ready to write later...
668             //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
669             let (mut tx, rx) = crate::client::dispatch::channel();
670             let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
671             let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
672 
673             // First poll is needed to allow tx to send...
674             assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
675 
676             // Unblock our IO, which has a response before we've sent request!
677             //
678             handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
679 
680             let mut res_rx = tx
681                 .try_send(crate::Request::new(crate::Body::empty()))
682                 .unwrap();
683 
684             tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
685             let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
686                 .expect_err("callback should send error");
687 
688             match (err.0.kind(), err.1) {
689                 (&crate::error::Kind::Canceled, Some(_)) => (),
690                 other => panic!("expected Canceled, got {:?}", other),
691             }
692         });
693     }
694 
695     #[tokio::test]
body_empty_chunks_ignored()696     async fn body_empty_chunks_ignored() {
697         let _ = pretty_env_logger::try_init();
698 
699         let io = tokio_test::io::Builder::new()
700             // no reading or writing, just be blocked for the test...
701             .wait(Duration::from_secs(5))
702             .build();
703 
704         let (mut tx, rx) = crate::client::dispatch::channel();
705         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
706         let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
707 
708         // First poll is needed to allow tx to send...
709         assert!(dispatcher.poll().is_pending());
710 
711         let body = {
712             let (mut tx, body) = crate::Body::channel();
713             tx.try_send_data("".into()).unwrap();
714             body
715         };
716 
717         let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
718 
719         // Ensure conn.write_body wasn't called with the empty chunk.
720         // If it is, it will trigger an assertion.
721         assert!(dispatcher.poll().is_pending());
722     }
723 }
724