1 use std::error::Error as StdError; 2 3 use bytes::{Buf, Bytes}; 4 use http::Request; 5 use tokio::io::{AsyncRead, AsyncWrite}; 6 7 use super::{Http1Transaction, Wants}; 8 use crate::body::{Body, DecodedLength, HttpBody}; 9 use crate::common::{task, Future, Pin, Poll, Unpin}; 10 use crate::proto::{ 11 BodyLength, Conn, Dispatched, MessageHead, RequestHead, 12 }; 13 use crate::upgrade::OnUpgrade; 14 15 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> { 16 conn: Conn<I, Bs::Data, T>, 17 dispatch: D, 18 body_tx: Option<crate::body::Sender>, 19 body_rx: Pin<Box<Option<Bs>>>, 20 is_closing: bool, 21 } 22 23 pub(crate) trait Dispatch { 24 type PollItem; 25 type PollBody; 26 type PollError; 27 type RecvItem; poll_msg( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>28 fn poll_msg( 29 self: Pin<&mut Self>, 30 cx: &mut task::Context<'_>, 31 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>; recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>32 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>33 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>; should_poll(&self) -> bool34 fn should_poll(&self) -> bool; 35 } 36 37 cfg_server! { 38 use crate::service::HttpService; 39 40 pub(crate) struct Server<S: HttpService<B>, B> { 41 in_flight: Pin<Box<Option<S::Future>>>, 42 pub(crate) service: S, 43 } 44 } 45 46 cfg_client! { 47 pub(crate) struct Client<B> { 48 callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>, 49 rx: ClientRx<B>, 50 rx_closed: bool, 51 } 52 53 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>; 54 } 55 56 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> 57 where 58 D: Dispatch< 59 PollItem = MessageHead<T::Outgoing>, 60 PollBody = Bs, 61 RecvItem = MessageHead<T::Incoming>, 62 > + Unpin, 63 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 64 I: AsyncRead + AsyncWrite + Unpin, 65 T: Http1Transaction + Unpin, 66 Bs: HttpBody + 'static, 67 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 68 { new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self69 pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { 70 Dispatcher { 71 conn, 72 dispatch, 73 body_tx: None, 74 body_rx: Box::pin(None), 75 is_closing: false, 76 } 77 } 78 79 #[cfg(feature = "server")] disable_keep_alive(&mut self)80 pub(crate) fn disable_keep_alive(&mut self) { 81 self.conn.disable_keep_alive(); 82 if self.conn.is_write_closed() { 83 self.close(); 84 } 85 } 86 into_inner(self) -> (I, Bytes, D)87 pub(crate) fn into_inner(self) -> (I, Bytes, D) { 88 let (io, buf) = self.conn.into_inner(); 89 (io, buf, self.dispatch) 90 } 91 92 /// Run this dispatcher until HTTP says this connection is done, 93 /// but don't call `AsyncWrite::shutdown` on the underlying IO. 94 /// 95 /// This is useful for old-style HTTP upgrades, but ignores 96 /// newer-style upgrade API. poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,97 pub(crate) fn poll_without_shutdown( 98 &mut self, 99 cx: &mut task::Context<'_>, 100 ) -> Poll<crate::Result<()>> 101 where 102 Self: Unpin, 103 { 104 Pin::new(self).poll_catch(cx, false).map_ok(|ds| { 105 if let Dispatched::Upgrade(pending) = ds { 106 pending.manual(); 107 } 108 }) 109 } 110 poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>111 fn poll_catch( 112 &mut self, 113 cx: &mut task::Context<'_>, 114 should_shutdown: bool, 115 ) -> Poll<crate::Result<Dispatched>> { 116 Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { 117 // An error means we're shutting down either way. 118 // We just try to give the error to the user, 119 // and close the connection with an Ok. If we 120 // cannot give it to the user, then return the Err. 121 self.dispatch.recv_msg(Err(e))?; 122 Ok(Dispatched::Shutdown) 123 })) 124 } 125 poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>126 fn poll_inner( 127 &mut self, 128 cx: &mut task::Context<'_>, 129 should_shutdown: bool, 130 ) -> Poll<crate::Result<Dispatched>> { 131 T::update_date(); 132 133 ready!(self.poll_loop(cx))?; 134 135 if self.is_done() { 136 if let Some(pending) = self.conn.pending_upgrade() { 137 self.conn.take_error()?; 138 return Poll::Ready(Ok(Dispatched::Upgrade(pending))); 139 } else if should_shutdown { 140 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?; 141 } 142 self.conn.take_error()?; 143 Poll::Ready(Ok(Dispatched::Shutdown)) 144 } else { 145 Poll::Pending 146 } 147 } 148 poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>149 fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 150 // Limit the looping on this connection, in case it is ready far too 151 // often, so that other futures don't starve. 152 // 153 // 16 was chosen arbitrarily, as that is number of pipelined requests 154 // benchmarks often use. Perhaps it should be a config option instead. 155 for _ in 0..16 { 156 let _ = self.poll_read(cx)?; 157 let _ = self.poll_write(cx)?; 158 let _ = self.poll_flush(cx)?; 159 160 // This could happen if reading paused before blocking on IO, 161 // such as getting to the end of a framed message, but then 162 // writing/flushing set the state back to Init. In that case, 163 // if the read buffer still had bytes, we'd want to try poll_read 164 // again, or else we wouldn't ever be woken up again. 165 // 166 // Using this instead of task::current() and notify() inside 167 // the Conn is noticeably faster in pipelined benchmarks. 168 if !self.conn.wants_read_again() { 169 //break; 170 return Poll::Ready(Ok(())); 171 } 172 } 173 174 trace!("poll_loop yielding (self = {:p})", self); 175 176 task::yield_now(cx).map(|never| match never {}) 177 } 178 poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>179 fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 180 loop { 181 if self.is_closing { 182 return Poll::Ready(Ok(())); 183 } else if self.conn.can_read_head() { 184 ready!(self.poll_read_head(cx))?; 185 } else if let Some(mut body) = self.body_tx.take() { 186 if self.conn.can_read_body() { 187 match body.poll_ready(cx) { 188 Poll::Ready(Ok(())) => (), 189 Poll::Pending => { 190 self.body_tx = Some(body); 191 return Poll::Pending; 192 } 193 Poll::Ready(Err(_canceled)) => { 194 // user doesn't care about the body 195 // so we should stop reading 196 trace!("body receiver dropped before eof, draining or closing"); 197 self.conn.poll_drain_or_close_read(cx); 198 continue; 199 } 200 } 201 match self.conn.poll_read_body(cx) { 202 Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) { 203 Ok(()) => { 204 self.body_tx = Some(body); 205 } 206 Err(_canceled) => { 207 if self.conn.can_read_body() { 208 trace!("body receiver dropped before eof, closing"); 209 self.conn.close_read(); 210 } 211 } 212 }, 213 Poll::Ready(None) => { 214 // just drop, the body will close automatically 215 } 216 Poll::Pending => { 217 self.body_tx = Some(body); 218 return Poll::Pending; 219 } 220 Poll::Ready(Some(Err(e))) => { 221 body.send_error(crate::Error::new_body(e)); 222 } 223 } 224 } else { 225 // just drop, the body will close automatically 226 } 227 } else { 228 return self.conn.poll_read_keep_alive(cx); 229 } 230 } 231 } 232 poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>233 fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 234 // can dispatch receive, or does it still care about, an incoming message? 235 match ready!(self.dispatch.poll_ready(cx)) { 236 Ok(()) => (), 237 Err(()) => { 238 trace!("dispatch no longer receiving messages"); 239 self.close(); 240 return Poll::Ready(Ok(())); 241 } 242 } 243 // dispatch is ready for a message, try to read one 244 match ready!(self.conn.poll_read_head(cx)) { 245 Some(Ok((mut head, body_len, wants))) => { 246 let body = match body_len { 247 DecodedLength::ZERO => Body::empty(), 248 other => { 249 let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); 250 self.body_tx = Some(tx); 251 rx 252 } 253 }; 254 if wants.contains(Wants::UPGRADE) { 255 let upgrade = self.conn.on_upgrade(); 256 debug_assert!(!upgrade.is_none(), "empty upgrade"); 257 debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set"); 258 head.extensions.insert(upgrade); 259 } 260 self.dispatch.recv_msg(Ok((head, body)))?; 261 Poll::Ready(Ok(())) 262 } 263 Some(Err(err)) => { 264 debug!("read_head error: {}", err); 265 self.dispatch.recv_msg(Err(err))?; 266 // if here, the dispatcher gave the user the error 267 // somewhere else. we still need to shutdown, but 268 // not as a second error. 269 self.close(); 270 Poll::Ready(Ok(())) 271 } 272 None => { 273 // read eof, the write side will have been closed too unless 274 // allow_read_close was set to true, in which case just do 275 // nothing... 276 debug_assert!(self.conn.is_read_closed()); 277 if self.conn.is_write_closed() { 278 self.close(); 279 } 280 Poll::Ready(Ok(())) 281 } 282 } 283 } 284 poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>285 fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 286 loop { 287 if self.is_closing { 288 return Poll::Ready(Ok(())); 289 } else if self.body_rx.is_none() 290 && self.conn.can_write_head() 291 && self.dispatch.should_poll() 292 { 293 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { 294 let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; 295 296 // Check if the body knows its full data immediately. 297 // 298 // If so, we can skip a bit of bookkeeping that streaming 299 // bodies need to do. 300 if let Some(full) = crate::body::take_full_data(&mut body) { 301 self.conn.write_full_msg(head, full); 302 return Poll::Ready(Ok(())); 303 } 304 305 let body_type = if body.is_end_stream() { 306 self.body_rx.set(None); 307 None 308 } else { 309 let btype = body 310 .size_hint() 311 .exact() 312 .map(BodyLength::Known) 313 .or_else(|| Some(BodyLength::Unknown)); 314 self.body_rx.set(Some(body)); 315 btype 316 }; 317 self.conn.write_head(head, body_type); 318 } else { 319 self.close(); 320 return Poll::Ready(Ok(())); 321 } 322 } else if !self.conn.can_buffer_body() { 323 ready!(self.poll_flush(cx))?; 324 } else { 325 // A new scope is needed :( 326 if let (Some(mut body), clear_body) = 327 OptGuard::new(self.body_rx.as_mut()).guard_mut() 328 { 329 debug_assert!(!*clear_body, "opt guard defaults to keeping body"); 330 if !self.conn.can_write_body() { 331 trace!( 332 "no more write body allowed, user body is_end_stream = {}", 333 body.is_end_stream(), 334 ); 335 *clear_body = true; 336 continue; 337 } 338 339 let item = ready!(body.as_mut().poll_data(cx)); 340 if let Some(item) = item { 341 let chunk = item.map_err(|e| { 342 *clear_body = true; 343 crate::Error::new_user_body(e) 344 })?; 345 let eos = body.is_end_stream(); 346 if eos { 347 *clear_body = true; 348 if chunk.remaining() == 0 { 349 trace!("discarding empty chunk"); 350 self.conn.end_body()?; 351 } else { 352 self.conn.write_body_and_end(chunk); 353 } 354 } else { 355 if chunk.remaining() == 0 { 356 trace!("discarding empty chunk"); 357 continue; 358 } 359 self.conn.write_body(chunk); 360 } 361 } else { 362 *clear_body = true; 363 self.conn.end_body()?; 364 } 365 } else { 366 return Poll::Pending; 367 } 368 } 369 } 370 } 371 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>372 fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 373 self.conn.poll_flush(cx).map_err(|err| { 374 debug!("error writing: {}", err); 375 crate::Error::new_body_write(err) 376 }) 377 } 378 close(&mut self)379 fn close(&mut self) { 380 self.is_closing = true; 381 self.conn.close_read(); 382 self.conn.close_write(); 383 } 384 is_done(&self) -> bool385 fn is_done(&self) -> bool { 386 if self.is_closing { 387 return true; 388 } 389 390 let read_done = self.conn.is_read_closed(); 391 392 if !T::should_read_first() && read_done { 393 // a client that cannot read may was well be done. 394 true 395 } else { 396 let write_done = self.conn.is_write_closed() 397 || (!self.dispatch.should_poll() && self.body_rx.is_none()); 398 read_done && write_done 399 } 400 } 401 } 402 403 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T> 404 where 405 D: Dispatch< 406 PollItem = MessageHead<T::Outgoing>, 407 PollBody = Bs, 408 RecvItem = MessageHead<T::Incoming>, 409 > + Unpin, 410 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 411 I: AsyncRead + AsyncWrite + Unpin, 412 T: Http1Transaction + Unpin, 413 Bs: HttpBody + 'static, 414 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 415 { 416 type Output = crate::Result<Dispatched>; 417 418 #[inline] poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>419 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 420 self.poll_catch(cx, true) 421 } 422 } 423 424 // ===== impl OptGuard ===== 425 426 /// A drop guard to allow a mutable borrow of an Option while being able to 427 /// set whether the `Option` should be cleared on drop. 428 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool); 429 430 impl<'a, T> OptGuard<'a, T> { new(pin: Pin<&'a mut Option<T>>) -> Self431 fn new(pin: Pin<&'a mut Option<T>>) -> Self { 432 OptGuard(pin, false) 433 } 434 guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)435 fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) { 436 (self.0.as_mut().as_pin_mut(), &mut self.1) 437 } 438 } 439 440 impl<'a, T> Drop for OptGuard<'a, T> { drop(&mut self)441 fn drop(&mut self) { 442 if self.1 { 443 self.0.set(None); 444 } 445 } 446 } 447 448 // ===== impl Server ===== 449 450 cfg_server! { 451 impl<S, B> Server<S, B> 452 where 453 S: HttpService<B>, 454 { 455 pub(crate) fn new(service: S) -> Server<S, B> { 456 Server { 457 in_flight: Box::pin(None), 458 service, 459 } 460 } 461 462 pub(crate) fn into_service(self) -> S { 463 self.service 464 } 465 } 466 467 // Service is never pinned 468 impl<S: HttpService<B>, B> Unpin for Server<S, B> {} 469 470 impl<S, Bs> Dispatch for Server<S, Body> 471 where 472 S: HttpService<Body, ResBody = Bs>, 473 S::Error: Into<Box<dyn StdError + Send + Sync>>, 474 Bs: HttpBody, 475 { 476 type PollItem = MessageHead<http::StatusCode>; 477 type PollBody = Bs; 478 type PollError = S::Error; 479 type RecvItem = RequestHead; 480 481 fn poll_msg( 482 mut self: Pin<&mut Self>, 483 cx: &mut task::Context<'_>, 484 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> { 485 let mut this = self.as_mut(); 486 let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { 487 let resp = ready!(fut.as_mut().poll(cx)?); 488 let (parts, body) = resp.into_parts(); 489 let head = MessageHead { 490 version: parts.version, 491 subject: parts.status, 492 headers: parts.headers, 493 extensions: parts.extensions, 494 }; 495 Poll::Ready(Some(Ok((head, body)))) 496 } else { 497 unreachable!("poll_msg shouldn't be called if no inflight"); 498 }; 499 500 // Since in_flight finished, remove it 501 this.in_flight.set(None); 502 ret 503 } 504 505 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 506 let (msg, body) = msg?; 507 let mut req = Request::new(body); 508 *req.method_mut() = msg.subject.0; 509 *req.uri_mut() = msg.subject.1; 510 *req.headers_mut() = msg.headers; 511 *req.version_mut() = msg.version; 512 *req.extensions_mut() = msg.extensions; 513 let fut = self.service.call(req); 514 self.in_flight.set(Some(fut)); 515 Ok(()) 516 } 517 518 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 519 if self.in_flight.is_some() { 520 Poll::Pending 521 } else { 522 self.service.poll_ready(cx).map_err(|_e| { 523 // FIXME: return error value. 524 trace!("service closed"); 525 }) 526 } 527 } 528 529 fn should_poll(&self) -> bool { 530 self.in_flight.is_some() 531 } 532 } 533 } 534 535 // ===== impl Client ===== 536 537 cfg_client! { 538 impl<B> Client<B> { 539 pub(crate) fn new(rx: ClientRx<B>) -> Client<B> { 540 Client { 541 callback: None, 542 rx, 543 rx_closed: false, 544 } 545 } 546 } 547 548 impl<B> Dispatch for Client<B> 549 where 550 B: HttpBody, 551 { 552 type PollItem = RequestHead; 553 type PollBody = B; 554 type PollError = crate::common::Never; 555 type RecvItem = crate::proto::ResponseHead; 556 557 fn poll_msg( 558 mut self: Pin<&mut Self>, 559 cx: &mut task::Context<'_>, 560 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> { 561 let mut this = self.as_mut(); 562 debug_assert!(!this.rx_closed); 563 match this.rx.poll_recv(cx) { 564 Poll::Ready(Some((req, mut cb))) => { 565 // check that future hasn't been canceled already 566 match cb.poll_canceled(cx) { 567 Poll::Ready(()) => { 568 trace!("request canceled"); 569 Poll::Ready(None) 570 } 571 Poll::Pending => { 572 let (parts, body) = req.into_parts(); 573 let head = RequestHead { 574 version: parts.version, 575 subject: crate::proto::RequestLine(parts.method, parts.uri), 576 headers: parts.headers, 577 extensions: parts.extensions, 578 }; 579 this.callback = Some(cb); 580 Poll::Ready(Some(Ok((head, body)))) 581 } 582 } 583 } 584 Poll::Ready(None) => { 585 // user has dropped sender handle 586 trace!("client tx closed"); 587 this.rx_closed = true; 588 Poll::Ready(None) 589 } 590 Poll::Pending => Poll::Pending, 591 } 592 } 593 594 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 595 match msg { 596 Ok((msg, body)) => { 597 if let Some(cb) = self.callback.take() { 598 let mut res = http::Response::new(body); 599 *res.status_mut() = msg.subject; 600 *res.headers_mut() = msg.headers; 601 *res.version_mut() = msg.version; 602 *res.extensions_mut() = msg.extensions; 603 cb.send(Ok(res)); 604 Ok(()) 605 } else { 606 // Getting here is likely a bug! An error should have happened 607 // in Conn::require_empty_read() before ever parsing a 608 // full message! 609 Err(crate::Error::new_unexpected_message()) 610 } 611 } 612 Err(err) => { 613 if let Some(cb) = self.callback.take() { 614 cb.send(Err((err, None))); 615 Ok(()) 616 } else if !self.rx_closed { 617 self.rx.close(); 618 if let Some((req, cb)) = self.rx.try_recv() { 619 trace!("canceling queued request with connection error: {}", err); 620 // in this case, the message was never even started, so it's safe to tell 621 // the user that the request was completely canceled 622 cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); 623 Ok(()) 624 } else { 625 Err(err) 626 } 627 } else { 628 Err(err) 629 } 630 } 631 } 632 } 633 634 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 635 match self.callback { 636 Some(ref mut cb) => match cb.poll_canceled(cx) { 637 Poll::Ready(()) => { 638 trace!("callback receiver has dropped"); 639 Poll::Ready(Err(())) 640 } 641 Poll::Pending => Poll::Ready(Ok(())), 642 }, 643 None => Poll::Ready(Err(())), 644 } 645 } 646 647 fn should_poll(&self) -> bool { 648 self.callback.is_none() 649 } 650 } 651 } 652 653 #[cfg(test)] 654 mod tests { 655 use super::*; 656 use crate::proto::h1::ClientTransaction; 657 use std::time::Duration; 658 659 #[test] client_read_bytes_before_writing_request()660 fn client_read_bytes_before_writing_request() { 661 let _ = pretty_env_logger::try_init(); 662 663 tokio_test::task::spawn(()).enter(|cx, _| { 664 let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle(); 665 666 // Block at 0 for now, but we will release this response before 667 // the request is ready to write later... 668 //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); 669 let (mut tx, rx) = crate::client::dispatch::channel(); 670 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 671 let mut dispatcher = Dispatcher::new(Client::new(rx), conn); 672 673 // First poll is needed to allow tx to send... 674 assert!(Pin::new(&mut dispatcher).poll(cx).is_pending()); 675 676 // Unblock our IO, which has a response before we've sent request! 677 // 678 handle.read(b"HTTP/1.1 200 OK\r\n\r\n"); 679 680 let mut res_rx = tx 681 .try_send(crate::Request::new(crate::Body::empty())) 682 .unwrap(); 683 684 tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx)); 685 let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx)) 686 .expect_err("callback should send error"); 687 688 match (err.0.kind(), err.1) { 689 (&crate::error::Kind::Canceled, Some(_)) => (), 690 other => panic!("expected Canceled, got {:?}", other), 691 } 692 }); 693 } 694 695 #[tokio::test] body_empty_chunks_ignored()696 async fn body_empty_chunks_ignored() { 697 let _ = pretty_env_logger::try_init(); 698 699 let io = tokio_test::io::Builder::new() 700 // no reading or writing, just be blocked for the test... 701 .wait(Duration::from_secs(5)) 702 .build(); 703 704 let (mut tx, rx) = crate::client::dispatch::channel(); 705 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 706 let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); 707 708 // First poll is needed to allow tx to send... 709 assert!(dispatcher.poll().is_pending()); 710 711 let body = { 712 let (mut tx, body) = crate::Body::channel(); 713 tx.try_send_data("".into()).unwrap(); 714 body 715 }; 716 717 let _res_rx = tx.try_send(crate::Request::new(body)).unwrap(); 718 719 // Ensure conn.write_body wasn't called with the empty chunk. 720 // If it is, it will trigger an assertion. 721 assert!(dispatcher.poll().is_pending()); 722 } 723 } 724