1 use std::borrow::Cow;
2 #[cfg(feature = "stream")]
3 use std::error::Error as StdError;
4 use std::fmt;
5 
6 use bytes::Bytes;
7 use futures_channel::mpsc;
8 use futures_channel::oneshot;
9 use futures_core::Stream; // for mpsc::Receiver
10 #[cfg(feature = "stream")]
11 use futures_util::TryStreamExt;
12 use http::HeaderMap;
13 use http_body::{Body as HttpBody, SizeHint};
14 
15 use super::DecodedLength;
16 #[cfg(feature = "stream")]
17 use crate::common::sync_wrapper::SyncWrapper;
18 use crate::common::Future;
19 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
20 use crate::common::Never;
21 use crate::common::{task, watch, Pin, Poll};
22 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
23 use crate::proto::h2::ping;
24 
/// Sending half of the data channel: carries body chunks, or an error, to the `Body`.
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
/// Sending half of the one-shot channel that delivers trailers to the `Body`.
type TrailersSender = oneshot::Sender<HeaderMap>;
27 
/// A stream of `Bytes`, used when receiving bodies.
///
/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
/// applications.
///
/// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes)
/// or [`body::aggregate`](crate::body::aggregate).
#[must_use = "streams do nothing unless polled"]
pub struct Body {
    /// The concrete source the body's data is read from.
    kind: Kind,
    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
    /// Body stays small in the common case (no extras needed).
    extra: Option<Box<Extra>>,
}
42 
/// The concrete source a `Body` reads its data from.
enum Kind {
    /// At most one in-memory chunk; `None` means there is nothing (left) to yield.
    Once(Option<Bytes>),
    /// Data and trailers are pushed in by a `Sender` over channels.
    Chan {
        /// Length of the body; adjusted through `sub_if` as chunks are received.
        content_length: DecodedLength,
        /// Tells the `Sender` that the body has been polled for data.
        want_tx: watch::Sender,
        /// Receives chunks (or an error) from the `Sender`.
        data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
        /// Receives the trailers, if the `Sender` sends any.
        trailers_rx: oneshot::Receiver<HeaderMap>,
    },
    /// Data is read from an HTTP/2 receive stream.
    #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
    H2 {
        /// Notified about received data and trailers.
        ping: ping::Recorder,
        /// Length of the body; adjusted through `sub_if` as chunks are received.
        content_length: DecodedLength,
        /// The underlying `h2` stream being read.
        recv: h2::RecvStream,
    },
    /// Body supplied through the FFI layer.
    #[cfg(feature = "ffi")]
    Ffi(crate::ffi::UserBody),
    /// A boxed, user-provided stream (see `Body::wrap_stream`).
    #[cfg(feature = "stream")]
    Wrapped(
        SyncWrapper<
            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
        >,
    ),
}
66 
/// Rarely-needed state of a `Body`, boxed so that it is only allocated on demand.
struct Extra {
    /// Allow the client to pass a future to delay the `Body` from returning
    /// EOF. This allows the `Client` to try to put the idle connection
    /// back into the pool before the body is "finished".
    ///
    /// The reason for this is so that creating a new request after finishing
    /// streaming the body of a response could sometimes result in creating
    /// a brand new connection, since the pool didn't know about the idle
    /// connection yet.
    delayed_eof: Option<DelayEof>,
}
78 
/// Future that a `Body` with a delayed EOF waits on before reporting EOF.
///
/// Its success type is `Never`, so it cannot yield a value; `poll_eof` lets
/// the body report EOF once it resolves with an error.
#[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
type DelayEofUntil = oneshot::Receiver<Never>;
81 
/// State of a delayed EOF.
///
/// Both variants only exist with the `client` feature plus `http1` or `http2`;
/// without them this enum has no variants.
enum DelayEof {
    /// Initial state, stream hasn't seen EOF yet.
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    NotEof(DelayEofUntil),
    /// Transitions to this state once we've seen `poll` try to
    /// return EOF (`None`). This future is then polled, and
    /// when it completes, the Body finally returns EOF (`None`).
    #[cfg(any(feature = "http1", feature = "http2"))]
    #[cfg(feature = "client")]
    Eof(DelayEofUntil),
}
94 
/// A sender half created through [`Body::channel()`].
///
/// Useful when wanting to stream chunks from another thread.
///
/// ## Body Closing
///
/// Note that the request body will always be closed normally when the sender is dropped (meaning
/// that the empty terminating chunk will be sent to the remote). If you desire to close the
/// connection with an incomplete response (e.g. in the case of an error during asynchronous
/// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
///
/// [`Body::channel()`]: Body::channel
/// [`Sender::abort()`]: Sender::abort
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
    /// Observes whether the `Body` has been polled for data (or was closed).
    want_rx: watch::Receiver,
    /// Channel on which data chunks and errors are sent to the `Body`.
    data_tx: BodySender,
    /// One-shot channel for the trailers; `None` once trailers have been sent.
    trailers_tx: Option<TrailersSender>,
}
114 
/// Watch value meaning the `Body` has not been polled for data yet;
/// `Sender::poll_want` stays pending while this value is observed.
const WANT_PENDING: usize = 1;
/// Watch value meaning the `Sender` may proceed; set at creation for
/// non-"wanter" channels and whenever a channel body is polled for data.
const WANT_READY: usize = 2;
117 
118 impl Body {
119     /// Create an empty `Body` stream.
120     ///
121     /// # Example
122     ///
123     /// ```
124     /// use hyper::{Body, Request};
125     ///
126     /// // create a `GET /` request
127     /// let get = Request::new(Body::empty());
128     /// ```
129     #[inline]
empty() -> Body130     pub fn empty() -> Body {
131         Body::new(Kind::Once(None))
132     }
133 
134     /// Create a `Body` stream with an associated sender half.
135     ///
136     /// Useful when wanting to stream chunks from another thread.
137     #[inline]
channel() -> (Sender, Body)138     pub fn channel() -> (Sender, Body) {
139         Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
140     }
141 
new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)142     pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
143         let (data_tx, data_rx) = mpsc::channel(0);
144         let (trailers_tx, trailers_rx) = oneshot::channel();
145 
146         // If wanter is true, `Sender::poll_ready()` won't becoming ready
147         // until the `Body` has been polled for data once.
148         let want = if wanter { WANT_PENDING } else { WANT_READY };
149 
150         let (want_tx, want_rx) = watch::channel(want);
151 
152         let tx = Sender {
153             want_rx,
154             data_tx,
155             trailers_tx: Some(trailers_tx),
156         };
157         let rx = Body::new(Kind::Chan {
158             content_length,
159             want_tx,
160             data_rx,
161             trailers_rx,
162         });
163 
164         (tx, rx)
165     }
166 
167     /// Wrap a futures `Stream` in a box inside `Body`.
168     ///
169     /// # Example
170     ///
171     /// ```
172     /// # use hyper::Body;
173     /// let chunks: Vec<Result<_, std::io::Error>> = vec![
174     ///     Ok("hello"),
175     ///     Ok(" "),
176     ///     Ok("world"),
177     /// ];
178     ///
179     /// let stream = futures_util::stream::iter(chunks);
180     ///
181     /// let body = Body::wrap_stream(stream);
182     /// ```
183     ///
184     /// # Optional
185     ///
186     /// This function requires enabling the `stream` feature in your
187     /// `Cargo.toml`.
188     #[cfg(feature = "stream")]
189     #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,190     pub fn wrap_stream<S, O, E>(stream: S) -> Body
191     where
192         S: Stream<Item = Result<O, E>> + Send + 'static,
193         O: Into<Bytes> + 'static,
194         E: Into<Box<dyn StdError + Send + Sync>> + 'static,
195     {
196         let mapped = stream.map_ok(Into::into).map_err(Into::into);
197         Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
198     }
199 
new(kind: Kind) -> Body200     fn new(kind: Kind) -> Body {
201         Body { kind, extra: None }
202     }
203 
204     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
h2( recv: h2::RecvStream, content_length: DecodedLength, ping: ping::Recorder, ) -> Self205     pub(crate) fn h2(
206         recv: h2::RecvStream,
207         content_length: DecodedLength,
208         ping: ping::Recorder,
209     ) -> Self {
210         let body = Body::new(Kind::H2 {
211             ping,
212             content_length,
213             recv,
214         });
215 
216         body
217     }
218 
219     #[cfg(any(feature = "http1", feature = "http2"))]
220     #[cfg(feature = "client")]
delayed_eof(&mut self, fut: DelayEofUntil)221     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
222         self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
223     }
224 
take_delayed_eof(&mut self) -> Option<DelayEof>225     fn take_delayed_eof(&mut self) -> Option<DelayEof> {
226         self.extra
227             .as_mut()
228             .and_then(|extra| extra.delayed_eof.take())
229     }
230 
231     #[cfg(any(feature = "http1", feature = "http2"))]
extra_mut(&mut self) -> &mut Extra232     fn extra_mut(&mut self) -> &mut Extra {
233         self.extra
234             .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
235     }
236 
    /// Poll for the next chunk, holding back EOF while a delayed-EOF future is pending.
    ///
    /// Without any delayed-EOF state this is just `poll_inner`. Otherwise the
    /// state is taken out of `extra` and put back whenever it is still needed.
    fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
        match self.take_delayed_eof() {
            #[cfg(any(feature = "http1", feature = "http2"))]
            #[cfg(feature = "client")]
            Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
                // Still streaming: keep the delay around for a later poll.
                ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
                    self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
                    ok
                }
                // The inner source hit EOF: only report it once the delay is done.
                Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
                    // `never` is uninhabited, so this arm cannot actually run.
                    Poll::Ready(Ok(never)) => match never {},
                    Poll::Pending => {
                        self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
                        Poll::Pending
                    }
                    Poll::Ready(Err(_done)) => Poll::Ready(None),
                },
                // Errors are passed through and the delay is dropped.
                Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
            },
            #[cfg(any(feature = "http1", feature = "http2"))]
            #[cfg(feature = "client")]
            // EOF was already seen earlier: only the delay is left to wait on.
            Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
                Poll::Ready(Ok(never)) => match never {},
                Poll::Pending => {
                    self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
                    Poll::Pending
                }
                Poll::Ready(Err(_done)) => Poll::Ready(None),
            },
            // With these features off `DelayEof` has no variants, so no value can exist.
            #[cfg(any(
                not(any(feature = "http1", feature = "http2")),
                not(feature = "client")
            ))]
            Some(delay_eof) => match delay_eof {},
            None => self.poll_inner(cx),
        }
    }
274 
275     #[cfg(feature = "ffi")]
as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody276     pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
277         match self.kind {
278             Kind::Ffi(ref mut body) => return body,
279             _ => {
280                 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
281             }
282         }
283 
284         match self.kind {
285             Kind::Ffi(ref mut body) => body,
286             _ => unreachable!(),
287         }
288     }
289 
poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>290     fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
291         match self.kind {
292             Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
293             Kind::Chan {
294                 content_length: ref mut len,
295                 ref mut data_rx,
296                 ref mut want_tx,
297                 ..
298             } => {
299                 want_tx.send(WANT_READY);
300 
301                 match ready!(Pin::new(data_rx).poll_next(cx)?) {
302                     Some(chunk) => {
303                         len.sub_if(chunk.len() as u64);
304                         Poll::Ready(Some(Ok(chunk)))
305                     }
306                     None => Poll::Ready(None),
307                 }
308             }
309             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
310             Kind::H2 {
311                 ref ping,
312                 recv: ref mut h2,
313                 content_length: ref mut len,
314             } => match ready!(h2.poll_data(cx)) {
315                 Some(Ok(bytes)) => {
316                     let _ = h2.flow_control().release_capacity(bytes.len());
317                     len.sub_if(bytes.len() as u64);
318                     ping.record_data(bytes.len());
319                     Poll::Ready(Some(Ok(bytes)))
320                 }
321                 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
322                 None => Poll::Ready(None),
323             },
324 
325             #[cfg(feature = "ffi")]
326             Kind::Ffi(ref mut body) => body.poll_data(cx),
327 
328             #[cfg(feature = "stream")]
329             Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
330                 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
331                 None => Poll::Ready(None),
332             },
333         }
334     }
335 
336     #[cfg(feature = "http1")]
take_full_data(&mut self) -> Option<Bytes>337     pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
338         if let Kind::Once(ref mut chunk) = self.kind {
339             chunk.take()
340         } else {
341             None
342         }
343     }
344 }
345 
346 impl Default for Body {
347     /// Returns `Body::empty()`.
348     #[inline]
default() -> Body349     fn default() -> Body {
350         Body::empty()
351     }
352 }
353 
354 impl HttpBody for Body {
355     type Data = Bytes;
356     type Error = crate::Error;
357 
poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>358     fn poll_data(
359         mut self: Pin<&mut Self>,
360         cx: &mut task::Context<'_>,
361     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
362         self.poll_eof(cx)
363     }
364 
poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>365     fn poll_trailers(
366         #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
367         #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
368     ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
369         match self.kind {
370             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
371             Kind::H2 {
372                 recv: ref mut h2,
373                 ref ping,
374                 ..
375             } => match ready!(h2.poll_trailers(cx)) {
376                 Ok(t) => {
377                     ping.record_non_data();
378                     Poll::Ready(Ok(t))
379                 }
380                 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
381             },
382             Kind::Chan {
383                 ref mut trailers_rx,
384                 ..
385             } => match ready!(Pin::new(trailers_rx).poll(cx)) {
386                 Ok(t) => Poll::Ready(Ok(Some(t))),
387                 Err(_) => Poll::Ready(Ok(None)),
388             },
389             #[cfg(feature = "ffi")]
390             Kind::Ffi(ref mut body) => body.poll_trailers(cx),
391             _ => Poll::Ready(Ok(None)),
392         }
393     }
394 
is_end_stream(&self) -> bool395     fn is_end_stream(&self) -> bool {
396         match self.kind {
397             Kind::Once(ref val) => val.is_none(),
398             Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
399             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
400             Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
401             #[cfg(feature = "ffi")]
402             Kind::Ffi(..) => false,
403             #[cfg(feature = "stream")]
404             Kind::Wrapped(..) => false,
405         }
406     }
407 
size_hint(&self) -> SizeHint408     fn size_hint(&self) -> SizeHint {
409         macro_rules! opt_len {
410             ($content_length:expr) => {{
411                 let mut hint = SizeHint::default();
412 
413                 if let Some(content_length) = $content_length.into_opt() {
414                     hint.set_exact(content_length);
415                 }
416 
417                 hint
418             }};
419         }
420 
421         match self.kind {
422             Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
423             Kind::Once(None) => SizeHint::with_exact(0),
424             #[cfg(feature = "stream")]
425             Kind::Wrapped(..) => SizeHint::default(),
426             Kind::Chan { content_length, .. } => opt_len!(content_length),
427             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
428             Kind::H2 { content_length, .. } => opt_len!(content_length),
429             #[cfg(feature = "ffi")]
430             Kind::Ffi(..) => SizeHint::default(),
431         }
432     }
433 }
434 
435 impl fmt::Debug for Body {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result436     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
437         #[derive(Debug)]
438         struct Streaming;
439         #[derive(Debug)]
440         struct Empty;
441         #[derive(Debug)]
442         struct Full<'a>(&'a Bytes);
443 
444         let mut builder = f.debug_tuple("Body");
445         match self.kind {
446             Kind::Once(None) => builder.field(&Empty),
447             Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
448             _ => builder.field(&Streaming),
449         };
450 
451         builder.finish()
452     }
453 }
454 
/// # Optional
///
/// This trait implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Yields the same items as polling the body for data.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        <Self as HttpBody>::poll_data(self, cx)
    }
}
467 
/// # Optional
///
/// This trait implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pin the already boxed stream and use it as the body's data source.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let wrapped = SyncWrapper::new(Pin::from(stream));
        Body::new(Kind::Wrapped(wrapped))
    }
}
481 
482 impl From<Bytes> for Body {
483     #[inline]
from(chunk: Bytes) -> Body484     fn from(chunk: Bytes) -> Body {
485         if chunk.is_empty() {
486             Body::empty()
487         } else {
488             Body::new(Kind::Once(Some(chunk)))
489         }
490     }
491 }
492 
493 impl From<Vec<u8>> for Body {
494     #[inline]
from(vec: Vec<u8>) -> Body495     fn from(vec: Vec<u8>) -> Body {
496         Body::from(Bytes::from(vec))
497     }
498 }
499 
500 impl From<&'static [u8]> for Body {
501     #[inline]
from(slice: &'static [u8]) -> Body502     fn from(slice: &'static [u8]) -> Body {
503         Body::from(Bytes::from(slice))
504     }
505 }
506 
507 impl From<Cow<'static, [u8]>> for Body {
508     #[inline]
from(cow: Cow<'static, [u8]>) -> Body509     fn from(cow: Cow<'static, [u8]>) -> Body {
510         match cow {
511             Cow::Borrowed(b) => Body::from(b),
512             Cow::Owned(o) => Body::from(o),
513         }
514     }
515 }
516 
517 impl From<String> for Body {
518     #[inline]
from(s: String) -> Body519     fn from(s: String) -> Body {
520         Body::from(Bytes::from(s.into_bytes()))
521     }
522 }
523 
524 impl From<&'static str> for Body {
525     #[inline]
from(slice: &'static str) -> Body526     fn from(slice: &'static str) -> Body {
527         Body::from(Bytes::from(slice.as_bytes()))
528     }
529 }
530 
531 impl From<Cow<'static, str>> for Body {
532     #[inline]
from(cow: Cow<'static, str>) -> Body533     fn from(cow: Cow<'static, str>) -> Body {
534         match cow {
535             Cow::Borrowed(b) => Body::from(b),
536             Cow::Owned(o) => Body::from(o),
537         }
538     }
539 }
540 
541 impl Sender {
542     /// Check to see if this `Sender` can send more data.
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>543     pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
544         // Check if the receiver end has tried polling for the body yet
545         ready!(self.poll_want(cx)?);
546         self.data_tx
547             .poll_ready(cx)
548             .map_err(|_| crate::Error::new_closed())
549     }
550 
poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>551     fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
552         match self.want_rx.load(cx) {
553             WANT_READY => Poll::Ready(Ok(())),
554             WANT_PENDING => Poll::Pending,
555             watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
556             unexpected => unreachable!("want_rx value: {}", unexpected),
557         }
558     }
559 
ready(&mut self) -> crate::Result<()>560     async fn ready(&mut self) -> crate::Result<()> {
561         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
562     }
563 
564     /// Send data on data channel when it is ready.
send_data(&mut self, chunk: Bytes) -> crate::Result<()>565     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
566         self.ready().await?;
567         self.data_tx
568             .try_send(Ok(chunk))
569             .map_err(|_| crate::Error::new_closed())
570     }
571 
572     /// Send trailers on trailers channel.
send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>573     pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
574         let tx = match self.trailers_tx.take() {
575             Some(tx) => tx,
576             None => return Err(crate::Error::new_closed()),
577         };
578         tx.send(trailers).map_err(|_| crate::Error::new_closed())
579     }
580 
581     /// Try to send data on this channel.
582     ///
583     /// # Errors
584     ///
585     /// Returns `Err(Bytes)` if the channel could not (currently) accept
586     /// another `Bytes`.
587     ///
588     /// # Note
589     ///
590     /// This is mostly useful for when trying to send from some other thread
591     /// that doesn't have an async context. If in an async context, prefer
592     /// `send_data()` instead.
try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>593     pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
594         self.data_tx
595             .try_send(Ok(chunk))
596             .map_err(|err| err.into_inner().expect("just sent Ok"))
597     }
598 
599     /// Aborts the body in an abnormal fashion.
abort(self)600     pub fn abort(self) {
601         let _ = self
602             .data_tx
603             // clone so the send works even if buffer is full
604             .clone()
605             .try_send(Err(crate::Error::new_body_write_aborted()));
606     }
607 
608     #[cfg(feature = "http1")]
send_error(&mut self, err: crate::Error)609     pub(crate) fn send_error(&mut self, err: crate::Error) {
610         let _ = self.data_tx.try_send(Err(err));
611     }
612 }
613 
614 impl fmt::Debug for Sender {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result615     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
616         #[derive(Debug)]
617         struct Open;
618         #[derive(Debug)]
619         struct Closed;
620 
621         let mut builder = f.debug_tuple("Sender");
622         match self.want_rx.peek() {
623             watch::CLOSED => builder.field(&Closed),
624             _ => builder.field(&Open),
625         };
626 
627         builder.finish()
628     }
629 }
630 
631 #[cfg(test)]
632 mod tests {
633     use std::mem;
634     use std::task::Poll;
635 
636     use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
637 
638     #[test]
test_size_of()639     fn test_size_of() {
640         // These are mostly to help catch *accidentally* increasing
641         // the size by too much.
642 
643         let body_size = mem::size_of::<Body>();
644         let body_expected_size = mem::size_of::<u64>() * 6;
645         assert!(
646             body_size <= body_expected_size,
647             "Body size = {} <= {}",
648             body_size,
649             body_expected_size,
650         );
651 
652         assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
653 
654         assert_eq!(
655             mem::size_of::<Sender>(),
656             mem::size_of::<usize>() * 5,
657             "Sender"
658         );
659 
660         assert_eq!(
661             mem::size_of::<Sender>(),
662             mem::size_of::<Option<Sender>>(),
663             "Option<Sender>"
664         );
665     }
666 
667     #[test]
size_hint()668     fn size_hint() {
669         fn eq(body: Body, b: SizeHint, note: &str) {
670             let a = body.size_hint();
671             assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
672             assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
673         }
674 
675         eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
676 
677         eq(Body::empty(), SizeHint::with_exact(0), "empty");
678 
679         eq(Body::channel().1, SizeHint::new(), "channel");
680 
681         eq(
682             Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
683             SizeHint::with_exact(4),
684             "channel with length",
685         );
686     }
687 
688     #[tokio::test]
channel_abort()689     async fn channel_abort() {
690         let (tx, mut rx) = Body::channel();
691 
692         tx.abort();
693 
694         let err = rx.data().await.unwrap().unwrap_err();
695         assert!(err.is_body_write_aborted(), "{:?}", err);
696     }
697 
698     #[tokio::test]
channel_abort_when_buffer_is_full()699     async fn channel_abort_when_buffer_is_full() {
700         let (mut tx, mut rx) = Body::channel();
701 
702         tx.try_send_data("chunk 1".into()).expect("send 1");
703         // buffer is full, but can still send abort
704         tx.abort();
705 
706         let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
707         assert_eq!(chunk1, "chunk 1");
708 
709         let err = rx.data().await.unwrap().unwrap_err();
710         assert!(err.is_body_write_aborted(), "{:?}", err);
711     }
712 
713     #[test]
channel_buffers_one()714     fn channel_buffers_one() {
715         let (mut tx, _rx) = Body::channel();
716 
717         tx.try_send_data("chunk 1".into()).expect("send 1");
718 
719         // buffer is now full
720         let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
721         assert_eq!(chunk2, "chunk 2");
722     }
723 
724     #[tokio::test]
channel_empty()725     async fn channel_empty() {
726         let (_, mut rx) = Body::channel();
727 
728         assert!(rx.data().await.is_none());
729     }
730 
731     #[test]
channel_ready()732     fn channel_ready() {
733         let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
734 
735         let mut tx_ready = tokio_test::task::spawn(tx.ready());
736 
737         assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
738     }
739 
740     #[test]
channel_wanter()741     fn channel_wanter() {
742         let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
743 
744         let mut tx_ready = tokio_test::task::spawn(tx.ready());
745         let mut rx_data = tokio_test::task::spawn(rx.data());
746 
747         assert!(
748             tx_ready.poll().is_pending(),
749             "tx isn't ready before rx has been polled"
750         );
751 
752         assert!(rx_data.poll().is_pending(), "poll rx.data");
753         assert!(tx_ready.is_woken(), "rx poll wakes tx");
754 
755         assert!(
756             tx_ready.poll().is_ready(),
757             "tx is ready after rx has been polled"
758         );
759     }
760 
761     #[test]
channel_notices_closure()762     fn channel_notices_closure() {
763         let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
764 
765         let mut tx_ready = tokio_test::task::spawn(tx.ready());
766 
767         assert!(
768             tx_ready.poll().is_pending(),
769             "tx isn't ready before rx has been polled"
770         );
771 
772         drop(rx);
773         assert!(tx_ready.is_woken(), "dropping rx wakes tx");
774 
775         match tx_ready.poll() {
776             Poll::Ready(Err(ref e)) if e.is_closed() => (),
777             unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
778         }
779     }
780 }
781