1 use std::borrow::Cow; 2 #[cfg(feature = "stream")] 3 use std::error::Error as StdError; 4 use std::fmt; 5 6 use bytes::Bytes; 7 use futures_channel::mpsc; 8 use futures_channel::oneshot; 9 use futures_core::Stream; // for mpsc::Receiver 10 #[cfg(feature = "stream")] 11 use futures_util::TryStreamExt; 12 use http::HeaderMap; 13 use http_body::{Body as HttpBody, SizeHint}; 14 15 use super::DecodedLength; 16 #[cfg(feature = "stream")] 17 use crate::common::sync_wrapper::SyncWrapper; 18 use crate::common::Future; 19 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 20 use crate::common::Never; 21 use crate::common::{task, watch, Pin, Poll}; 22 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 23 use crate::proto::h2::ping; 24 25 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>; 26 type TrailersSender = oneshot::Sender<HeaderMap>; 27 28 /// A stream of `Bytes`, used when receiving bodies. 29 /// 30 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many 31 /// applications. 32 /// 33 /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes) 34 /// or [`body::aggregate`](crate::body::aggregate). 35 #[must_use = "streams do nothing unless polled"] 36 pub struct Body { 37 kind: Kind, 38 /// Keep the extra bits in an `Option<Box<Extra>>`, so that 39 /// Body stays small in the common case (no extras needed). 40 extra: Option<Box<Extra>>, 41 } 42 43 enum Kind { 44 Once(Option<Bytes>), 45 Chan { 46 content_length: DecodedLength, 47 want_tx: watch::Sender, 48 data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>, 49 trailers_rx: oneshot::Receiver<HeaderMap>, 50 }, 51 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 52 H2 { 53 ping: ping::Recorder, 54 content_length: DecodedLength, 55 recv: h2::RecvStream, 56 }, 57 #[cfg(feature = "ffi")] 58 Ffi(crate::ffi::UserBody), 59 #[cfg(feature = "stream")] 60 Wrapped( 61 SyncWrapper< 62 Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>, 63 >, 64 ), 65 } 66 67 struct Extra { 68 /// Allow the client to pass a future to delay the `Body` from returning 69 /// EOF. This allows the `Client` to try to put the idle connection 70 /// back into the pool before the body is "finished". 71 /// 72 /// The reason for this is so that creating a new request after finishing 73 /// streaming the body of a response could sometimes result in creating 74 /// a brand new connection, since the pool didn't know about the idle 75 /// connection yet. 76 delayed_eof: Option<DelayEof>, 77 } 78 79 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))] 80 type DelayEofUntil = oneshot::Receiver<Never>; 81 82 enum DelayEof { 83 /// Initial state, stream hasn't seen EOF yet. 84 #[cfg(any(feature = "http1", feature = "http2"))] 85 #[cfg(feature = "client")] 86 NotEof(DelayEofUntil), 87 /// Transitions to this state once we've seen `poll` try to 88 /// return EOF (`None`). This future is then polled, and 89 /// when it completes, the Body finally returns EOF (`None`). 90 #[cfg(any(feature = "http1", feature = "http2"))] 91 #[cfg(feature = "client")] 92 Eof(DelayEofUntil), 93 } 94 95 /// A sender half created through [`Body::channel()`]. 96 /// 97 /// Useful when wanting to stream chunks from another thread. 98 /// 99 /// ## Body Closing 100 /// 101 /// Note that the request body will always be closed normally when the sender is dropped (meaning 102 /// that the empty terminating chunk will be sent to the remote). If you desire to close the 103 /// connection with an incomplete response (e.g. in the case of an error during asynchronous 104 /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion. 105 /// 106 /// [`Body::channel()`]: struct.Body.html#method.channel 107 /// [`Sender::abort()`]: struct.Sender.html#method.abort 108 #[must_use = "Sender does nothing unless sent on"] 109 pub struct Sender { 110 want_rx: watch::Receiver, 111 data_tx: BodySender, 112 trailers_tx: Option<TrailersSender>, 113 } 114 115 const WANT_PENDING: usize = 1; 116 const WANT_READY: usize = 2; 117 118 impl Body { 119 /// Create an empty `Body` stream. 120 /// 121 /// # Example 122 /// 123 /// ``` 124 /// use hyper::{Body, Request}; 125 /// 126 /// // create a `GET /` request 127 /// let get = Request::new(Body::empty()); 128 /// ``` 129 #[inline] empty() -> Body130 pub fn empty() -> Body { 131 Body::new(Kind::Once(None)) 132 } 133 134 /// Create a `Body` stream with an associated sender half. 135 /// 136 /// Useful when wanting to stream chunks from another thread. 137 #[inline] channel() -> (Sender, Body)138 pub fn channel() -> (Sender, Body) { 139 Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false) 140 } 141 new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)142 pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) { 143 let (data_tx, data_rx) = mpsc::channel(0); 144 let (trailers_tx, trailers_rx) = oneshot::channel(); 145 146 // If wanter is true, `Sender::poll_ready()` won't becoming ready 147 // until the `Body` has been polled for data once. 148 let want = if wanter { WANT_PENDING } else { WANT_READY }; 149 150 let (want_tx, want_rx) = watch::channel(want); 151 152 let tx = Sender { 153 want_rx, 154 data_tx, 155 trailers_tx: Some(trailers_tx), 156 }; 157 let rx = Body::new(Kind::Chan { 158 content_length, 159 want_tx, 160 data_rx, 161 trailers_rx, 162 }); 163 164 (tx, rx) 165 } 166 167 /// Wrap a futures `Stream` in a box inside `Body`. 168 /// 169 /// # Example 170 /// 171 /// ``` 172 /// # use hyper::Body; 173 /// let chunks: Vec<Result<_, std::io::Error>> = vec![ 174 /// Ok("hello"), 175 /// Ok(" "), 176 /// Ok("world"), 177 /// ]; 178 /// 179 /// let stream = futures_util::stream::iter(chunks); 180 /// 181 /// let body = Body::wrap_stream(stream); 182 /// ``` 183 /// 184 /// # Optional 185 /// 186 /// This function requires enabling the `stream` feature in your 187 /// `Cargo.toml`. 188 #[cfg(feature = "stream")] 189 #[cfg_attr(docsrs, doc(cfg(feature = "stream")))] wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,190 pub fn wrap_stream<S, O, E>(stream: S) -> Body 191 where 192 S: Stream<Item = Result<O, E>> + Send + 'static, 193 O: Into<Bytes> + 'static, 194 E: Into<Box<dyn StdError + Send + Sync>> + 'static, 195 { 196 let mapped = stream.map_ok(Into::into).map_err(Into::into); 197 Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped)))) 198 } 199 new(kind: Kind) -> Body200 fn new(kind: Kind) -> Body { 201 Body { kind, extra: None } 202 } 203 204 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] h2( recv: h2::RecvStream, mut content_length: DecodedLength, ping: ping::Recorder, ) -> Self205 pub(crate) fn h2( 206 recv: h2::RecvStream, 207 mut content_length: DecodedLength, 208 ping: ping::Recorder, 209 ) -> Self { 210 // If the stream is already EOS, then the "unknown length" is clearly 211 // actually ZERO. 212 if !content_length.is_exact() && recv.is_end_stream() { 213 content_length = DecodedLength::ZERO; 214 } 215 let body = Body::new(Kind::H2 { 216 ping, 217 content_length, 218 recv, 219 }); 220 221 body 222 } 223 224 #[cfg(any(feature = "http1", feature = "http2"))] 225 #[cfg(feature = "client")] delayed_eof(&mut self, fut: DelayEofUntil)226 pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) { 227 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut)); 228 } 229 take_delayed_eof(&mut self) -> Option<DelayEof>230 fn take_delayed_eof(&mut self) -> Option<DelayEof> { 231 self.extra 232 .as_mut() 233 .and_then(|extra| extra.delayed_eof.take()) 234 } 235 236 #[cfg(any(feature = "http1", feature = "http2"))] extra_mut(&mut self) -> &mut Extra237 fn extra_mut(&mut self) -> &mut Extra { 238 self.extra 239 .get_or_insert_with(|| Box::new(Extra { delayed_eof: None })) 240 } 241 poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>242 fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 243 match self.take_delayed_eof() { 244 #[cfg(any(feature = "http1", feature = "http2"))] 245 #[cfg(feature = "client")] 246 Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) { 247 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => { 248 self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay)); 249 ok 250 } 251 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) { 252 Poll::Ready(Ok(never)) => match never {}, 253 Poll::Pending => { 254 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 255 Poll::Pending 256 } 257 Poll::Ready(Err(_done)) => Poll::Ready(None), 258 }, 259 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))), 260 }, 261 #[cfg(any(feature = "http1", feature = "http2"))] 262 #[cfg(feature = "client")] 263 Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) { 264 Poll::Ready(Ok(never)) => match never {}, 265 Poll::Pending => { 266 self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay)); 267 Poll::Pending 268 } 269 Poll::Ready(Err(_done)) => Poll::Ready(None), 270 }, 271 #[cfg(any( 272 not(any(feature = "http1", feature = "http2")), 273 not(feature = "client") 274 ))] 275 Some(delay_eof) => match delay_eof {}, 276 None => self.poll_inner(cx), 277 } 278 } 279 280 #[cfg(feature = "ffi")] as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody281 pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody { 282 match self.kind { 283 Kind::Ffi(ref mut body) => return body, 284 _ => { 285 self.kind = Kind::Ffi(crate::ffi::UserBody::new()); 286 } 287 } 288 289 match self.kind { 290 Kind::Ffi(ref mut body) => body, 291 _ => unreachable!(), 292 } 293 } 294 poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>295 fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> { 296 match self.kind { 297 Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)), 298 Kind::Chan { 299 content_length: ref mut len, 300 ref mut data_rx, 301 ref mut want_tx, 302 .. 303 } => { 304 want_tx.send(WANT_READY); 305 306 match ready!(Pin::new(data_rx).poll_next(cx)?) { 307 Some(chunk) => { 308 len.sub_if(chunk.len() as u64); 309 Poll::Ready(Some(Ok(chunk))) 310 } 311 None => Poll::Ready(None), 312 } 313 } 314 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 315 Kind::H2 { 316 ref ping, 317 recv: ref mut h2, 318 content_length: ref mut len, 319 } => match ready!(h2.poll_data(cx)) { 320 Some(Ok(bytes)) => { 321 let _ = h2.flow_control().release_capacity(bytes.len()); 322 len.sub_if(bytes.len() as u64); 323 ping.record_data(bytes.len()); 324 Poll::Ready(Some(Ok(bytes))) 325 } 326 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))), 327 None => Poll::Ready(None), 328 }, 329 330 #[cfg(feature = "ffi")] 331 Kind::Ffi(ref mut body) => body.poll_data(cx), 332 333 #[cfg(feature = "stream")] 334 Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) { 335 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))), 336 None => Poll::Ready(None), 337 }, 338 } 339 } 340 341 #[cfg(feature = "http1")] take_full_data(&mut self) -> Option<Bytes>342 pub(super) fn take_full_data(&mut self) -> Option<Bytes> { 343 if let Kind::Once(ref mut chunk) = self.kind { 344 chunk.take() 345 } else { 346 None 347 } 348 } 349 } 350 351 impl Default for Body { 352 /// Returns `Body::empty()`. 353 #[inline] default() -> Body354 fn default() -> Body { 355 Body::empty() 356 } 357 } 358 359 impl HttpBody for Body { 360 type Data = Bytes; 361 type Error = crate::Error; 362 poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>363 fn poll_data( 364 mut self: Pin<&mut Self>, 365 cx: &mut task::Context<'_>, 366 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 367 self.poll_eof(cx) 368 } 369 poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>370 fn poll_trailers( 371 #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, 372 #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, 373 ) -> Poll<Result<Option<HeaderMap>, Self::Error>> { 374 match self.kind { 375 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 376 Kind::H2 { 377 recv: ref mut h2, 378 ref ping, 379 .. 380 } => match ready!(h2.poll_trailers(cx)) { 381 Ok(t) => { 382 ping.record_non_data(); 383 Poll::Ready(Ok(t)) 384 } 385 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))), 386 }, 387 Kind::Chan { 388 ref mut trailers_rx, 389 .. 390 } => match ready!(Pin::new(trailers_rx).poll(cx)) { 391 Ok(t) => Poll::Ready(Ok(Some(t))), 392 Err(_) => Poll::Ready(Ok(None)), 393 }, 394 #[cfg(feature = "ffi")] 395 Kind::Ffi(ref mut body) => body.poll_trailers(cx), 396 _ => Poll::Ready(Ok(None)), 397 } 398 } 399 is_end_stream(&self) -> bool400 fn is_end_stream(&self) -> bool { 401 match self.kind { 402 Kind::Once(ref val) => val.is_none(), 403 Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO, 404 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 405 Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(), 406 #[cfg(feature = "ffi")] 407 Kind::Ffi(..) => false, 408 #[cfg(feature = "stream")] 409 Kind::Wrapped(..) => false, 410 } 411 } 412 size_hint(&self) -> SizeHint413 fn size_hint(&self) -> SizeHint { 414 macro_rules! opt_len { 415 ($content_length:expr) => {{ 416 let mut hint = SizeHint::default(); 417 418 if let Some(content_length) = $content_length.into_opt() { 419 hint.set_exact(content_length); 420 } 421 422 hint 423 }}; 424 } 425 426 match self.kind { 427 Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64), 428 Kind::Once(None) => SizeHint::with_exact(0), 429 #[cfg(feature = "stream")] 430 Kind::Wrapped(..) => SizeHint::default(), 431 Kind::Chan { content_length, .. } => opt_len!(content_length), 432 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))] 433 Kind::H2 { content_length, .. } => opt_len!(content_length), 434 #[cfg(feature = "ffi")] 435 Kind::Ffi(..) => SizeHint::default(), 436 } 437 } 438 } 439 440 impl fmt::Debug for Body { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result441 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 442 #[derive(Debug)] 443 struct Streaming; 444 #[derive(Debug)] 445 struct Empty; 446 #[derive(Debug)] 447 struct Full<'a>(&'a Bytes); 448 449 let mut builder = f.debug_tuple("Body"); 450 match self.kind { 451 Kind::Once(None) => builder.field(&Empty), 452 Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)), 453 _ => builder.field(&Streaming), 454 }; 455 456 builder.finish() 457 } 458 } 459 460 /// # Optional 461 /// 462 /// This function requires enabling the `stream` feature in your 463 /// `Cargo.toml`. 464 #[cfg(feature = "stream")] 465 impl Stream for Body { 466 type Item = crate::Result<Bytes>; 467 poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>468 fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> { 469 HttpBody::poll_data(self, cx) 470 } 471 } 472 473 /// # Optional 474 /// 475 /// This function requires enabling the `stream` feature in your 476 /// `Cargo.toml`. 477 #[cfg(feature = "stream")] 478 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body { 479 #[inline] from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body480 fn from( 481 stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, 482 ) -> Body { 483 Body::new(Kind::Wrapped(SyncWrapper::new(stream.into()))) 484 } 485 } 486 487 impl From<Bytes> for Body { 488 #[inline] from(chunk: Bytes) -> Body489 fn from(chunk: Bytes) -> Body { 490 if chunk.is_empty() { 491 Body::empty() 492 } else { 493 Body::new(Kind::Once(Some(chunk))) 494 } 495 } 496 } 497 498 impl From<Vec<u8>> for Body { 499 #[inline] from(vec: Vec<u8>) -> Body500 fn from(vec: Vec<u8>) -> Body { 501 Body::from(Bytes::from(vec)) 502 } 503 } 504 505 impl From<&'static [u8]> for Body { 506 #[inline] from(slice: &'static [u8]) -> Body507 fn from(slice: &'static [u8]) -> Body { 508 Body::from(Bytes::from(slice)) 509 } 510 } 511 512 impl From<Cow<'static, [u8]>> for Body { 513 #[inline] from(cow: Cow<'static, [u8]>) -> Body514 fn from(cow: Cow<'static, [u8]>) -> Body { 515 match cow { 516 Cow::Borrowed(b) => Body::from(b), 517 Cow::Owned(o) => Body::from(o), 518 } 519 } 520 } 521 522 impl From<String> for Body { 523 #[inline] from(s: String) -> Body524 fn from(s: String) -> Body { 525 Body::from(Bytes::from(s.into_bytes())) 526 } 527 } 528 529 impl From<&'static str> for Body { 530 #[inline] from(slice: &'static str) -> Body531 fn from(slice: &'static str) -> Body { 532 Body::from(Bytes::from(slice.as_bytes())) 533 } 534 } 535 536 impl From<Cow<'static, str>> for Body { 537 #[inline] from(cow: Cow<'static, str>) -> Body538 fn from(cow: Cow<'static, str>) -> Body { 539 match cow { 540 Cow::Borrowed(b) => Body::from(b), 541 Cow::Owned(o) => Body::from(o), 542 } 543 } 544 } 545 546 impl Sender { 547 /// Check to see if this `Sender` can send more data. poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>548 pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 549 // Check if the receiver end has tried polling for the body yet 550 ready!(self.poll_want(cx)?); 551 self.data_tx 552 .poll_ready(cx) 553 .map_err(|_| crate::Error::new_closed()) 554 } 555 poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>556 fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> { 557 match self.want_rx.load(cx) { 558 WANT_READY => Poll::Ready(Ok(())), 559 WANT_PENDING => Poll::Pending, 560 watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())), 561 unexpected => unreachable!("want_rx value: {}", unexpected), 562 } 563 } 564 ready(&mut self) -> crate::Result<()>565 async fn ready(&mut self) -> crate::Result<()> { 566 futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await 567 } 568 569 /// Send data on data channel when it is ready. send_data(&mut self, chunk: Bytes) -> crate::Result<()>570 pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> { 571 self.ready().await?; 572 self.data_tx 573 .try_send(Ok(chunk)) 574 .map_err(|_| crate::Error::new_closed()) 575 } 576 577 /// Send trailers on trailers channel. send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>578 pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> { 579 let tx = match self.trailers_tx.take() { 580 Some(tx) => tx, 581 None => return Err(crate::Error::new_closed()), 582 }; 583 tx.send(trailers).map_err(|_| crate::Error::new_closed()) 584 } 585 586 /// Try to send data on this channel. 587 /// 588 /// # Errors 589 /// 590 /// Returns `Err(Bytes)` if the channel could not (currently) accept 591 /// another `Bytes`. 592 /// 593 /// # Note 594 /// 595 /// This is mostly useful for when trying to send from some other thread 596 /// that doesn't have an async context. If in an async context, prefer 597 /// `send_data()` instead. try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>598 pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> { 599 self.data_tx 600 .try_send(Ok(chunk)) 601 .map_err(|err| err.into_inner().expect("just sent Ok")) 602 } 603 604 /// Aborts the body in an abnormal fashion. abort(self)605 pub fn abort(self) { 606 let _ = self 607 .data_tx 608 // clone so the send works even if buffer is full 609 .clone() 610 .try_send(Err(crate::Error::new_body_write_aborted())); 611 } 612 613 #[cfg(feature = "http1")] send_error(&mut self, err: crate::Error)614 pub(crate) fn send_error(&mut self, err: crate::Error) { 615 let _ = self.data_tx.try_send(Err(err)); 616 } 617 } 618 619 impl fmt::Debug for Sender { fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result620 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 621 #[derive(Debug)] 622 struct Open; 623 #[derive(Debug)] 624 struct Closed; 625 626 let mut builder = f.debug_tuple("Sender"); 627 match self.want_rx.peek() { 628 watch::CLOSED => builder.field(&Closed), 629 _ => builder.field(&Open), 630 }; 631 632 builder.finish() 633 } 634 } 635 636 #[cfg(test)] 637 mod tests { 638 use std::mem; 639 use std::task::Poll; 640 641 use super::{Body, DecodedLength, HttpBody, Sender, SizeHint}; 642 643 #[test] test_size_of()644 fn test_size_of() { 645 // These are mostly to help catch *accidentally* increasing 646 // the size by too much. 647 648 let body_size = mem::size_of::<Body>(); 649 let body_expected_size = mem::size_of::<u64>() * 6; 650 assert!( 651 body_size <= body_expected_size, 652 "Body size = {} <= {}", 653 body_size, 654 body_expected_size, 655 ); 656 657 assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>"); 658 659 assert_eq!( 660 mem::size_of::<Sender>(), 661 mem::size_of::<usize>() * 5, 662 "Sender" 663 ); 664 665 assert_eq!( 666 mem::size_of::<Sender>(), 667 mem::size_of::<Option<Sender>>(), 668 "Option<Sender>" 669 ); 670 } 671 672 #[test] size_hint()673 fn size_hint() { 674 fn eq(body: Body, b: SizeHint, note: &str) { 675 let a = body.size_hint(); 676 assert_eq!(a.lower(), b.lower(), "lower for {:?}", note); 677 assert_eq!(a.upper(), b.upper(), "upper for {:?}", note); 678 } 679 680 eq(Body::from("Hello"), SizeHint::with_exact(5), "from str"); 681 682 eq(Body::empty(), SizeHint::with_exact(0), "empty"); 683 684 eq(Body::channel().1, SizeHint::new(), "channel"); 685 686 eq( 687 Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1, 688 SizeHint::with_exact(4), 689 "channel with length", 690 ); 691 } 692 693 #[tokio::test] channel_abort()694 async fn channel_abort() { 695 let (tx, mut rx) = Body::channel(); 696 697 tx.abort(); 698 699 let err = rx.data().await.unwrap().unwrap_err(); 700 assert!(err.is_body_write_aborted(), "{:?}", err); 701 } 702 703 #[tokio::test] channel_abort_when_buffer_is_full()704 async fn channel_abort_when_buffer_is_full() { 705 let (mut tx, mut rx) = Body::channel(); 706 707 tx.try_send_data("chunk 1".into()).expect("send 1"); 708 // buffer is full, but can still send abort 709 tx.abort(); 710 711 let chunk1 = rx.data().await.expect("item 1").expect("chunk 1"); 712 assert_eq!(chunk1, "chunk 1"); 713 714 let err = rx.data().await.unwrap().unwrap_err(); 715 assert!(err.is_body_write_aborted(), "{:?}", err); 716 } 717 718 #[test] channel_buffers_one()719 fn channel_buffers_one() { 720 let (mut tx, _rx) = Body::channel(); 721 722 tx.try_send_data("chunk 1".into()).expect("send 1"); 723 724 // buffer is now full 725 let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2"); 726 assert_eq!(chunk2, "chunk 2"); 727 } 728 729 #[tokio::test] channel_empty()730 async fn channel_empty() { 731 let (_, mut rx) = Body::channel(); 732 733 assert!(rx.data().await.is_none()); 734 } 735 736 #[test] channel_ready()737 fn channel_ready() { 738 let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false); 739 740 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 741 742 assert!(tx_ready.poll().is_ready(), "tx is ready immediately"); 743 } 744 745 #[test] channel_wanter()746 fn channel_wanter() { 747 let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 748 749 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 750 let mut rx_data = tokio_test::task::spawn(rx.data()); 751 752 assert!( 753 tx_ready.poll().is_pending(), 754 "tx isn't ready before rx has been polled" 755 ); 756 757 assert!(rx_data.poll().is_pending(), "poll rx.data"); 758 assert!(tx_ready.is_woken(), "rx poll wakes tx"); 759 760 assert!( 761 tx_ready.poll().is_ready(), 762 "tx is ready after rx has been polled" 763 ); 764 } 765 766 #[test] channel_notices_closure()767 fn channel_notices_closure() { 768 let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true); 769 770 let mut tx_ready = tokio_test::task::spawn(tx.ready()); 771 772 assert!( 773 tx_ready.poll().is_pending(), 774 "tx isn't ready before rx has been polled" 775 ); 776 777 drop(rx); 778 assert!(tx_ready.is_woken(), "dropping rx wakes tx"); 779 780 match tx_ready.poll() { 781 Poll::Ready(Err(ref e)) if e.is_closed() => (), 782 unexpected => panic!("tx poll ready unexpected: {:?}", unexpected), 783 } 784 } 785 } 786