1 use std::borrow::Cow; 2 #[cfg(feature = "stream")] 3 use std::error::Error as StdError; 4 use std::fmt; 5 6 use bytes::Bytes; 7 use futures_channel::mpsc; 8 use futures_channel::oneshot; 9 use futures_core::Stream; // for mpsc::Receiver 10 #[cfg(feature = "stream")] 11 use futures_util::TryStreamExt; 12 use http::HeaderMap; 13 use http_body::{Body as HttpBody, SizeHint}; 14 15 use super::DecodedLength; 16 #[cfg(feature = "stream")] 17 use crate::common::sync_wrapper::SyncWrapper; 18 use crate::common::Future; 19 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 20 use crate::common::Never; 21 use crate::common::{task, watch, Pin, Poll}; 22 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 23 use crate::proto::h2::ping; 24 25 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; 26 type TrailersSender = oneshot::Sender<HeaderMap>; 27 28 /// A stream of `Bytes`, used when receiving bodies. 29 /// 30 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many 31 /// applications. 32 /// 33 /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes) 34 /// or [`body::aggregate`](crate::body::aggregate). 35 #[must_use = "streams do nothing unless polled"] 36 pub struct Body { 37 kind: Kind, 38 /// Keep the extra bits in an `Option<Box<Extra>>`, so that 39 /// Body stays small in the common case (no extras needed). 40 extra: Option<Box<Extra>>, 41 } 42 43 enum Kind { 44 Once(Option<Bytes>), 45 Chan { 46 content_length: DecodedLength, 47 want_tx: watch::Sender, 48 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>, 49 trailers_rx: oneshot::Receiver<HeaderMap>, 50 }, 51 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 52 H2 { 53 ping: ping::Recorder, 54 content_length: DecodedLength, 55 recv: h2::RecvStream, 56 }, 57 #[cfg(feature = "ffi")] 58 Ffi(crate::ffi::UserBody), 59 #[cfg(feature = "stream")] 60 Wrapped( 61 SyncWrapper< 62 Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, 63 >, 64 ), 65 } 66 67 struct Extra { 68 /// Allow the client to pass a future to delay the `Body` from returning 69 /// EOF. This allows the `Client` to try to put the idle connection 70 /// back into the pool before the body is "finished". 71 /// 72 /// The reason for this is so that creating a new request after finishing 73 /// streaming the body of a response could sometimes result in creating 74 /// a brand new connection, since the pool didn't know about the idle 75 /// connection yet. 76 delayed_eof: Option<DelayEof>, 77 } 78 79 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 80 type DelayEofUntil = oneshot::Receiver<Never>; 81 82 enum DelayEof { 83 /// Initial state, stream hasn't seen EOF yet. 84 #[cfg(any(feature = "http1", feature = "http2"))] 85 #[cfg(feature = "client")] 86 NotEof(DelayEofUntil), 87 /// Transitions to this state once we've seen `poll` try to 88 /// return EOF (`None`). This future is then polled, and 89 /// when it completes, the Body finally returns EOF (`None`). 90 #[cfg(any(feature = "http1", feature = "http2"))] 91 #[cfg(feature = "client")] 92 Eof(DelayEofUntil), 93 } 94 95 /// A sender half created through [`Body::channel()`]. 96 /// 97 /// Useful when wanting to stream chunks from another thread. 98 /// 99 /// ## Body Closing 100 /// 101 /// Note that the request body will always be closed normally when the sender is dropped (meaning 102 /// that the empty terminating chunk will be sent to the remote). If you desire to close the 103 /// connection with an incomplete response (e.g. in the case of an error during asynchronous 104 /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. 105 /// 106 /// [`Body::channel()`]: struct.Body.html#method.channel 107 /// [`Sender::abort()`]: struct.Sender.html#method.abort 108 #[must_use = "Sender does nothing unless sent on"] 109 pub struct Sender { 110 want_rx: watch::Receiver, 111 data_tx: BodySender, 112 trailers_tx: Option<TrailersSender>, 113 } 114 115 const WANT_PENDING: usize = 1; 116 const WANT_READY: usize = 2; 117 118 impl Body { 119 /// Create an empty `Body` stream. 120 /// 121 /// # Example 122 /// 123 /// ``` 124 /// use hyper::{Body, Request}; 125 /// 126 /// // create a `GET /` request 127 /// let get = Request::new(Body::empty()); 128 /// ``` 129 #[inline] empty() -> Body130 pub fn empty() -> Body { 131 Body::new(Kind::Once(None)) 132 } 133 134 /// Create a `Body` stream with an associated sender half. 135 /// 136 /// Useful when wanting to stream chunks from another thread. 137 #[inline] channel() -> (Sender, Body)138 pub fn channel() -> (Sender, Body) { 139 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) 140 } 141 new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)142 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { 143 let (data_tx, data_rx) = mpsc::channel(0); 144 let (trailers_tx, trailers_rx) = oneshot::channel(); 145 146 // If wanter is true, `Sender::poll_ready()` won't becoming ready 147 // until the `Body` has been polled for data once. 148 let want = if wanter { WANT_PENDING } else { WANT_READY }; 149 150 let (want_tx, want_rx) = watch::channel(want); 151 152 let tx = Sender { 153 want_rx, 154 data_tx, 155 trailers_tx: Some(trailers_tx), 156 }; 157 let rx = Body::new(Kind::Chan { 158 content_length, 159 want_tx, 160 data_rx, 161 trailers_rx, 162 }); 163 164 (tx, rx) 165 } 166 167 /// Wrap a futures `Stream` in a box inside `Body`. 168 /// 169 /// # Example 170 /// 171 /// ``` 172 /// # use hyper::Body; 173 /// let chunks: Vec<Result<_, std::io::Error>> = vec![ 174 /// Ok("hello"), 175 /// Ok(" "), 176 /// Ok("world"), 177 /// ]; 178 /// 179 /// let stream = futures_util::stream::iter(chunks); 180 /// 181 /// let body = Body::wrap_stream(stream); 182 /// ``` 183 /// 184 /// # Optional 185 /// 186 /// This function requires enabling the `stream` feature in your 187 /// `Cargo.toml`. 188 #[cfg(feature = "stream")] 189 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,190 pub fn wrap_stream<S, O, E>(stream: S) -> Body 191 where 192 S: Stream<Item = Result<O, E>> + Send + 'static, 193 O: Into<Bytes> + 'static, 194 E: Into<Box<dyn StdError + Send + Sync>> + 'static, 195 { 196 let mapped = stream.map_ok(Into::into).map_err(Into::into); 197 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) 198 } 199 new(kind: Kind) -> Body200 fn new(kind: Kind) -> Body { 201 Body { kind, extra: None } 202 } 203 204 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] h2( recv: h2::RecvStream, content_length: DecodedLength, ping: ping::Recorder, ) -> Self205 pub(crate) fn h2( 206 recv: h2::RecvStream, 207 content_length: DecodedLength, 208 ping: ping::Recorder, 209 ) -> Self { 210 let body = Body::new(Kind::H2 { 211 ping, 212 content_length, 213 recv, 214 }); 215 216 body 217 } 218 219 #[cfg(any(feature = "http1", feature = "http2"))] 220 #[cfg(feature = "client")] delayed_eof(&mut self, fut: DelayEofUntil)221 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { 222 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); 223 } 224 take_delayed_eof(&mut self) -> Option<DelayEof>225 fn take_delayed_eof(&mut self) -> Option<DelayEof> { 226 self.extra 227 .as_mut() 228 .and_then(|extra| extra.delayed_eof.take()) 229 } 230 231 #[cfg(any(feature = "http1", feature = "http2"))] extra_mut(&mut self) -> &mut Extra232 fn extra_mut(&mut self) -> &mut Extra { 233 self.extra 234 .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) 235 } 236 poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>237 fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 238 match self.take_delayed_eof() { 239 #[cfg(any(feature = "http1", feature = "http2"))] 240 #[cfg(feature = "client")] 241 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { 242 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { 243 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); 244 ok 245 } 246 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { 247 Poll::Ready(Ok(never)) => match never {}, 248 Poll::Pending => { 249 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 250 Poll::Pending 251 } 252 Poll::Ready(Err(_done)) => Poll::Ready(None), 253 }, 254 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), 255 }, 256 #[cfg(any(feature = "http1", feature = "http2"))] 257 #[cfg(feature = "client")] 258 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { 259 Poll::Ready(Ok(never)) => match never {}, 260 Poll::Pending => { 261 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 262 Poll::Pending 263 } 264 Poll::Ready(Err(_done)) => Poll::Ready(None), 265 }, 266 #[cfg(any( 267 not(any(feature = "http1", feature = "http2")), 268 not(feature = "client") 269 ))] 270 Some(delay_eof) => match delay_eof {}, 271 None => self.poll_inner(cx), 272 } 273 } 274 275 #[cfg(feature = "ffi")] as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody276 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { 277 match self.kind { 278 Kind::Ffi(ref mut body) => return body, 279 _ => { 280 self.kind = Kind::Ffi(crate::ffi::UserBody::new()); 281 } 282 } 283 284 match self.kind { 285 Kind::Ffi(ref mut body) => body, 286 _ => unreachable!(), 287 } 288 } 289 poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>290 fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 291 match self.kind { 292 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), 293 Kind::Chan { 294 content_length: ref mut len, 295 ref mut data_rx, 296 ref mut want_tx, 297 .. 298 } => { 299 want_tx.send(WANT_READY); 300 301 match ready!(Pin::new(data_rx).poll_next(cx)?) { 302 Some(chunk) => { 303 len.sub_if(chunk.len() as u64); 304 Poll::Ready(Some(Ok(chunk))) 305 } 306 None => Poll::Ready(None), 307 } 308 } 309 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 310 Kind::H2 { 311 ref ping, 312 recv: ref mut h2, 313 content_length: ref mut len, 314 } => match ready!(h2.poll_data(cx)) { 315 Some(Ok(bytes)) => { 316 let _ = h2.flow_control().release_capacity(bytes.len()); 317 len.sub_if(bytes.len() as u64); 318 ping.record_data(bytes.len()); 319 Poll::Ready(Some(Ok(bytes))) 320 } 321 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), 322 None => Poll::Ready(None), 323 }, 324 325 #[cfg(feature = "ffi")] 326 Kind::Ffi(ref mut body) => body.poll_data(cx), 327 328 #[cfg(feature = "stream")] 329 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { 330 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), 331 None => Poll::Ready(None), 332 }, 333 } 334 } 335 336 #[cfg(feature = "http1")] take_full_data(&mut self) -> Option<Bytes>337 pub(super) fn take_full_data(&mut self) -> Option<Bytes> { 338 if let Kind::Once(ref mut chunk) = self.kind { 339 chunk.take() 340 } else { 341 None 342 } 343 } 344 } 345 346 impl Default for Body { 347 /// Returns `Body::empty()`. 348 #[inline] default() -> Body349 fn default() -> Body { 350 Body::empty() 351 } 352 } 353 354 impl HttpBody for Body { 355 type Data = Bytes; 356 type Error = crate::Error; 357 poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>358 fn poll_data( 359 mut self: Pin<&mut Self>, 360 cx: &mut task::Context<'_>, 361 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 362 self.poll_eof(cx) 363 } 364 poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>365 fn poll_trailers( 366 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, 367 #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, 368 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { 369 match self.kind { 370 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 371 Kind::H2 { 372 recv: ref mut h2, 373 ref ping, 374 .. 375 } => match ready!(h2.poll_trailers(cx)) { 376 Ok(t) => { 377 ping.record_non_data(); 378 Poll::Ready(Ok(t)) 379 } 380 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), 381 }, 382 Kind::Chan { 383 ref mut trailers_rx, 384 .. 385 } => match ready!(Pin::new(trailers_rx).poll(cx)) { 386 Ok(t) => Poll::Ready(Ok(Some(t))), 387 Err(_) => Poll::Ready(Ok(None)), 388 }, 389 #[cfg(feature = "ffi")] 390 Kind::Ffi(ref mut body) => body.poll_trailers(cx), 391 _ => Poll::Ready(Ok(None)), 392 } 393 } 394 is_end_stream(&self) -> bool395 fn is_end_stream(&self) -> bool { 396 match self.kind { 397 Kind::Once(ref val) => val.is_none(), 398 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, 399 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 400 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), 401 #[cfg(feature = "ffi")] 402 Kind::Ffi(..) => false, 403 #[cfg(feature = "stream")] 404 Kind::Wrapped(..) => false, 405 } 406 } 407 size_hint(&self) -> SizeHint408 fn size_hint(&self) -> SizeHint { 409 macro_rules! opt_len { 410 ($content_length:expr) => {{ 411 let mut hint = SizeHint::default(); 412 413 if let Some(content_length) = $content_length.into_opt() { 414 hint.set_exact(content_length); 415 } 416 417 hint 418 }}; 419 } 420 421 match self.kind { 422 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), 423 Kind::Once(None) => SizeHint::with_exact(0), 424 #[cfg(feature = "stream")] 425 Kind::Wrapped(..) => SizeHint::default(), 426 Kind::Chan { content_length, .. } => opt_len!(content_length), 427 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 428 Kind::H2 { content_length, .. } => opt_len!(content_length), 429 #[cfg(feature = "ffi")] 430 Kind::Ffi(..) => SizeHint::default(), 431 } 432 } 433 } 434 435 impl fmt::Debug for Body { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result436 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 437 #[derive(Debug)] 438 struct Streaming; 439 #[derive(Debug)] 440 struct Empty; 441 #[derive(Debug)] 442 struct Full<'a>(&'a Bytes); 443 444 let mut builder = f.debug_tuple("Body"); 445 match self.kind { 446 Kind::Once(None) => builder.field(&Empty), 447 Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), 448 _ => builder.field(&Streaming), 449 }; 450 451 builder.finish() 452 } 453 } 454 455 /// # Optional 456 /// 457 /// This function requires enabling the `stream` feature in your 458 /// `Cargo.toml`. 459 #[cfg(feature = "stream")] 460 impl Stream for Body { 461 type Item = crate::Result<Bytes>; 462 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>463 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 464 HttpBody::poll_data(self, cx) 465 } 466 } 467 468 /// # Optional 469 /// 470 /// This function requires enabling the `stream` feature in your 471 /// `Cargo.toml`. 472 #[cfg(feature = "stream")] 473 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { 474 #[inline] from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body475 fn from( 476 stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, 477 ) -> Body { 478 Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) 479 } 480 } 481 482 impl From<Bytes> for Body { 483 #[inline] from(chunk: Bytes) -> Body484 fn from(chunk: Bytes) -> Body { 485 if chunk.is_empty() { 486 Body::empty() 487 } else { 488 Body::new(Kind::Once(Some(chunk))) 489 } 490 } 491 } 492 493 impl From<Vec<u8>> for Body { 494 #[inline] from(vec: Vec<u8>) -> Body495 fn from(vec: Vec<u8>) -> Body { 496 Body::from(Bytes::from(vec)) 497 } 498 } 499 500 impl From<&'static [u8]> for Body { 501 #[inline] from(slice: &'static [u8]) -> Body502 fn from(slice: &'static [u8]) -> Body { 503 Body::from(Bytes::from(slice)) 504 } 505 } 506 507 impl From<Cow<'static, [u8]>> for Body { 508 #[inline] from(cow: Cow<'static, [u8]>) -> Body509 fn from(cow: Cow<'static, [u8]>) -> Body { 510 match cow { 511 Cow::Borrowed(b) => Body::from(b), 512 Cow::Owned(o) => Body::from(o), 513 } 514 } 515 } 516 517 impl From<String> for Body { 518 #[inline] from(s: String) -> Body519 fn from(s: String) -> Body { 520 Body::from(Bytes::from(s.into_bytes())) 521 } 522 } 523 524 impl From<&'static str> for Body { 525 #[inline] from(slice: &'static str) -> Body526 fn from(slice: &'static str) -> Body { 527 Body::from(Bytes::from(slice.as_bytes())) 528 } 529 } 530 531 impl From<Cow<'static, str>> for Body { 532 #[inline] from(cow: Cow<'static, str>) -> Body533 fn from(cow: Cow<'static, str>) -> Body { 534 match cow { 535 Cow::Borrowed(b) => Body::from(b), 536 Cow::Owned(o) => Body::from(o), 537 } 538 } 539 } 540 541 impl Sender { 542 /// Check to see if this `Sender` can send more data. poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>543 pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 544 // Check if the receiver end has tried polling for the body yet 545 ready!(self.poll_want(cx)?); 546 self.data_tx 547 .poll_ready(cx) 548 .map_err(|_| crate::Error::new_closed()) 549 } 550 poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>551 fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 552 match self.want_rx.load(cx) { 553 WANT_READY => Poll::Ready(Ok(())), 554 WANT_PENDING => Poll::Pending, 555 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), 556 unexpected => unreachable!("want_rx value: {}", unexpected), 557 } 558 } 559 ready(&mut self) -> crate::Result<()>560 async fn ready(&mut self) -> crate::Result<()> { 561 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await 562 } 563 564 /// Send data on data channel when it is ready. send_data(&mut self, chunk: Bytes) -> crate::Result<()>565 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { 566 self.ready().await?; 567 self.data_tx 568 .try_send(Ok(chunk)) 569 .map_err(|_| crate::Error::new_closed()) 570 } 571 572 /// Send trailers on trailers channel. send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>573 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { 574 let tx = match self.trailers_tx.take() { 575 Some(tx) => tx, 576 None => return Err(crate::Error::new_closed()), 577 }; 578 tx.send(trailers).map_err(|_| crate::Error::new_closed()) 579 } 580 581 /// Try to send data on this channel. 582 /// 583 /// # Errors 584 /// 585 /// Returns `Err(Bytes)` if the channel could not (currently) accept 586 /// another `Bytes`. 587 /// 588 /// # Note 589 /// 590 /// This is mostly useful for when trying to send from some other thread 591 /// that doesn't have an async context. If in an async context, prefer 592 /// `send_data()` instead. try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>593 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { 594 self.data_tx 595 .try_send(Ok(chunk)) 596 .map_err(|err| err.into_inner().expect("just sent Ok")) 597 } 598 599 /// Aborts the body in an abnormal fashion. abort(self)600 pub fn abort(self) { 601 let _ = self 602 .data_tx 603 // clone so the send works even if buffer is full 604 .clone() 605 .try_send(Err(crate::Error::new_body_write_aborted())); 606 } 607 608 #[cfg(feature = "http1")] send_error(&mut self, err: crate::Error)609 pub(crate) fn send_error(&mut self, err: crate::Error) { 610 let _ = self.data_tx.try_send(Err(err)); 611 } 612 } 613 614 impl fmt::Debug for Sender { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result615 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 616 #[derive(Debug)] 617 struct Open; 618 #[derive(Debug)] 619 struct Closed; 620 621 let mut builder = f.debug_tuple("Sender"); 622 match self.want_rx.peek() { 623 watch::CLOSED => builder.field(&Closed), 624 _ => builder.field(&Open), 625 }; 626 627 builder.finish() 628 } 629 } 630 631 #[cfg(test)] 632 mod tests { 633 use std::mem; 634 use std::task::Poll; 635 636 use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; 637 638 #[test] test_size_of()639 fn test_size_of() { 640 // These are mostly to help catch *accidentally* increasing 641 // the size by too much. 642 643 let body_size = mem::size_of::<Body>(); 644 let body_expected_size = mem::size_of::<u64>() * 6; 645 assert!( 646 body_size <= body_expected_size, 647 "Body size = {} <= {}", 648 body_size, 649 body_expected_size, 650 ); 651 652 assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>"); 653 654 assert_eq!( 655 mem::size_of::<Sender>(), 656 mem::size_of::<usize>() * 5, 657 "Sender" 658 ); 659 660 assert_eq!( 661 mem::size_of::<Sender>(), 662 mem::size_of::<Option<Sender>>(), 663 "Option<Sender>" 664 ); 665 } 666 667 #[test] size_hint()668 fn size_hint() { 669 fn eq(body: Body, b: SizeHint, note: &str) { 670 let a = body.size_hint(); 671 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); 672 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); 673 } 674 675 eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); 676 677 eq(Body::empty(), SizeHint::with_exact(0), "empty"); 678 679 eq(Body::channel().1, SizeHint::new(), "channel"); 680 681 eq( 682 Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, 683 SizeHint::with_exact(4), 684 "channel with length", 685 ); 686 } 687 688 #[tokio::test] channel_abort()689 async fn channel_abort() { 690 let (tx, mut rx) = Body::channel(); 691 692 tx.abort(); 693 694 let err = rx.data().await.unwrap().unwrap_err(); 695 assert!(err.is_body_write_aborted(), "{:?}", err); 696 } 697 698 #[tokio::test] channel_abort_when_buffer_is_full()699 async fn channel_abort_when_buffer_is_full() { 700 let (mut tx, mut rx) = Body::channel(); 701 702 tx.try_send_data("chunk 1".into()).expect("send 1"); 703 // buffer is full, but can still send abort 704 tx.abort(); 705 706 let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); 707 assert_eq!(chunk1, "chunk 1"); 708 709 let err = rx.data().await.unwrap().unwrap_err(); 710 assert!(err.is_body_write_aborted(), "{:?}", err); 711 } 712 713 #[test] channel_buffers_one()714 fn channel_buffers_one() { 715 let (mut tx, _rx) = Body::channel(); 716 717 tx.try_send_data("chunk 1".into()).expect("send 1"); 718 719 // buffer is now full 720 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2"); 721 assert_eq!(chunk2, "chunk 2"); 722 } 723 724 #[tokio::test] channel_empty()725 async fn channel_empty() { 726 let (_, mut rx) = Body::channel(); 727 728 assert!(rx.data().await.is_none()); 729 } 730 731 #[test] channel_ready()732 fn channel_ready() { 733 let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); 734 735 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 736 737 assert!(tx_ready.poll().is_ready(), "tx is ready immediately"); 738 } 739 740 #[test] channel_wanter()741 fn channel_wanter() { 742 let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 743 744 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 745 let mut rx_data = tokio_test::task::spawn(rx.data()); 746 747 assert!( 748 tx_ready.poll().is_pending(), 749 "tx isn't ready before rx has been polled" 750 ); 751 752 assert!(rx_data.poll().is_pending(), "poll rx.data"); 753 assert!(tx_ready.is_woken(), "rx poll wakes tx"); 754 755 assert!( 756 tx_ready.poll().is_ready(), 757 "tx is ready after rx has been polled" 758 ); 759 } 760 761 #[test] channel_notices_closure()762 fn channel_notices_closure() { 763 let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 764 765 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 766 767 assert!( 768 tx_ready.poll().is_pending(), 769 "tx isn't ready before rx has been polled" 770 ); 771 772 drop(rx); 773 assert!(tx_ready.is_woken(), "dropping rx wakes tx"); 774 775 match tx_ready.poll() { 776 Poll::Ready(Err(ref e)) if e.is_closed() => (), 777 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected), 778 } 779 } 780 } 781