1 use std::error::Error as StdError;
2 
3 use bytes::{Buf, Bytes};
4 use http::{Request, Response, StatusCode};
5 use tokio::io::{AsyncRead, AsyncWrite};
6 
7 use super::{Http1Transaction, Wants};
8 use crate::body::{Body, HttpBody};
9 use crate::common::{task, Future, Never, Pin, Poll, Unpin};
10 use crate::proto::{
11     BodyLength, Conn, DecodedLength, Dispatched, MessageHead, RequestHead, RequestLine,
12     ResponseHead,
13 };
14 use crate::service::HttpService;
15 
16 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> {
17     conn: Conn<I, Bs::Data, T>,
18     dispatch: D,
19     body_tx: Option<crate::body::Sender>,
20     body_rx: Pin<Box<Option<Bs>>>,
21     is_closing: bool,
22 }
23 
24 pub(crate) trait Dispatch {
25     type PollItem;
26     type PollBody;
27     type PollError;
28     type RecvItem;
poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>29     fn poll_msg(
30         &mut self,
31         cx: &mut task::Context<'_>,
32     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>;
recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>33     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>;
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>34     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>;
should_poll(&self) -> bool35     fn should_poll(&self) -> bool;
36 }
37 
38 pub struct Server<S: HttpService<B>, B> {
39     in_flight: Pin<Box<Option<S::Future>>>,
40     pub(crate) service: S,
41 }
42 
43 pub struct Client<B> {
44     callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>,
45     rx: ClientRx<B>,
46     rx_closed: bool,
47 }
48 
49 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>;
50 
51 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T>
52 where
53     D: Dispatch<
54             PollItem = MessageHead<T::Outgoing>,
55             PollBody = Bs,
56             RecvItem = MessageHead<T::Incoming>,
57         > + Unpin,
58     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
59     I: AsyncRead + AsyncWrite + Unpin,
60     T: Http1Transaction + Unpin,
61     Bs: HttpBody + 'static,
62     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
63 {
new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self64     pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self {
65         Dispatcher {
66             conn,
67             dispatch,
68             body_tx: None,
69             body_rx: Box::pin(None),
70             is_closing: false,
71         }
72     }
73 
disable_keep_alive(&mut self)74     pub fn disable_keep_alive(&mut self) {
75         self.conn.disable_keep_alive();
76         if self.conn.is_write_closed() {
77             self.close();
78         }
79     }
80 
into_inner(self) -> (I, Bytes, D)81     pub fn into_inner(self) -> (I, Bytes, D) {
82         let (io, buf) = self.conn.into_inner();
83         (io, buf, self.dispatch)
84     }
85 
86     /// Run this dispatcher until HTTP says this connection is done,
87     /// but don't call `AsyncWrite::shutdown` on the underlying IO.
88     ///
89     /// This is useful for old-style HTTP upgrades, but ignores
90     /// newer-style upgrade API.
poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,91     pub(crate) fn poll_without_shutdown(
92         &mut self,
93         cx: &mut task::Context<'_>,
94     ) -> Poll<crate::Result<()>>
95     where
96         Self: Unpin,
97     {
98         Pin::new(self).poll_catch(cx, false).map_ok(|ds| {
99             if let Dispatched::Upgrade(pending) = ds {
100                 pending.manual();
101             }
102         })
103     }
104 
poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>105     fn poll_catch(
106         &mut self,
107         cx: &mut task::Context<'_>,
108         should_shutdown: bool,
109     ) -> Poll<crate::Result<Dispatched>> {
110         Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| {
111             // An error means we're shutting down either way.
112             // We just try to give the error to the user,
113             // and close the connection with an Ok. If we
114             // cannot give it to the user, then return the Err.
115             self.dispatch.recv_msg(Err(e))?;
116             Ok(Dispatched::Shutdown)
117         }))
118     }
119 
poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>120     fn poll_inner(
121         &mut self,
122         cx: &mut task::Context<'_>,
123         should_shutdown: bool,
124     ) -> Poll<crate::Result<Dispatched>> {
125         T::update_date();
126 
127         ready!(self.poll_loop(cx))?;
128 
129         if self.is_done() {
130             if let Some(pending) = self.conn.pending_upgrade() {
131                 self.conn.take_error()?;
132                 return Poll::Ready(Ok(Dispatched::Upgrade(pending)));
133             } else if should_shutdown {
134                 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?;
135             }
136             self.conn.take_error()?;
137             Poll::Ready(Ok(Dispatched::Shutdown))
138         } else {
139             Poll::Pending
140         }
141     }
142 
poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>143     fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
144         // Limit the looping on this connection, in case it is ready far too
145         // often, so that other futures don't starve.
146         //
147         // 16 was chosen arbitrarily, as that is number of pipelined requests
148         // benchmarks often use. Perhaps it should be a config option instead.
149         for _ in 0..16 {
150             let _ = self.poll_read(cx)?;
151             let _ = self.poll_write(cx)?;
152             let _ = self.poll_flush(cx)?;
153 
154             // This could happen if reading paused before blocking on IO,
155             // such as getting to the end of a framed message, but then
156             // writing/flushing set the state back to Init. In that case,
157             // if the read buffer still had bytes, we'd want to try poll_read
158             // again, or else we wouldn't ever be woken up again.
159             //
160             // Using this instead of task::current() and notify() inside
161             // the Conn is noticeably faster in pipelined benchmarks.
162             if !self.conn.wants_read_again() {
163                 //break;
164                 return Poll::Ready(Ok(()));
165             }
166         }
167 
168         trace!("poll_loop yielding (self = {:p})", self);
169 
170         task::yield_now(cx).map(|never| match never {})
171     }
172 
poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>173     fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
174         loop {
175             if self.is_closing {
176                 return Poll::Ready(Ok(()));
177             } else if self.conn.can_read_head() {
178                 ready!(self.poll_read_head(cx))?;
179             } else if let Some(mut body) = self.body_tx.take() {
180                 if self.conn.can_read_body() {
181                     match body.poll_ready(cx) {
182                         Poll::Ready(Ok(())) => (),
183                         Poll::Pending => {
184                             self.body_tx = Some(body);
185                             return Poll::Pending;
186                         }
187                         Poll::Ready(Err(_canceled)) => {
188                             // user doesn't care about the body
189                             // so we should stop reading
190                             trace!("body receiver dropped before eof, draining or closing");
191                             self.conn.poll_drain_or_close_read(cx);
192                             continue;
193                         }
194                     }
195                     match self.conn.poll_read_body(cx) {
196                         Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) {
197                             Ok(()) => {
198                                 self.body_tx = Some(body);
199                             }
200                             Err(_canceled) => {
201                                 if self.conn.can_read_body() {
202                                     trace!("body receiver dropped before eof, closing");
203                                     self.conn.close_read();
204                                 }
205                             }
206                         },
207                         Poll::Ready(None) => {
208                             // just drop, the body will close automatically
209                         }
210                         Poll::Pending => {
211                             self.body_tx = Some(body);
212                             return Poll::Pending;
213                         }
214                         Poll::Ready(Some(Err(e))) => {
215                             body.send_error(crate::Error::new_body(e));
216                         }
217                     }
218                 } else {
219                     // just drop, the body will close automatically
220                 }
221             } else {
222                 return self.conn.poll_read_keep_alive(cx);
223             }
224         }
225     }
226 
poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>227     fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
228         // can dispatch receive, or does it still care about, an incoming message?
229         match ready!(self.dispatch.poll_ready(cx)) {
230             Ok(()) => (),
231             Err(()) => {
232                 trace!("dispatch no longer receiving messages");
233                 self.close();
234                 return Poll::Ready(Ok(()));
235             }
236         }
237         // dispatch is ready for a message, try to read one
238         match ready!(self.conn.poll_read_head(cx)) {
239             Some(Ok((head, body_len, wants))) => {
240                 let mut body = match body_len {
241                     DecodedLength::ZERO => Body::empty(),
242                     other => {
243                         let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT));
244                         self.body_tx = Some(tx);
245                         rx
246                     }
247                 };
248                 if wants.contains(Wants::UPGRADE) {
249                     body.set_on_upgrade(self.conn.on_upgrade());
250                 }
251                 self.dispatch.recv_msg(Ok((head, body)))?;
252                 Poll::Ready(Ok(()))
253             }
254             Some(Err(err)) => {
255                 debug!("read_head error: {}", err);
256                 self.dispatch.recv_msg(Err(err))?;
257                 // if here, the dispatcher gave the user the error
258                 // somewhere else. we still need to shutdown, but
259                 // not as a second error.
260                 self.close();
261                 Poll::Ready(Ok(()))
262             }
263             None => {
264                 // read eof, the write side will have been closed too unless
265                 // allow_read_close was set to true, in which case just do
266                 // nothing...
267                 debug_assert!(self.conn.is_read_closed());
268                 if self.conn.is_write_closed() {
269                     self.close();
270                 }
271                 Poll::Ready(Ok(()))
272             }
273         }
274     }
275 
poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>276     fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
277         loop {
278             if self.is_closing {
279                 return Poll::Ready(Ok(()));
280             } else if self.body_rx.is_none()
281                 && self.conn.can_write_head()
282                 && self.dispatch.should_poll()
283             {
284                 if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) {
285                     let (head, mut body) = msg.map_err(crate::Error::new_user_service)?;
286 
287                     // Check if the body knows its full data immediately.
288                     //
289                     // If so, we can skip a bit of bookkeeping that streaming
290                     // bodies need to do.
291                     if let Some(full) = crate::body::take_full_data(&mut body) {
292                         self.conn.write_full_msg(head, full);
293                         return Poll::Ready(Ok(()));
294                     }
295 
296                     let body_type = if body.is_end_stream() {
297                         self.body_rx.set(None);
298                         None
299                     } else {
300                         let btype = body
301                             .size_hint()
302                             .exact()
303                             .map(BodyLength::Known)
304                             .or_else(|| Some(BodyLength::Unknown));
305                         self.body_rx.set(Some(body));
306                         btype
307                     };
308                     self.conn.write_head(head, body_type);
309                 } else {
310                     self.close();
311                     return Poll::Ready(Ok(()));
312                 }
313             } else if !self.conn.can_buffer_body() {
314                 ready!(self.poll_flush(cx))?;
315             } else {
316                 // A new scope is needed :(
317                 if let (Some(mut body), clear_body) =
318                     OptGuard::new(self.body_rx.as_mut()).guard_mut()
319                 {
320                     debug_assert!(!*clear_body, "opt guard defaults to keeping body");
321                     if !self.conn.can_write_body() {
322                         trace!(
323                             "no more write body allowed, user body is_end_stream = {}",
324                             body.is_end_stream(),
325                         );
326                         *clear_body = true;
327                         continue;
328                     }
329 
330                     let item = ready!(body.as_mut().poll_data(cx));
331                     if let Some(item) = item {
332                         let chunk = item.map_err(|e| {
333                             *clear_body = true;
334                             crate::Error::new_user_body(e)
335                         })?;
336                         let eos = body.is_end_stream();
337                         if eos {
338                             *clear_body = true;
339                             if chunk.remaining() == 0 {
340                                 trace!("discarding empty chunk");
341                                 self.conn.end_body()?;
342                             } else {
343                                 self.conn.write_body_and_end(chunk);
344                             }
345                         } else {
346                             if chunk.remaining() == 0 {
347                                 trace!("discarding empty chunk");
348                                 continue;
349                             }
350                             self.conn.write_body(chunk);
351                         }
352                     } else {
353                         *clear_body = true;
354                         self.conn.end_body()?;
355                     }
356                 } else {
357                     return Poll::Pending;
358                 }
359             }
360         }
361     }
362 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>363     fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
364         self.conn.poll_flush(cx).map_err(|err| {
365             debug!("error writing: {}", err);
366             crate::Error::new_body_write(err)
367         })
368     }
369 
close(&mut self)370     fn close(&mut self) {
371         self.is_closing = true;
372         self.conn.close_read();
373         self.conn.close_write();
374     }
375 
is_done(&self) -> bool376     fn is_done(&self) -> bool {
377         if self.is_closing {
378             return true;
379         }
380 
381         let read_done = self.conn.is_read_closed();
382 
383         if !T::should_read_first() && read_done {
384             // a client that cannot read may was well be done.
385             true
386         } else {
387             let write_done = self.conn.is_write_closed()
388                 || (!self.dispatch.should_poll() && self.body_rx.is_none());
389             read_done && write_done
390         }
391     }
392 }
393 
394 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T>
395 where
396     D: Dispatch<
397             PollItem = MessageHead<T::Outgoing>,
398             PollBody = Bs,
399             RecvItem = MessageHead<T::Incoming>,
400         > + Unpin,
401     D::PollError: Into<Box<dyn StdError + Send + Sync>>,
402     I: AsyncRead + AsyncWrite + Unpin,
403     T: Http1Transaction + Unpin,
404     Bs: HttpBody + 'static,
405     Bs::Error: Into<Box<dyn StdError + Send + Sync>>,
406 {
407     type Output = crate::Result<Dispatched>;
408 
409     #[inline]
poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>410     fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> {
411         self.poll_catch(cx, true)
412     }
413 }
414 
415 // ===== impl OptGuard =====
416 
417 /// A drop guard to allow a mutable borrow of an Option while being able to
418 /// set whether the `Option` should be cleared on drop.
419 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool);
420 
421 impl<'a, T> OptGuard<'a, T> {
new(pin: Pin<&'a mut Option<T>>) -> Self422     fn new(pin: Pin<&'a mut Option<T>>) -> Self {
423         OptGuard(pin, false)
424     }
425 
guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)426     fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) {
427         (self.0.as_mut().as_pin_mut(), &mut self.1)
428     }
429 }
430 
431 impl<'a, T> Drop for OptGuard<'a, T> {
drop(&mut self)432     fn drop(&mut self) {
433         if self.1 {
434             self.0.set(None);
435         }
436     }
437 }
438 
439 // ===== impl Server =====
440 
441 impl<S, B> Server<S, B>
442 where
443     S: HttpService<B>,
444 {
new(service: S) -> Server<S, B>445     pub fn new(service: S) -> Server<S, B> {
446         Server {
447             in_flight: Box::pin(None),
448             service,
449         }
450     }
451 
into_service(self) -> S452     pub fn into_service(self) -> S {
453         self.service
454     }
455 }
456 
457 // Service is never pinned
458 impl<S: HttpService<B>, B> Unpin for Server<S, B> {}
459 
460 impl<S, Bs> Dispatch for Server<S, Body>
461 where
462     S: HttpService<Body, ResBody = Bs>,
463     S::Error: Into<Box<dyn StdError + Send + Sync>>,
464     Bs: HttpBody,
465 {
466     type PollItem = MessageHead<StatusCode>;
467     type PollBody = Bs;
468     type PollError = S::Error;
469     type RecvItem = RequestHead;
470 
poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>471     fn poll_msg(
472         &mut self,
473         cx: &mut task::Context<'_>,
474     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> {
475         let ret = if let Some(ref mut fut) = self.in_flight.as_mut().as_pin_mut() {
476             let resp = ready!(fut.as_mut().poll(cx)?);
477             let (parts, body) = resp.into_parts();
478             let head = MessageHead {
479                 version: parts.version,
480                 subject: parts.status,
481                 headers: parts.headers,
482             };
483             Poll::Ready(Some(Ok((head, body))))
484         } else {
485             unreachable!("poll_msg shouldn't be called if no inflight");
486         };
487 
488         // Since in_flight finished, remove it
489         self.in_flight.set(None);
490         ret
491     }
492 
recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>493     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
494         let (msg, body) = msg?;
495         let mut req = Request::new(body);
496         *req.method_mut() = msg.subject.0;
497         *req.uri_mut() = msg.subject.1;
498         *req.headers_mut() = msg.headers;
499         *req.version_mut() = msg.version;
500         let fut = self.service.call(req);
501         self.in_flight.set(Some(fut));
502         Ok(())
503     }
504 
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>505     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
506         if self.in_flight.is_some() {
507             Poll::Pending
508         } else {
509             self.service.poll_ready(cx).map_err(|_e| {
510                 // FIXME: return error value.
511                 trace!("service closed");
512             })
513         }
514     }
515 
should_poll(&self) -> bool516     fn should_poll(&self) -> bool {
517         self.in_flight.is_some()
518     }
519 }
520 
521 // ===== impl Client =====
522 
523 impl<B> Client<B> {
new(rx: ClientRx<B>) -> Client<B>524     pub fn new(rx: ClientRx<B>) -> Client<B> {
525         Client {
526             callback: None,
527             rx,
528             rx_closed: false,
529         }
530     }
531 }
532 
533 impl<B> Dispatch for Client<B>
534 where
535     B: HttpBody,
536 {
537     type PollItem = RequestHead;
538     type PollBody = B;
539     type PollError = Never;
540     type RecvItem = ResponseHead;
541 
poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>>542     fn poll_msg(
543         &mut self,
544         cx: &mut task::Context<'_>,
545     ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> {
546         debug_assert!(!self.rx_closed);
547         match self.rx.poll_next(cx) {
548             Poll::Ready(Some((req, mut cb))) => {
549                 // check that future hasn't been canceled already
550                 match cb.poll_canceled(cx) {
551                     Poll::Ready(()) => {
552                         trace!("request canceled");
553                         Poll::Ready(None)
554                     }
555                     Poll::Pending => {
556                         let (parts, body) = req.into_parts();
557                         let head = RequestHead {
558                             version: parts.version,
559                             subject: RequestLine(parts.method, parts.uri),
560                             headers: parts.headers,
561                         };
562                         self.callback = Some(cb);
563                         Poll::Ready(Some(Ok((head, body))))
564                     }
565                 }
566             }
567             Poll::Ready(None) => {
568                 // user has dropped sender handle
569                 trace!("client tx closed");
570                 self.rx_closed = true;
571                 Poll::Ready(None)
572             }
573             Poll::Pending => Poll::Pending,
574         }
575     }
576 
recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>577     fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> {
578         match msg {
579             Ok((msg, body)) => {
580                 if let Some(cb) = self.callback.take() {
581                     let mut res = Response::new(body);
582                     *res.status_mut() = msg.subject;
583                     *res.headers_mut() = msg.headers;
584                     *res.version_mut() = msg.version;
585                     cb.send(Ok(res));
586                     Ok(())
587                 } else {
588                     // Getting here is likely a bug! An error should have happened
589                     // in Conn::require_empty_read() before ever parsing a
590                     // full message!
591                     Err(crate::Error::new_unexpected_message())
592                 }
593             }
594             Err(err) => {
595                 if let Some(cb) = self.callback.take() {
596                     cb.send(Err((err, None)));
597                     Ok(())
598                 } else if !self.rx_closed {
599                     self.rx.close();
600                     if let Some((req, cb)) = self.rx.try_recv() {
601                         trace!("canceling queued request with connection error: {}", err);
602                         // in this case, the message was never even started, so it's safe to tell
603                         // the user that the request was completely canceled
604                         cb.send(Err((crate::Error::new_canceled().with(err), Some(req))));
605                         Ok(())
606                     } else {
607                         Err(err)
608                     }
609                 } else {
610                     Err(err)
611                 }
612             }
613         }
614     }
615 
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>616     fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> {
617         match self.callback {
618             Some(ref mut cb) => match cb.poll_canceled(cx) {
619                 Poll::Ready(()) => {
620                     trace!("callback receiver has dropped");
621                     Poll::Ready(Err(()))
622                 }
623                 Poll::Pending => Poll::Ready(Ok(())),
624             },
625             None => Poll::Ready(Err(())),
626         }
627     }
628 
should_poll(&self) -> bool629     fn should_poll(&self) -> bool {
630         self.callback.is_none()
631     }
632 }
633 
634 #[cfg(test)]
635 mod tests {
636     use super::*;
637     use crate::proto::h1::ClientTransaction;
638     use std::time::Duration;
639 
640     #[test]
client_read_bytes_before_writing_request()641     fn client_read_bytes_before_writing_request() {
642         let _ = pretty_env_logger::try_init();
643 
644         tokio_test::task::spawn(()).enter(|cx, _| {
645             let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle();
646 
647             // Block at 0 for now, but we will release this response before
648             // the request is ready to write later...
649             //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0);
650             let (mut tx, rx) = crate::client::dispatch::channel();
651             let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
652             let mut dispatcher = Dispatcher::new(Client::new(rx), conn);
653 
654             // First poll is needed to allow tx to send...
655             assert!(Pin::new(&mut dispatcher).poll(cx).is_pending());
656 
657             // Unblock our IO, which has a response before we've sent request!
658             //
659             handle.read(b"HTTP/1.1 200 OK\r\n\r\n");
660 
661             let mut res_rx = tx
662                 .try_send(crate::Request::new(crate::Body::empty()))
663                 .unwrap();
664 
665             tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx));
666             let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx))
667                 .expect_err("callback should send error");
668 
669             match (err.0.kind(), err.1) {
670                 (&crate::error::Kind::Canceled, Some(_)) => (),
671                 other => panic!("expected Canceled, got {:?}", other),
672             }
673         });
674     }
675 
676     #[tokio::test]
body_empty_chunks_ignored()677     async fn body_empty_chunks_ignored() {
678         let _ = pretty_env_logger::try_init();
679 
680         let io = tokio_test::io::Builder::new()
681             // no reading or writing, just be blocked for the test...
682             .wait(Duration::from_secs(5))
683             .build();
684 
685         let (mut tx, rx) = crate::client::dispatch::channel();
686         let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io);
687         let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn));
688 
689         // First poll is needed to allow tx to send...
690         assert!(dispatcher.poll().is_pending());
691 
692         let body = {
693             let (mut tx, body) = crate::Body::channel();
694             tx.try_send_data("".into()).unwrap();
695             body
696         };
697 
698         let _res_rx = tx.try_send(crate::Request::new(body)).unwrap();
699 
700         // Ensure conn.write_body wasn't called with the empty chunk.
701         // If it is, it will trigger an assertion.
702         assert!(dispatcher.poll().is_pending());
703     }
704 }
705