1 use std::error::Error as StdError; 2 3 use bytes::{Buf, Bytes}; 4 use http::Request; 5 use tokio::io::{AsyncRead, AsyncWrite}; 6 7 use super::{Http1Transaction, Wants}; 8 use crate::body::{Body, DecodedLength, HttpBody}; 9 use crate::common::{task, Future, Pin, Poll, Unpin}; 10 use crate::proto::{ 11 BodyLength, Conn, Dispatched, MessageHead, RequestHead, 12 }; 13 use crate::upgrade::OnUpgrade; 14 15 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> { 16 conn: Conn<I, Bs::Data, T>, 17 dispatch: D, 18 body_tx: Option<crate::body::Sender>, 19 body_rx: Pin<Box<Option<Bs>>>, 20 is_closing: bool, 21 } 22 23 pub(crate) trait Dispatch { 24 type PollItem; 25 type PollBody; 26 type PollError; 27 type RecvItem; poll_msg( self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>28 fn poll_msg( 29 self: Pin<&mut Self>, 30 cx: &mut task::Context<'_>, 31 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>; recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>32 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>33 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>; should_poll(&self) -> bool34 fn should_poll(&self) -> bool; 35 } 36 37 cfg_server! { 38 use crate::service::HttpService; 39 40 pub(crate) struct Server<S: HttpService<B>, B> { 41 in_flight: Pin<Box<Option<S::Future>>>, 42 pub(crate) service: S, 43 } 44 } 45 46 cfg_client! { 47 pin_project_lite::pin_project! { 48 pub(crate) struct Client<B> { 49 callback: Option<crate::client::dispatch::Callback<Request<B>, http::Response<Body>>>, 50 #[pin] 51 rx: ClientRx<B>, 52 rx_closed: bool, 53 } 54 } 55 56 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, http::Response<Body>>; 57 } 58 59 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> 60 where 61 D: Dispatch< 62 PollItem = MessageHead<T::Outgoing>, 63 PollBody = Bs, 64 RecvItem = MessageHead<T::Incoming>, 65 > + Unpin, 66 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 67 I: AsyncRead + AsyncWrite + Unpin, 68 T: Http1Transaction + Unpin, 69 Bs: HttpBody + 'static, 70 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 71 { new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self72 pub(crate) fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { 73 Dispatcher { 74 conn, 75 dispatch, 76 body_tx: None, 77 body_rx: Box::pin(None), 78 is_closing: false, 79 } 80 } 81 82 #[cfg(feature = "server")] disable_keep_alive(&mut self)83 pub(crate) fn disable_keep_alive(&mut self) { 84 self.conn.disable_keep_alive(); 85 if self.conn.is_write_closed() { 86 self.close(); 87 } 88 } 89 into_inner(self) -> (I, Bytes, D)90 pub(crate) fn into_inner(self) -> (I, Bytes, D) { 91 let (io, buf) = self.conn.into_inner(); 92 (io, buf, self.dispatch) 93 } 94 95 /// Run this dispatcher until HTTP says this connection is done, 96 /// but don't call `AsyncWrite::shutdown` on the underlying IO. 97 /// 98 /// This is useful for old-style HTTP upgrades, but ignores 99 /// newer-style upgrade API. poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,100 pub(crate) fn poll_without_shutdown( 101 &mut self, 102 cx: &mut task::Context<'_>, 103 ) -> Poll<crate::Result<()>> 104 where 105 Self: Unpin, 106 { 107 Pin::new(self).poll_catch(cx, false).map_ok(|ds| { 108 if let Dispatched::Upgrade(pending) = ds { 109 pending.manual(); 110 } 111 }) 112 } 113 poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>114 fn poll_catch( 115 &mut self, 116 cx: &mut task::Context<'_>, 117 should_shutdown: bool, 118 ) -> Poll<crate::Result<Dispatched>> { 119 Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { 120 // An error means we're shutting down either way. 121 // We just try to give the error to the user, 122 // and close the connection with an Ok. If we 123 // cannot give it to the user, then return the Err. 124 self.dispatch.recv_msg(Err(e))?; 125 Ok(Dispatched::Shutdown) 126 })) 127 } 128 poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>129 fn poll_inner( 130 &mut self, 131 cx: &mut task::Context<'_>, 132 should_shutdown: bool, 133 ) -> Poll<crate::Result<Dispatched>> { 134 T::update_date(); 135 136 ready!(self.poll_loop(cx))?; 137 138 if self.is_done() { 139 if let Some(pending) = self.conn.pending_upgrade() { 140 self.conn.take_error()?; 141 return Poll::Ready(Ok(Dispatched::Upgrade(pending))); 142 } else if should_shutdown { 143 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?; 144 } 145 self.conn.take_error()?; 146 Poll::Ready(Ok(Dispatched::Shutdown)) 147 } else { 148 Poll::Pending 149 } 150 } 151 poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>152 fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 153 // Limit the looping on this connection, in case it is ready far too 154 // often, so that other futures don't starve. 155 // 156 // 16 was chosen arbitrarily, as that is number of pipelined requests 157 // benchmarks often use. Perhaps it should be a config option instead. 158 for _ in 0..16 { 159 let _ = self.poll_read(cx)?; 160 let _ = self.poll_write(cx)?; 161 let _ = self.poll_flush(cx)?; 162 163 // This could happen if reading paused before blocking on IO, 164 // such as getting to the end of a framed message, but then 165 // writing/flushing set the state back to Init. In that case, 166 // if the read buffer still had bytes, we'd want to try poll_read 167 // again, or else we wouldn't ever be woken up again. 168 // 169 // Using this instead of task::current() and notify() inside 170 // the Conn is noticeably faster in pipelined benchmarks. 171 if !self.conn.wants_read_again() { 172 //break; 173 return Poll::Ready(Ok(())); 174 } 175 } 176 177 trace!("poll_loop yielding (self = {:p})", self); 178 179 task::yield_now(cx).map(|never| match never {}) 180 } 181 poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>182 fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 183 loop { 184 if self.is_closing { 185 return Poll::Ready(Ok(())); 186 } else if self.conn.can_read_head() { 187 ready!(self.poll_read_head(cx))?; 188 } else if let Some(mut body) = self.body_tx.take() { 189 if self.conn.can_read_body() { 190 match body.poll_ready(cx) { 191 Poll::Ready(Ok(())) => (), 192 Poll::Pending => { 193 self.body_tx = Some(body); 194 return Poll::Pending; 195 } 196 Poll::Ready(Err(_canceled)) => { 197 // user doesn't care about the body 198 // so we should stop reading 199 trace!("body receiver dropped before eof, draining or closing"); 200 self.conn.poll_drain_or_close_read(cx); 201 continue; 202 } 203 } 204 match self.conn.poll_read_body(cx) { 205 Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) { 206 Ok(()) => { 207 self.body_tx = Some(body); 208 } 209 Err(_canceled) => { 210 if self.conn.can_read_body() { 211 trace!("body receiver dropped before eof, closing"); 212 self.conn.close_read(); 213 } 214 } 215 }, 216 Poll::Ready(None) => { 217 // just drop, the body will close automatically 218 } 219 Poll::Pending => { 220 self.body_tx = Some(body); 221 return Poll::Pending; 222 } 223 Poll::Ready(Some(Err(e))) => { 224 body.send_error(crate::Error::new_body(e)); 225 } 226 } 227 } else { 228 // just drop, the body will close automatically 229 } 230 } else { 231 return self.conn.poll_read_keep_alive(cx); 232 } 233 } 234 } 235 poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>236 fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 237 // can dispatch receive, or does it still care about, an incoming message? 238 match ready!(self.dispatch.poll_ready(cx)) { 239 Ok(()) => (), 240 Err(()) => { 241 trace!("dispatch no longer receiving messages"); 242 self.close(); 243 return Poll::Ready(Ok(())); 244 } 245 } 246 // dispatch is ready for a message, try to read one 247 match ready!(self.conn.poll_read_head(cx)) { 248 Some(Ok((mut head, body_len, wants))) => { 249 let body = match body_len { 250 DecodedLength::ZERO => Body::empty(), 251 other => { 252 let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); 253 self.body_tx = Some(tx); 254 rx 255 } 256 }; 257 if wants.contains(Wants::UPGRADE) { 258 let upgrade = self.conn.on_upgrade(); 259 debug_assert!(!upgrade.is_none(), "empty upgrade"); 260 debug_assert!(head.extensions.get::<OnUpgrade>().is_none(), "OnUpgrade already set"); 261 head.extensions.insert(upgrade); 262 } 263 self.dispatch.recv_msg(Ok((head, body)))?; 264 Poll::Ready(Ok(())) 265 } 266 Some(Err(err)) => { 267 debug!("read_head error: {}", err); 268 self.dispatch.recv_msg(Err(err))?; 269 // if here, the dispatcher gave the user the error 270 // somewhere else. we still need to shutdown, but 271 // not as a second error. 272 self.close(); 273 Poll::Ready(Ok(())) 274 } 275 None => { 276 // read eof, the write side will have been closed too unless 277 // allow_read_close was set to true, in which case just do 278 // nothing... 279 debug_assert!(self.conn.is_read_closed()); 280 if self.conn.is_write_closed() { 281 self.close(); 282 } 283 Poll::Ready(Ok(())) 284 } 285 } 286 } 287 poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>288 fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 289 loop { 290 if self.is_closing { 291 return Poll::Ready(Ok(())); 292 } else if self.body_rx.is_none() 293 && self.conn.can_write_head() 294 && self.dispatch.should_poll() 295 { 296 if let Some(msg) = ready!(Pin::new(&mut self.dispatch).poll_msg(cx)) { 297 let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; 298 299 // Check if the body knows its full data immediately. 300 // 301 // If so, we can skip a bit of bookkeeping that streaming 302 // bodies need to do. 303 if let Some(full) = crate::body::take_full_data(&mut body) { 304 self.conn.write_full_msg(head, full); 305 return Poll::Ready(Ok(())); 306 } 307 308 let body_type = if body.is_end_stream() { 309 self.body_rx.set(None); 310 None 311 } else { 312 let btype = body 313 .size_hint() 314 .exact() 315 .map(BodyLength::Known) 316 .or_else(|| Some(BodyLength::Unknown)); 317 self.body_rx.set(Some(body)); 318 btype 319 }; 320 self.conn.write_head(head, body_type); 321 } else { 322 self.close(); 323 return Poll::Ready(Ok(())); 324 } 325 } else if !self.conn.can_buffer_body() { 326 ready!(self.poll_flush(cx))?; 327 } else { 328 // A new scope is needed :( 329 if let (Some(mut body), clear_body) = 330 OptGuard::new(self.body_rx.as_mut()).guard_mut() 331 { 332 debug_assert!(!*clear_body, "opt guard defaults to keeping body"); 333 if !self.conn.can_write_body() { 334 trace!( 335 "no more write body allowed, user body is_end_stream = {}", 336 body.is_end_stream(), 337 ); 338 *clear_body = true; 339 continue; 340 } 341 342 let item = ready!(body.as_mut().poll_data(cx)); 343 if let Some(item) = item { 344 let chunk = item.map_err(|e| { 345 *clear_body = true; 346 crate::Error::new_user_body(e) 347 })?; 348 let eos = body.is_end_stream(); 349 if eos { 350 *clear_body = true; 351 if chunk.remaining() == 0 { 352 trace!("discarding empty chunk"); 353 self.conn.end_body()?; 354 } else { 355 self.conn.write_body_and_end(chunk); 356 } 357 } else { 358 if chunk.remaining() == 0 { 359 trace!("discarding empty chunk"); 360 continue; 361 } 362 self.conn.write_body(chunk); 363 } 364 } else { 365 *clear_body = true; 366 self.conn.end_body()?; 367 } 368 } else { 369 return Poll::Pending; 370 } 371 } 372 } 373 } 374 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>375 fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 376 self.conn.poll_flush(cx).map_err(|err| { 377 debug!("error writing: {}", err); 378 crate::Error::new_body_write(err) 379 }) 380 } 381 close(&mut self)382 fn close(&mut self) { 383 self.is_closing = true; 384 self.conn.close_read(); 385 self.conn.close_write(); 386 } 387 is_done(&self) -> bool388 fn is_done(&self) -> bool { 389 if self.is_closing { 390 return true; 391 } 392 393 let read_done = self.conn.is_read_closed(); 394 395 if !T::should_read_first() && read_done { 396 // a client that cannot read may was well be done. 397 true 398 } else { 399 let write_done = self.conn.is_write_closed() 400 || (!self.dispatch.should_poll() && self.body_rx.is_none()); 401 read_done && write_done 402 } 403 } 404 } 405 406 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T> 407 where 408 D: Dispatch< 409 PollItem = MessageHead<T::Outgoing>, 410 PollBody = Bs, 411 RecvItem = MessageHead<T::Incoming>, 412 > + Unpin, 413 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 414 I: AsyncRead + AsyncWrite + Unpin, 415 T: Http1Transaction + Unpin, 416 Bs: HttpBody + 'static, 417 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 418 { 419 type Output = crate::Result<Dispatched>; 420 421 #[inline] poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>422 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 423 self.poll_catch(cx, true) 424 } 425 } 426 427 // ===== impl OptGuard ===== 428 429 /// A drop guard to allow a mutable borrow of an Option while being able to 430 /// set whether the `Option` should be cleared on drop. 431 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool); 432 433 impl<'a, T> OptGuard<'a, T> { new(pin: Pin<&'a mut Option<T>>) -> Self434 fn new(pin: Pin<&'a mut Option<T>>) -> Self { 435 OptGuard(pin, false) 436 } 437 guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)438 fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) { 439 (self.0.as_mut().as_pin_mut(), &mut self.1) 440 } 441 } 442 443 impl<'a, T> Drop for OptGuard<'a, T> { drop(&mut self)444 fn drop(&mut self) { 445 if self.1 { 446 self.0.set(None); 447 } 448 } 449 } 450 451 // ===== impl Server ===== 452 453 cfg_server! { 454 impl<S, B> Server<S, B> 455 where 456 S: HttpService<B>, 457 { 458 pub(crate) fn new(service: S) -> Server<S, B> { 459 Server { 460 in_flight: Box::pin(None), 461 service, 462 } 463 } 464 465 pub(crate) fn into_service(self) -> S { 466 self.service 467 } 468 } 469 470 // Service is never pinned 471 impl<S: HttpService<B>, B> Unpin for Server<S, B> {} 472 473 impl<S, Bs> Dispatch for Server<S, Body> 474 where 475 S: HttpService<Body, ResBody = Bs>, 476 S::Error: Into<Box<dyn StdError + Send + Sync>>, 477 Bs: HttpBody, 478 { 479 type PollItem = MessageHead<http::StatusCode>; 480 type PollBody = Bs; 481 type PollError = S::Error; 482 type RecvItem = RequestHead; 483 484 fn poll_msg( 485 mut self: Pin<&mut Self>, 486 cx: &mut task::Context<'_>, 487 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> { 488 let mut this = self.as_mut(); 489 let ret = if let Some(ref mut fut) = this.in_flight.as_mut().as_pin_mut() { 490 let resp = ready!(fut.as_mut().poll(cx)?); 491 let (parts, body) = resp.into_parts(); 492 let head = MessageHead { 493 version: parts.version, 494 subject: parts.status, 495 headers: parts.headers, 496 extensions: parts.extensions, 497 }; 498 Poll::Ready(Some(Ok((head, body)))) 499 } else { 500 unreachable!("poll_msg shouldn't be called if no inflight"); 501 }; 502 503 // Since in_flight finished, remove it 504 this.in_flight.set(None); 505 ret 506 } 507 508 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 509 let (msg, body) = msg?; 510 let mut req = Request::new(body); 511 *req.method_mut() = msg.subject.0; 512 *req.uri_mut() = msg.subject.1; 513 *req.headers_mut() = msg.headers; 514 *req.version_mut() = msg.version; 515 *req.extensions_mut() = msg.extensions; 516 let fut = self.service.call(req); 517 self.in_flight.set(Some(fut)); 518 Ok(()) 519 } 520 521 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 522 if self.in_flight.is_some() { 523 Poll::Pending 524 } else { 525 self.service.poll_ready(cx).map_err(|_e| { 526 // FIXME: return error value. 527 trace!("service closed"); 528 }) 529 } 530 } 531 532 fn should_poll(&self) -> bool { 533 self.in_flight.is_some() 534 } 535 } 536 } 537 538 // ===== impl Client ===== 539 540 cfg_client! { 541 impl<B> Client<B> { 542 pub(crate) fn new(rx: ClientRx<B>) -> Client<B> { 543 Client { 544 callback: None, 545 rx, 546 rx_closed: false, 547 } 548 } 549 } 550 551 impl<B> Dispatch for Client<B> 552 where 553 B: HttpBody, 554 { 555 type PollItem = RequestHead; 556 type PollBody = B; 557 type PollError = crate::common::Never; 558 type RecvItem = crate::proto::ResponseHead; 559 560 fn poll_msg( 561 mut self: Pin<&mut Self>, 562 cx: &mut task::Context<'_>, 563 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), crate::common::Never>>> { 564 let mut this = self.as_mut(); 565 debug_assert!(!this.rx_closed); 566 match this.rx.poll_recv(cx) { 567 Poll::Ready(Some((req, mut cb))) => { 568 // check that future hasn't been canceled already 569 match cb.poll_canceled(cx) { 570 Poll::Ready(()) => { 571 trace!("request canceled"); 572 Poll::Ready(None) 573 } 574 Poll::Pending => { 575 let (parts, body) = req.into_parts(); 576 let head = RequestHead { 577 version: parts.version, 578 subject: crate::proto::RequestLine(parts.method, parts.uri), 579 headers: parts.headers, 580 extensions: parts.extensions, 581 }; 582 this.callback = Some(cb); 583 Poll::Ready(Some(Ok((head, body)))) 584 } 585 } 586 } 587 Poll::Ready(None) => { 588 // user has dropped sender handle 589 trace!("client tx closed"); 590 this.rx_closed = true; 591 Poll::Ready(None) 592 } 593 Poll::Pending => Poll::Pending, 594 } 595 } 596 597 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 598 match msg { 599 Ok((msg, body)) => { 600 if let Some(cb) = self.callback.take() { 601 let res = msg.into_response(body); 602 cb.send(Ok(res)); 603 Ok(()) 604 } else { 605 // Getting here is likely a bug! An error should have happened 606 // in Conn::require_empty_read() before ever parsing a 607 // full message! 608 Err(crate::Error::new_unexpected_message()) 609 } 610 } 611 Err(err) => { 612 if let Some(cb) = self.callback.take() { 613 cb.send(Err((err, None))); 614 Ok(()) 615 } else if !self.rx_closed { 616 self.rx.close(); 617 if let Some((req, cb)) = self.rx.try_recv() { 618 trace!("canceling queued request with connection error: {}", err); 619 // in this case, the message was never even started, so it's safe to tell 620 // the user that the request was completely canceled 621 cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); 622 Ok(()) 623 } else { 624 Err(err) 625 } 626 } else { 627 Err(err) 628 } 629 } 630 } 631 } 632 633 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 634 match self.callback { 635 Some(ref mut cb) => match cb.poll_canceled(cx) { 636 Poll::Ready(()) => { 637 trace!("callback receiver has dropped"); 638 Poll::Ready(Err(())) 639 } 640 Poll::Pending => Poll::Ready(Ok(())), 641 }, 642 None => Poll::Ready(Err(())), 643 } 644 } 645 646 fn should_poll(&self) -> bool { 647 self.callback.is_none() 648 } 649 } 650 } 651 652 #[cfg(test)] 653 mod tests { 654 use super::*; 655 use crate::proto::h1::ClientTransaction; 656 use std::time::Duration; 657 658 #[test] client_read_bytes_before_writing_request()659 fn client_read_bytes_before_writing_request() { 660 let _ = pretty_env_logger::try_init(); 661 662 tokio_test::task::spawn(()).enter(|cx, _| { 663 let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle(); 664 665 // Block at 0 for now, but we will release this response before 666 // the request is ready to write later... 667 //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); 668 let (mut tx, rx) = crate::client::dispatch::channel(); 669 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 670 let mut dispatcher = Dispatcher::new(Client::new(rx), conn); 671 672 // First poll is needed to allow tx to send... 673 assert!(Pin::new(&mut dispatcher).poll(cx).is_pending()); 674 675 // Unblock our IO, which has a response before we've sent request! 676 // 677 handle.read(b"HTTP/1.1 200 OK\r\n\r\n"); 678 679 let mut res_rx = tx 680 .try_send(crate::Request::new(crate::Body::empty())) 681 .unwrap(); 682 683 tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx)); 684 let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx)) 685 .expect_err("callback should send error"); 686 687 match (err.0.kind(), err.1) { 688 (&crate::error::Kind::Canceled, Some(_)) => (), 689 other => panic!("expected Canceled, got {:?}", other), 690 } 691 }); 692 } 693 694 #[tokio::test] body_empty_chunks_ignored()695 async fn body_empty_chunks_ignored() { 696 let _ = pretty_env_logger::try_init(); 697 698 let io = tokio_test::io::Builder::new() 699 // no reading or writing, just be blocked for the test... 700 .wait(Duration::from_secs(5)) 701 .build(); 702 703 let (mut tx, rx) = crate::client::dispatch::channel(); 704 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 705 let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); 706 707 // First poll is needed to allow tx to send... 708 assert!(dispatcher.poll().is_pending()); 709 710 let body = { 711 let (mut tx, body) = crate::Body::channel(); 712 tx.try_send_data("".into()).unwrap(); 713 body 714 }; 715 716 let _res_rx = tx.try_send(crate::Request::new(body)).unwrap(); 717 718 // Ensure conn.write_body wasn't called with the empty chunk. 719 // If it is, it will trigger an assertion. 720 assert!(dispatcher.poll().is_pending()); 721 } 722 } 723