1 use std::error::Error as StdError; 2 3 use bytes::{Buf, Bytes}; 4 use http::{Request, Response, StatusCode}; 5 use tokio::io::{AsyncRead, AsyncWrite}; 6 7 use super::{Http1Transaction, Wants}; 8 use crate::body::{Body, HttpBody}; 9 use crate::common::{task, Future, Never, Pin, Poll, Unpin}; 10 use crate::proto::{ 11 BodyLength, Conn, DecodedLength, Dispatched, MessageHead, RequestHead, RequestLine, 12 ResponseHead, 13 }; 14 use crate::service::HttpService; 15 16 pub(crate) struct Dispatcher<D, Bs: HttpBody, I, T> { 17 conn: Conn<I, Bs::Data, T>, 18 dispatch: D, 19 body_tx: Option<crate::body::Sender>, 20 body_rx: Pin<Box<Option<Bs>>>, 21 is_closing: bool, 22 } 23 24 pub(crate) trait Dispatch { 25 type PollItem; 26 type PollBody; 27 type PollError; 28 type RecvItem; poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>29 fn poll_msg( 30 &mut self, 31 cx: &mut task::Context<'_>, 32 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>; recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>33 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>; poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>34 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>; should_poll(&self) -> bool35 fn should_poll(&self) -> bool; 36 } 37 38 pub struct Server<S: HttpService<B>, B> { 39 in_flight: Pin<Box<Option<S::Future>>>, 40 pub(crate) service: S, 41 } 42 43 pub struct Client<B> { 44 callback: Option<crate::client::dispatch::Callback<Request<B>, Response<Body>>>, 45 rx: ClientRx<B>, 46 rx_closed: bool, 47 } 48 49 type ClientRx<B> = crate::client::dispatch::Receiver<Request<B>, Response<Body>>; 50 51 impl<D, Bs, I, T> Dispatcher<D, Bs, I, T> 52 where 53 D: Dispatch< 54 PollItem = MessageHead<T::Outgoing>, 55 PollBody = Bs, 56 RecvItem = MessageHead<T::Incoming>, 57 > + Unpin, 58 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 59 I: AsyncRead + AsyncWrite + Unpin, 60 T: Http1Transaction + Unpin, 61 Bs: HttpBody + 'static, 62 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 63 { new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self64 pub fn new(dispatch: D, conn: Conn<I, Bs::Data, T>) -> Self { 65 Dispatcher { 66 conn, 67 dispatch, 68 body_tx: None, 69 body_rx: Box::pin(None), 70 is_closing: false, 71 } 72 } 73 disable_keep_alive(&mut self)74 pub fn disable_keep_alive(&mut self) { 75 self.conn.disable_keep_alive(); 76 if self.conn.is_write_closed() { 77 self.close(); 78 } 79 } 80 into_inner(self) -> (I, Bytes, D)81 pub fn into_inner(self) -> (I, Bytes, D) { 82 let (io, buf) = self.conn.into_inner(); 83 (io, buf, self.dispatch) 84 } 85 86 /// Run this dispatcher until HTTP says this connection is done, 87 /// but don't call `AsyncWrite::shutdown` on the underlying IO. 88 /// 89 /// This is useful for old-style HTTP upgrades, but ignores 90 /// newer-style upgrade API. poll_without_shutdown( &mut self, cx: &mut task::Context<'_>, ) -> Poll<crate::Result<()>> where Self: Unpin,91 pub(crate) fn poll_without_shutdown( 92 &mut self, 93 cx: &mut task::Context<'_>, 94 ) -> Poll<crate::Result<()>> 95 where 96 Self: Unpin, 97 { 98 Pin::new(self).poll_catch(cx, false).map_ok(|ds| { 99 if let Dispatched::Upgrade(pending) = ds { 100 pending.manual(); 101 } 102 }) 103 } 104 poll_catch( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>105 fn poll_catch( 106 &mut self, 107 cx: &mut task::Context<'_>, 108 should_shutdown: bool, 109 ) -> Poll<crate::Result<Dispatched>> { 110 Poll::Ready(ready!(self.poll_inner(cx, should_shutdown)).or_else(|e| { 111 // An error means we're shutting down either way. 112 // We just try to give the error to the user, 113 // and close the connection with an Ok. If we 114 // cannot give it to the user, then return the Err. 115 self.dispatch.recv_msg(Err(e))?; 116 Ok(Dispatched::Shutdown) 117 })) 118 } 119 poll_inner( &mut self, cx: &mut task::Context<'_>, should_shutdown: bool, ) -> Poll<crate::Result<Dispatched>>120 fn poll_inner( 121 &mut self, 122 cx: &mut task::Context<'_>, 123 should_shutdown: bool, 124 ) -> Poll<crate::Result<Dispatched>> { 125 T::update_date(); 126 127 ready!(self.poll_loop(cx))?; 128 129 if self.is_done() { 130 if let Some(pending) = self.conn.pending_upgrade() { 131 self.conn.take_error()?; 132 return Poll::Ready(Ok(Dispatched::Upgrade(pending))); 133 } else if should_shutdown { 134 ready!(self.conn.poll_shutdown(cx)).map_err(crate::Error::new_shutdown)?; 135 } 136 self.conn.take_error()?; 137 Poll::Ready(Ok(Dispatched::Shutdown)) 138 } else { 139 Poll::Pending 140 } 141 } 142 poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>143 fn poll_loop(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 144 // Limit the looping on this connection, in case it is ready far too 145 // often, so that other futures don't starve. 146 // 147 // 16 was chosen arbitrarily, as that is number of pipelined requests 148 // benchmarks often use. Perhaps it should be a config option instead. 149 for _ in 0..16 { 150 let _ = self.poll_read(cx)?; 151 let _ = self.poll_write(cx)?; 152 let _ = self.poll_flush(cx)?; 153 154 // This could happen if reading paused before blocking on IO, 155 // such as getting to the end of a framed message, but then 156 // writing/flushing set the state back to Init. In that case, 157 // if the read buffer still had bytes, we'd want to try poll_read 158 // again, or else we wouldn't ever be woken up again. 159 // 160 // Using this instead of task::current() and notify() inside 161 // the Conn is noticeably faster in pipelined benchmarks. 162 if !self.conn.wants_read_again() { 163 //break; 164 return Poll::Ready(Ok(())); 165 } 166 } 167 168 trace!("poll_loop yielding (self = {:p})", self); 169 170 task::yield_now(cx).map(|never| match never {}) 171 } 172 poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>173 fn poll_read(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 174 loop { 175 if self.is_closing { 176 return Poll::Ready(Ok(())); 177 } else if self.conn.can_read_head() { 178 ready!(self.poll_read_head(cx))?; 179 } else if let Some(mut body) = self.body_tx.take() { 180 if self.conn.can_read_body() { 181 match body.poll_ready(cx) { 182 Poll::Ready(Ok(())) => (), 183 Poll::Pending => { 184 self.body_tx = Some(body); 185 return Poll::Pending; 186 } 187 Poll::Ready(Err(_canceled)) => { 188 // user doesn't care about the body 189 // so we should stop reading 190 trace!("body receiver dropped before eof, draining or closing"); 191 self.conn.poll_drain_or_close_read(cx); 192 continue; 193 } 194 } 195 match self.conn.poll_read_body(cx) { 196 Poll::Ready(Some(Ok(chunk))) => match body.try_send_data(chunk) { 197 Ok(()) => { 198 self.body_tx = Some(body); 199 } 200 Err(_canceled) => { 201 if self.conn.can_read_body() { 202 trace!("body receiver dropped before eof, closing"); 203 self.conn.close_read(); 204 } 205 } 206 }, 207 Poll::Ready(None) => { 208 // just drop, the body will close automatically 209 } 210 Poll::Pending => { 211 self.body_tx = Some(body); 212 return Poll::Pending; 213 } 214 Poll::Ready(Some(Err(e))) => { 215 body.send_error(crate::Error::new_body(e)); 216 } 217 } 218 } else { 219 // just drop, the body will close automatically 220 } 221 } else { 222 return self.conn.poll_read_keep_alive(cx); 223 } 224 } 225 } 226 poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>227 fn poll_read_head(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 228 // can dispatch receive, or does it still care about, an incoming message? 229 match ready!(self.dispatch.poll_ready(cx)) { 230 Ok(()) => (), 231 Err(()) => { 232 trace!("dispatch no longer receiving messages"); 233 self.close(); 234 return Poll::Ready(Ok(())); 235 } 236 } 237 // dispatch is ready for a message, try to read one 238 match ready!(self.conn.poll_read_head(cx)) { 239 Some(Ok((head, body_len, wants))) => { 240 let mut body = match body_len { 241 DecodedLength::ZERO => Body::empty(), 242 other => { 243 let (tx, rx) = Body::new_channel(other, wants.contains(Wants::EXPECT)); 244 self.body_tx = Some(tx); 245 rx 246 } 247 }; 248 if wants.contains(Wants::UPGRADE) { 249 body.set_on_upgrade(self.conn.on_upgrade()); 250 } 251 self.dispatch.recv_msg(Ok((head, body)))?; 252 Poll::Ready(Ok(())) 253 } 254 Some(Err(err)) => { 255 debug!("read_head error: {}", err); 256 self.dispatch.recv_msg(Err(err))?; 257 // if here, the dispatcher gave the user the error 258 // somewhere else. we still need to shutdown, but 259 // not as a second error. 260 self.close(); 261 Poll::Ready(Ok(())) 262 } 263 None => { 264 // read eof, the write side will have been closed too unless 265 // allow_read_close was set to true, in which case just do 266 // nothing... 267 debug_assert!(self.conn.is_read_closed()); 268 if self.conn.is_write_closed() { 269 self.close(); 270 } 271 Poll::Ready(Ok(())) 272 } 273 } 274 } 275 poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>276 fn poll_write(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 277 loop { 278 if self.is_closing { 279 return Poll::Ready(Ok(())); 280 } else if self.body_rx.is_none() 281 && self.conn.can_write_head() 282 && self.dispatch.should_poll() 283 { 284 if let Some(msg) = ready!(self.dispatch.poll_msg(cx)) { 285 let (head, mut body) = msg.map_err(crate::Error::new_user_service)?; 286 287 // Check if the body knows its full data immediately. 288 // 289 // If so, we can skip a bit of bookkeeping that streaming 290 // bodies need to do. 291 if let Some(full) = crate::body::take_full_data(&mut body) { 292 self.conn.write_full_msg(head, full); 293 return Poll::Ready(Ok(())); 294 } 295 296 let body_type = if body.is_end_stream() { 297 self.body_rx.set(None); 298 None 299 } else { 300 let btype = body 301 .size_hint() 302 .exact() 303 .map(BodyLength::Known) 304 .or_else(|| Some(BodyLength::Unknown)); 305 self.body_rx.set(Some(body)); 306 btype 307 }; 308 self.conn.write_head(head, body_type); 309 } else { 310 self.close(); 311 return Poll::Ready(Ok(())); 312 } 313 } else if !self.conn.can_buffer_body() { 314 ready!(self.poll_flush(cx))?; 315 } else { 316 // A new scope is needed :( 317 if let (Some(mut body), clear_body) = 318 OptGuard::new(self.body_rx.as_mut()).guard_mut() 319 { 320 debug_assert!(!*clear_body, "opt guard defaults to keeping body"); 321 if !self.conn.can_write_body() { 322 trace!( 323 "no more write body allowed, user body is_end_stream = {}", 324 body.is_end_stream(), 325 ); 326 *clear_body = true; 327 continue; 328 } 329 330 let item = ready!(body.as_mut().poll_data(cx)); 331 if let Some(item) = item { 332 let chunk = item.map_err(|e| { 333 *clear_body = true; 334 crate::Error::new_user_body(e) 335 })?; 336 let eos = body.is_end_stream(); 337 if eos { 338 *clear_body = true; 339 if chunk.remaining() == 0 { 340 trace!("discarding empty chunk"); 341 self.conn.end_body()?; 342 } else { 343 self.conn.write_body_and_end(chunk); 344 } 345 } else { 346 if chunk.remaining() == 0 { 347 trace!("discarding empty chunk"); 348 continue; 349 } 350 self.conn.write_body(chunk); 351 } 352 } else { 353 *clear_body = true; 354 self.conn.end_body()?; 355 } 356 } else { 357 return Poll::Pending; 358 } 359 } 360 } 361 } 362 poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>363 fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 364 self.conn.poll_flush(cx).map_err(|err| { 365 debug!("error writing: {}", err); 366 crate::Error::new_body_write(err) 367 }) 368 } 369 close(&mut self)370 fn close(&mut self) { 371 self.is_closing = true; 372 self.conn.close_read(); 373 self.conn.close_write(); 374 } 375 is_done(&self) -> bool376 fn is_done(&self) -> bool { 377 if self.is_closing { 378 return true; 379 } 380 381 let read_done = self.conn.is_read_closed(); 382 383 if !T::should_read_first() && read_done { 384 // a client that cannot read may was well be done. 385 true 386 } else { 387 let write_done = self.conn.is_write_closed() 388 || (!self.dispatch.should_poll() && self.body_rx.is_none()); 389 read_done && write_done 390 } 391 } 392 } 393 394 impl<D, Bs, I, T> Future for Dispatcher<D, Bs, I, T> 395 where 396 D: Dispatch< 397 PollItem = MessageHead<T::Outgoing>, 398 PollBody = Bs, 399 RecvItem = MessageHead<T::Incoming>, 400 > + Unpin, 401 D::PollError: Into<Box<dyn StdError + Send + Sync>>, 402 I: AsyncRead + AsyncWrite + Unpin, 403 T: Http1Transaction + Unpin, 404 Bs: HttpBody + 'static, 405 Bs::Error: Into<Box<dyn StdError + Send + Sync>>, 406 { 407 type Output = crate::Result<Dispatched>; 408 409 #[inline] poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output>410 fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Self::Output> { 411 self.poll_catch(cx, true) 412 } 413 } 414 415 // ===== impl OptGuard ===== 416 417 /// A drop guard to allow a mutable borrow of an Option while being able to 418 /// set whether the `Option` should be cleared on drop. 419 struct OptGuard<'a, T>(Pin<&'a mut Option<T>>, bool); 420 421 impl<'a, T> OptGuard<'a, T> { new(pin: Pin<&'a mut Option<T>>) -> Self422 fn new(pin: Pin<&'a mut Option<T>>) -> Self { 423 OptGuard(pin, false) 424 } 425 guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool)426 fn guard_mut(&mut self) -> (Option<Pin<&mut T>>, &mut bool) { 427 (self.0.as_mut().as_pin_mut(), &mut self.1) 428 } 429 } 430 431 impl<'a, T> Drop for OptGuard<'a, T> { drop(&mut self)432 fn drop(&mut self) { 433 if self.1 { 434 self.0.set(None); 435 } 436 } 437 } 438 439 // ===== impl Server ===== 440 441 impl<S, B> Server<S, B> 442 where 443 S: HttpService<B>, 444 { new(service: S) -> Server<S, B>445 pub fn new(service: S) -> Server<S, B> { 446 Server { 447 in_flight: Box::pin(None), 448 service, 449 } 450 } 451 into_service(self) -> S452 pub fn into_service(self) -> S { 453 self.service 454 } 455 } 456 457 // Service is never pinned 458 impl<S: HttpService<B>, B> Unpin for Server<S, B> {} 459 460 impl<S, Bs> Dispatch for Server<S, Body> 461 where 462 S: HttpService<Body, ResBody = Bs>, 463 S::Error: Into<Box<dyn StdError + Send + Sync>>, 464 Bs: HttpBody, 465 { 466 type PollItem = MessageHead<StatusCode>; 467 type PollBody = Bs; 468 type PollError = S::Error; 469 type RecvItem = RequestHead; 470 poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>>471 fn poll_msg( 472 &mut self, 473 cx: &mut task::Context<'_>, 474 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Self::PollError>>> { 475 let ret = if let Some(ref mut fut) = self.in_flight.as_mut().as_pin_mut() { 476 let resp = ready!(fut.as_mut().poll(cx)?); 477 let (parts, body) = resp.into_parts(); 478 let head = MessageHead { 479 version: parts.version, 480 subject: parts.status, 481 headers: parts.headers, 482 }; 483 Poll::Ready(Some(Ok((head, body)))) 484 } else { 485 unreachable!("poll_msg shouldn't be called if no inflight"); 486 }; 487 488 // Since in_flight finished, remove it 489 self.in_flight.set(None); 490 ret 491 } 492 recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>493 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 494 let (msg, body) = msg?; 495 let mut req = Request::new(body); 496 *req.method_mut() = msg.subject.0; 497 *req.uri_mut() = msg.subject.1; 498 *req.headers_mut() = msg.headers; 499 *req.version_mut() = msg.version; 500 let fut = self.service.call(req); 501 self.in_flight.set(Some(fut)); 502 Ok(()) 503 } 504 poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>505 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 506 if self.in_flight.is_some() { 507 Poll::Pending 508 } else { 509 self.service.poll_ready(cx).map_err(|_e| { 510 // FIXME: return error value. 511 trace!("service closed"); 512 }) 513 } 514 } 515 should_poll(&self) -> bool516 fn should_poll(&self) -> bool { 517 self.in_flight.is_some() 518 } 519 } 520 521 // ===== impl Client ===== 522 523 impl<B> Client<B> { new(rx: ClientRx<B>) -> Client<B>524 pub fn new(rx: ClientRx<B>) -> Client<B> { 525 Client { 526 callback: None, 527 rx, 528 rx_closed: false, 529 } 530 } 531 } 532 533 impl<B> Dispatch for Client<B> 534 where 535 B: HttpBody, 536 { 537 type PollItem = RequestHead; 538 type PollBody = B; 539 type PollError = Never; 540 type RecvItem = ResponseHead; 541 poll_msg( &mut self, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>>542 fn poll_msg( 543 &mut self, 544 cx: &mut task::Context<'_>, 545 ) -> Poll<Option<Result<(Self::PollItem, Self::PollBody), Never>>> { 546 debug_assert!(!self.rx_closed); 547 match self.rx.poll_next(cx) { 548 Poll::Ready(Some((req, mut cb))) => { 549 // check that future hasn't been canceled already 550 match cb.poll_canceled(cx) { 551 Poll::Ready(()) => { 552 trace!("request canceled"); 553 Poll::Ready(None) 554 } 555 Poll::Pending => { 556 let (parts, body) = req.into_parts(); 557 let head = RequestHead { 558 version: parts.version, 559 subject: RequestLine(parts.method, parts.uri), 560 headers: parts.headers, 561 }; 562 self.callback = Some(cb); 563 Poll::Ready(Some(Ok((head, body)))) 564 } 565 } 566 } 567 Poll::Ready(None) => { 568 // user has dropped sender handle 569 trace!("client tx closed"); 570 self.rx_closed = true; 571 Poll::Ready(None) 572 } 573 Poll::Pending => Poll::Pending, 574 } 575 } 576 recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()>577 fn recv_msg(&mut self, msg: crate::Result<(Self::RecvItem, Body)>) -> crate::Result<()> { 578 match msg { 579 Ok((msg, body)) => { 580 if let Some(cb) = self.callback.take() { 581 let mut res = Response::new(body); 582 *res.status_mut() = msg.subject; 583 *res.headers_mut() = msg.headers; 584 *res.version_mut() = msg.version; 585 cb.send(Ok(res)); 586 Ok(()) 587 } else { 588 // Getting here is likely a bug! An error should have happened 589 // in Conn::require_empty_read() before ever parsing a 590 // full message! 591 Err(crate::Error::new_unexpected_message()) 592 } 593 } 594 Err(err) => { 595 if let Some(cb) = self.callback.take() { 596 cb.send(Err((err, None))); 597 Ok(()) 598 } else if !self.rx_closed { 599 self.rx.close(); 600 if let Some((req, cb)) = self.rx.try_recv() { 601 trace!("canceling queued request with connection error: {}", err); 602 // in this case, the message was never even started, so it's safe to tell 603 // the user that the request was completely canceled 604 cb.send(Err((crate::Error::new_canceled().with(err), Some(req)))); 605 Ok(()) 606 } else { 607 Err(err) 608 } 609 } else { 610 Err(err) 611 } 612 } 613 } 614 } 615 poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>>616 fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<Result<(), ()>> { 617 match self.callback { 618 Some(ref mut cb) => match cb.poll_canceled(cx) { 619 Poll::Ready(()) => { 620 trace!("callback receiver has dropped"); 621 Poll::Ready(Err(())) 622 } 623 Poll::Pending => Poll::Ready(Ok(())), 624 }, 625 None => Poll::Ready(Err(())), 626 } 627 } 628 should_poll(&self) -> bool629 fn should_poll(&self) -> bool { 630 self.callback.is_none() 631 } 632 } 633 634 #[cfg(test)] 635 mod tests { 636 use super::*; 637 use crate::proto::h1::ClientTransaction; 638 use std::time::Duration; 639 640 #[test] client_read_bytes_before_writing_request()641 fn client_read_bytes_before_writing_request() { 642 let _ = pretty_env_logger::try_init(); 643 644 tokio_test::task::spawn(()).enter(|cx, _| { 645 let (io, mut handle) = tokio_test::io::Builder::new().build_with_handle(); 646 647 // Block at 0 for now, but we will release this response before 648 // the request is ready to write later... 649 //let io = AsyncIo::new_buf(b"HTTP/1.1 200 OK\r\n\r\n".to_vec(), 0); 650 let (mut tx, rx) = crate::client::dispatch::channel(); 651 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 652 let mut dispatcher = Dispatcher::new(Client::new(rx), conn); 653 654 // First poll is needed to allow tx to send... 655 assert!(Pin::new(&mut dispatcher).poll(cx).is_pending()); 656 657 // Unblock our IO, which has a response before we've sent request! 658 // 659 handle.read(b"HTTP/1.1 200 OK\r\n\r\n"); 660 661 let mut res_rx = tx 662 .try_send(crate::Request::new(crate::Body::empty())) 663 .unwrap(); 664 665 tokio_test::assert_ready_ok!(Pin::new(&mut dispatcher).poll(cx)); 666 let err = tokio_test::assert_ready_ok!(Pin::new(&mut res_rx).poll(cx)) 667 .expect_err("callback should send error"); 668 669 match (err.0.kind(), err.1) { 670 (&crate::error::Kind::Canceled, Some(_)) => (), 671 other => panic!("expected Canceled, got {:?}", other), 672 } 673 }); 674 } 675 676 #[tokio::test] body_empty_chunks_ignored()677 async fn body_empty_chunks_ignored() { 678 let _ = pretty_env_logger::try_init(); 679 680 let io = tokio_test::io::Builder::new() 681 // no reading or writing, just be blocked for the test... 682 .wait(Duration::from_secs(5)) 683 .build(); 684 685 let (mut tx, rx) = crate::client::dispatch::channel(); 686 let conn = Conn::<_, bytes::Bytes, ClientTransaction>::new(io); 687 let mut dispatcher = tokio_test::task::spawn(Dispatcher::new(Client::new(rx), conn)); 688 689 // First poll is needed to allow tx to send... 690 assert!(dispatcher.poll().is_pending()); 691 692 let body = { 693 let (mut tx, body) = crate::Body::channel(); 694 tx.try_send_data("".into()).unwrap(); 695 body 696 }; 697 698 let _res_rx = tx.try_send(crate::Request::new(body)).unwrap(); 699 700 // Ensure conn.write_body wasn't called with the empty chunk. 701 // If it is, it will trigger an assertion. 702 assert!(dispatcher.poll().is_pending()); 703 } 704 } 705