1 use std::borrow::Cow; 2 #[cfg(feature = "stream")] 3 use std::error::Error as StdError; 4 use std::fmt; 5 6 use bytes::Bytes; 7 use futures_channel::{mpsc, oneshot}; 8 use futures_core::Stream; // for mpsc::Receiver 9 #[cfg(feature = "stream")] 10 use futures_util::TryStreamExt; 11 use http::HeaderMap; 12 use http_body::{Body as HttpBody, SizeHint}; 13 14 use crate::common::sync_wrapper::SyncWrapper; 15 use crate::common::{task, watch, Future, Never, Pin, Poll}; 16 use crate::proto::h2::ping; 17 use crate::proto::DecodedLength; 18 use crate::upgrade::OnUpgrade; 19 20 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; 21 22 /// A stream of `Bytes`, used when receiving bodies. 23 /// 24 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many 25 /// applications. 26 #[must_use = "streams do nothing unless polled"] 27 pub struct Body { 28 kind: Kind, 29 /// Keep the extra bits in an `Option<Box<Extra>>`, so that 30 /// Body stays small in the common case (no extras needed). 31 extra: Option<Box<Extra>>, 32 } 33 34 enum Kind { 35 Once(Option<Bytes>), 36 Chan { 37 content_length: DecodedLength, 38 want_tx: watch::Sender, 39 rx: mpsc::Receiver<Result<Bytes, crate::Error>>, 40 }, 41 H2 { 42 ping: ping::Recorder, 43 content_length: DecodedLength, 44 recv: h2::RecvStream, 45 }, 46 #[cfg(feature = "stream")] 47 Wrapped( 48 SyncWrapper< 49 Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, 50 >, 51 ), 52 } 53 54 struct Extra { 55 /// Allow the client to pass a future to delay the `Body` from returning 56 /// EOF. This allows the `Client` to try to put the idle connection 57 /// back into the pool before the body is "finished". 58 /// 59 /// The reason for this is so that creating a new request after finishing 60 /// streaming the body of a response could sometimes result in creating 61 /// a brand new connection, since the pool didn't know about the idle 62 /// connection yet. 63 delayed_eof: Option<DelayEof>, 64 on_upgrade: OnUpgrade, 65 } 66 67 type DelayEofUntil = oneshot::Receiver<Never>; 68 69 enum DelayEof { 70 /// Initial state, stream hasn't seen EOF yet. 71 NotEof(DelayEofUntil), 72 /// Transitions to this state once we've seen `poll` try to 73 /// return EOF (`None`). This future is then polled, and 74 /// when it completes, the Body finally returns EOF (`None`). 75 Eof(DelayEofUntil), 76 } 77 78 /// A sender half used with `Body::channel()`. 79 /// 80 /// Useful when wanting to stream chunks from another thread. See 81 /// [`Body::channel`](Body::channel) for more. 82 #[must_use = "Sender does nothing unless sent on"] 83 pub struct Sender { 84 want_rx: watch::Receiver, 85 tx: BodySender, 86 } 87 88 const WANT_PENDING: usize = 1; 89 const WANT_READY: usize = 2; 90 91 impl Body { 92 /// Create an empty `Body` stream. 93 /// 94 /// # Example 95 /// 96 /// ``` 97 /// use hyper::{Body, Request}; 98 /// 99 /// // create a `GET /` request 100 /// let get = Request::new(Body::empty()); 101 /// ``` 102 #[inline] empty() -> Body103 pub fn empty() -> Body { 104 Body::new(Kind::Once(None)) 105 } 106 107 /// Create a `Body` stream with an associated sender half. 108 /// 109 /// Useful when wanting to stream chunks from another thread. 110 #[inline] channel() -> (Sender, Body)111 pub fn channel() -> (Sender, Body) { 112 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) 113 } 114 new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)115 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { 116 let (tx, rx) = mpsc::channel(0); 117 118 // If wanter is true, `Sender::poll_ready()` won't becoming ready 119 // until the `Body` has been polled for data once. 120 let want = if wanter { WANT_PENDING } else { WANT_READY }; 121 122 let (want_tx, want_rx) = watch::channel(want); 123 124 let tx = Sender { want_rx, tx }; 125 let rx = Body::new(Kind::Chan { 126 content_length, 127 want_tx, 128 rx, 129 }); 130 131 (tx, rx) 132 } 133 134 /// Wrap a futures `Stream` in a box inside `Body`. 135 /// 136 /// # Example 137 /// 138 /// ``` 139 /// # use hyper::Body; 140 /// let chunks: Vec<Result<_, std::io::Error>> = vec![ 141 /// Ok("hello"), 142 /// Ok(" "), 143 /// Ok("world"), 144 /// ]; 145 /// 146 /// let stream = futures_util::stream::iter(chunks); 147 /// 148 /// let body = Body::wrap_stream(stream); 149 /// ``` 150 /// 151 /// # Optional 152 /// 153 /// This function requires enabling the `stream` feature in your 154 /// `Cargo.toml`. 155 #[cfg(feature = "stream")] wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,156 pub fn wrap_stream<S, O, E>(stream: S) -> Body 157 where 158 S: Stream<Item = Result<O, E>> + Send + 'static, 159 O: Into<Bytes> + 'static, 160 E: Into<Box<dyn StdError + Send + Sync>> + 'static, 161 { 162 let mapped = stream.map_ok(Into::into).map_err(Into::into); 163 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) 164 } 165 166 /// Converts this `Body` into a `Future` of a pending HTTP upgrade. 167 /// 168 /// See [the `upgrade` module](crate::upgrade) for more. on_upgrade(self) -> OnUpgrade169 pub fn on_upgrade(self) -> OnUpgrade { 170 self.extra 171 .map(|ex| ex.on_upgrade) 172 .unwrap_or_else(OnUpgrade::none) 173 } 174 new(kind: Kind) -> Body175 fn new(kind: Kind) -> Body { 176 Body { kind, extra: None } 177 } 178 h2( recv: h2::RecvStream, content_length: DecodedLength, ping: ping::Recorder, ) -> Self179 pub(crate) fn h2( 180 recv: h2::RecvStream, 181 content_length: DecodedLength, 182 ping: ping::Recorder, 183 ) -> Self { 184 let body = Body::new(Kind::H2 { 185 ping, 186 content_length, 187 recv, 188 }); 189 190 body 191 } 192 set_on_upgrade(&mut self, upgrade: OnUpgrade)193 pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) { 194 debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade"); 195 let extra = self.extra_mut(); 196 debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice"); 197 extra.on_upgrade = upgrade; 198 } 199 delayed_eof(&mut self, fut: DelayEofUntil)200 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { 201 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); 202 } 203 take_delayed_eof(&mut self) -> Option<DelayEof>204 fn take_delayed_eof(&mut self) -> Option<DelayEof> { 205 self.extra 206 .as_mut() 207 .and_then(|extra| extra.delayed_eof.take()) 208 } 209 extra_mut(&mut self) -> &mut Extra210 fn extra_mut(&mut self) -> &mut Extra { 211 self.extra.get_or_insert_with(|| { 212 Box::new(Extra { 213 delayed_eof: None, 214 on_upgrade: OnUpgrade::none(), 215 }) 216 }) 217 } 218 poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>219 fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 220 match self.take_delayed_eof() { 221 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { 222 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { 223 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); 224 ok 225 } 226 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { 227 Poll::Ready(Ok(never)) => match never {}, 228 Poll::Pending => { 229 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 230 Poll::Pending 231 } 232 Poll::Ready(Err(_done)) => Poll::Ready(None), 233 }, 234 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), 235 }, 236 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { 237 Poll::Ready(Ok(never)) => match never {}, 238 Poll::Pending => { 239 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 240 Poll::Pending 241 } 242 Poll::Ready(Err(_done)) => Poll::Ready(None), 243 }, 244 None => self.poll_inner(cx), 245 } 246 } 247 poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>248 fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 249 match self.kind { 250 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), 251 Kind::Chan { 252 content_length: ref mut len, 253 ref mut rx, 254 ref mut want_tx, 255 } => { 256 want_tx.send(WANT_READY); 257 258 match ready!(Pin::new(rx).poll_next(cx)?) { 259 Some(chunk) => { 260 len.sub_if(chunk.len() as u64); 261 Poll::Ready(Some(Ok(chunk))) 262 } 263 None => Poll::Ready(None), 264 } 265 } 266 Kind::H2 { 267 ref ping, 268 recv: ref mut h2, 269 content_length: ref mut len, 270 } => match ready!(h2.poll_data(cx)) { 271 Some(Ok(bytes)) => { 272 let _ = h2.flow_control().release_capacity(bytes.len()); 273 len.sub_if(bytes.len() as u64); 274 ping.record_data(bytes.len()); 275 Poll::Ready(Some(Ok(bytes))) 276 } 277 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), 278 None => Poll::Ready(None), 279 }, 280 281 #[cfg(feature = "stream")] 282 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { 283 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), 284 None => Poll::Ready(None), 285 }, 286 } 287 } 288 take_full_data(&mut self) -> Option<Bytes>289 pub(super) fn take_full_data(&mut self) -> Option<Bytes> { 290 if let Kind::Once(ref mut chunk) = self.kind { 291 chunk.take() 292 } else { 293 None 294 } 295 } 296 } 297 298 impl Default for Body { 299 /// Returns `Body::empty()`. 300 #[inline] default() -> Body301 fn default() -> Body { 302 Body::empty() 303 } 304 } 305 306 impl HttpBody for Body { 307 type Data = Bytes; 308 type Error = crate::Error; 309 poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>310 fn poll_data( 311 mut self: Pin<&mut Self>, 312 cx: &mut task::Context<'_>, 313 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 314 self.poll_eof(cx) 315 } 316 poll_trailers( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>317 fn poll_trailers( 318 mut self: Pin<&mut Self>, 319 cx: &mut task::Context<'_>, 320 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { 321 match self.kind { 322 Kind::H2 { 323 recv: ref mut h2, 324 ref ping, 325 .. 326 } => match ready!(h2.poll_trailers(cx)) { 327 Ok(t) => { 328 ping.record_non_data(); 329 Poll::Ready(Ok(t)) 330 } 331 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), 332 }, 333 _ => Poll::Ready(Ok(None)), 334 } 335 } 336 is_end_stream(&self) -> bool337 fn is_end_stream(&self) -> bool { 338 match self.kind { 339 Kind::Once(ref val) => val.is_none(), 340 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, 341 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), 342 #[cfg(feature = "stream")] 343 Kind::Wrapped(..) => false, 344 } 345 } 346 size_hint(&self) -> SizeHint347 fn size_hint(&self) -> SizeHint { 348 match self.kind { 349 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), 350 Kind::Once(None) => SizeHint::with_exact(0), 351 #[cfg(feature = "stream")] 352 Kind::Wrapped(..) => SizeHint::default(), 353 Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => { 354 let mut hint = SizeHint::default(); 355 356 if let Some(content_length) = content_length.into_opt() { 357 hint.set_exact(content_length); 358 } 359 360 hint 361 } 362 } 363 } 364 } 365 366 impl fmt::Debug for Body { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result367 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 368 #[derive(Debug)] 369 struct Streaming; 370 #[derive(Debug)] 371 struct Empty; 372 #[derive(Debug)] 373 struct Full<'a>(&'a Bytes); 374 375 let mut builder = f.debug_tuple("Body"); 376 match self.kind { 377 Kind::Once(None) => builder.field(&Empty), 378 Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), 379 _ => builder.field(&Streaming), 380 }; 381 382 builder.finish() 383 } 384 } 385 386 /// # Optional 387 /// 388 /// This function requires enabling the `stream` feature in your 389 /// `Cargo.toml`. 390 #[cfg(feature = "stream")] 391 impl Stream for Body { 392 type Item = crate::Result<Bytes>; 393 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>394 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 395 HttpBody::poll_data(self, cx) 396 } 397 } 398 399 /// # Optional 400 /// 401 /// This function requires enabling the `stream` feature in your 402 /// `Cargo.toml`. 403 #[cfg(feature = "stream")] 404 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { 405 #[inline] from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body406 fn from( 407 stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, 408 ) -> Body { 409 Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) 410 } 411 } 412 413 impl From<Bytes> for Body { 414 #[inline] from(chunk: Bytes) -> Body415 fn from(chunk: Bytes) -> Body { 416 if chunk.is_empty() { 417 Body::empty() 418 } else { 419 Body::new(Kind::Once(Some(chunk))) 420 } 421 } 422 } 423 424 impl From<Vec<u8>> for Body { 425 #[inline] from(vec: Vec<u8>) -> Body426 fn from(vec: Vec<u8>) -> Body { 427 Body::from(Bytes::from(vec)) 428 } 429 } 430 431 impl From<&'static [u8]> for Body { 432 #[inline] from(slice: &'static [u8]) -> Body433 fn from(slice: &'static [u8]) -> Body { 434 Body::from(Bytes::from(slice)) 435 } 436 } 437 438 impl From<Cow<'static, [u8]>> for Body { 439 #[inline] from(cow: Cow<'static, [u8]>) -> Body440 fn from(cow: Cow<'static, [u8]>) -> Body { 441 match cow { 442 Cow::Borrowed(b) => Body::from(b), 443 Cow::Owned(o) => Body::from(o), 444 } 445 } 446 } 447 448 impl From<String> for Body { 449 #[inline] from(s: String) -> Body450 fn from(s: String) -> Body { 451 Body::from(Bytes::from(s.into_bytes())) 452 } 453 } 454 455 impl From<&'static str> for Body { 456 #[inline] from(slice: &'static str) -> Body457 fn from(slice: &'static str) -> Body { 458 Body::from(Bytes::from(slice.as_bytes())) 459 } 460 } 461 462 impl From<Cow<'static, str>> for Body { 463 #[inline] from(cow: Cow<'static, str>) -> Body464 fn from(cow: Cow<'static, str>) -> Body { 465 match cow { 466 Cow::Borrowed(b) => Body::from(b), 467 Cow::Owned(o) => Body::from(o), 468 } 469 } 470 } 471 472 impl Sender { 473 /// Check to see if this `Sender` can send more data. poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>474 pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 475 // Check if the receiver end has tried polling for the body yet 476 ready!(self.poll_want(cx)?); 477 self.tx 478 .poll_ready(cx) 479 .map_err(|_| crate::Error::new_closed()) 480 } 481 poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>482 fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 483 match self.want_rx.load(cx) { 484 WANT_READY => Poll::Ready(Ok(())), 485 WANT_PENDING => Poll::Pending, 486 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), 487 unexpected => unreachable!("want_rx value: {}", unexpected), 488 } 489 } 490 ready(&mut self) -> crate::Result<()>491 async fn ready(&mut self) -> crate::Result<()> { 492 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await 493 } 494 495 /// Send data on this channel when it is ready. send_data(&mut self, chunk: Bytes) -> crate::Result<()>496 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { 497 self.ready().await?; 498 self.tx 499 .try_send(Ok(chunk)) 500 .map_err(|_| crate::Error::new_closed()) 501 } 502 503 /// Try to send data on this channel. 504 /// 505 /// # Errors 506 /// 507 /// Returns `Err(Bytes)` if the channel could not (currently) accept 508 /// another `Bytes`. 509 /// 510 /// # Note 511 /// 512 /// This is mostly useful for when trying to send from some other thread 513 /// that doesn't have an async context. If in an async context, prefer 514 /// `send_data()` instead. try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>515 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { 516 self.tx 517 .try_send(Ok(chunk)) 518 .map_err(|err| err.into_inner().expect("just sent Ok")) 519 } 520 521 /// Aborts the body in an abnormal fashion. abort(self)522 pub fn abort(self) { 523 let _ = self 524 .tx 525 // clone so the send works even if buffer is full 526 .clone() 527 .try_send(Err(crate::Error::new_body_write_aborted())); 528 } 529 send_error(&mut self, err: crate::Error)530 pub(crate) fn send_error(&mut self, err: crate::Error) { 531 let _ = self.tx.try_send(Err(err)); 532 } 533 } 534 535 impl fmt::Debug for Sender { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result536 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 537 #[derive(Debug)] 538 struct Open; 539 #[derive(Debug)] 540 struct Closed; 541 542 let mut builder = f.debug_tuple("Sender"); 543 match self.want_rx.peek() { 544 watch::CLOSED => builder.field(&Closed), 545 _ => builder.field(&Open), 546 }; 547 548 builder.finish() 549 } 550 } 551 552 #[cfg(test)] 553 mod tests { 554 use std::mem; 555 use std::task::Poll; 556 557 use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; 558 559 #[test] test_size_of()560 fn test_size_of() { 561 // These are mostly to help catch *accidentally* increasing 562 // the size by too much. 563 564 let body_size = mem::size_of::<Body>(); 565 let body_expected_size = mem::size_of::<u64>() * 6; 566 assert!( 567 body_size <= body_expected_size, 568 "Body size = {} <= {}", 569 body_size, 570 body_expected_size, 571 ); 572 573 assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>"); 574 575 assert_eq!( 576 mem::size_of::<Sender>(), 577 mem::size_of::<usize>() * 4, 578 "Sender" 579 ); 580 581 assert_eq!( 582 mem::size_of::<Sender>(), 583 mem::size_of::<Option<Sender>>(), 584 "Option<Sender>" 585 ); 586 } 587 588 #[test] size_hint()589 fn size_hint() { 590 fn eq(body: Body, b: SizeHint, note: &str) { 591 let a = body.size_hint(); 592 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); 593 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); 594 } 595 596 eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); 597 598 eq(Body::empty(), SizeHint::with_exact(0), "empty"); 599 600 eq(Body::channel().1, SizeHint::new(), "channel"); 601 602 eq( 603 Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, 604 SizeHint::with_exact(4), 605 "channel with length", 606 ); 607 } 608 609 #[tokio::test] channel_abort()610 async fn channel_abort() { 611 let (tx, mut rx) = Body::channel(); 612 613 tx.abort(); 614 615 let err = rx.data().await.unwrap().unwrap_err(); 616 assert!(err.is_body_write_aborted(), "{:?}", err); 617 } 618 619 #[tokio::test] channel_abort_when_buffer_is_full()620 async fn channel_abort_when_buffer_is_full() { 621 let (mut tx, mut rx) = Body::channel(); 622 623 tx.try_send_data("chunk 1".into()).expect("send 1"); 624 // buffer is full, but can still send abort 625 tx.abort(); 626 627 let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); 628 assert_eq!(chunk1, "chunk 1"); 629 630 let err = rx.data().await.unwrap().unwrap_err(); 631 assert!(err.is_body_write_aborted(), "{:?}", err); 632 } 633 634 #[test] channel_buffers_one()635 fn channel_buffers_one() { 636 let (mut tx, _rx) = Body::channel(); 637 638 tx.try_send_data("chunk 1".into()).expect("send 1"); 639 640 // buffer is now full 641 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2"); 642 assert_eq!(chunk2, "chunk 2"); 643 } 644 645 #[tokio::test] channel_empty()646 async fn channel_empty() { 647 let (_, mut rx) = Body::channel(); 648 649 assert!(rx.data().await.is_none()); 650 } 651 652 #[test] channel_ready()653 fn channel_ready() { 654 let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); 655 656 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 657 658 assert!(tx_ready.poll().is_ready(), "tx is ready immediately"); 659 } 660 661 #[test] channel_wanter()662 fn channel_wanter() { 663 let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 664 665 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 666 let mut rx_data = tokio_test::task::spawn(rx.data()); 667 668 assert!( 669 tx_ready.poll().is_pending(), 670 "tx isn't ready before rx has been polled" 671 ); 672 673 assert!(rx_data.poll().is_pending(), "poll rx.data"); 674 assert!(tx_ready.is_woken(), "rx poll wakes tx"); 675 676 assert!( 677 tx_ready.poll().is_ready(), 678 "tx is ready after rx has been polled" 679 ); 680 } 681 682 #[test] channel_notices_closure()683 fn channel_notices_closure() { 684 let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 685 686 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 687 688 assert!( 689 tx_ready.poll().is_pending(), 690 "tx isn't ready before rx has been polled" 691 ); 692 693 drop(rx); 694 assert!(tx_ready.is_woken(), "dropping rx wakes tx"); 695 696 match tx_ready.poll() { 697 Poll::Ready(Err(ref e)) if e.is_closed() => (), 698 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected), 699 } 700 } 701 } 702