1 use std::borrow::Cow;
2 #[cfg(feature = "stream")]
3 use std::error::Error as StdError;
4 use std::fmt;
5 
6 use bytes::Bytes;
7 use futures_channel::mpsc;
8 use futures_channel::oneshot;
9 use futures_core::Stream; // for mpsc::Receiver
10 #[cfg(feature = "stream")]
11 use futures_util::TryStreamExt;
12 use http::HeaderMap;
13 use http_body::{Body as HttpBody, SizeHint};
14 
15 use super::DecodedLength;
16 #[cfg(feature = "stream")]
17 use crate::common::sync_wrapper::SyncWrapper;
18 use crate::common::Future;
19 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
20 use crate::common::Never;
21 use crate::common::{task, watch, Pin, Poll};
22 #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
23 use crate::proto::h2::ping;
24 
25 type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
26 type TrailersSender = oneshot::Sender<HeaderMap>;
27 
28 /// A stream of `Bytes`, used when receiving bodies.
29 ///
30 /// A good default [`HttpBody`](crate::body::HttpBody) to use in many
31 /// applications.
32 ///
33 /// Note: To read the full body, use [`body::to_bytes`](crate::body::to_bytes)
34 /// or [`body::aggregate`](crate::body::aggregate).
35 #[must_use = "streams do nothing unless polled"]
36 pub struct Body {
37     kind: Kind,
38     /// Keep the extra bits in an `Option<Box<Extra>>`, so that
39     /// Body stays small in the common case (no extras needed).
40     extra: Option<Box<Extra>>,
41 }
42 
43 enum Kind {
44     Once(Option<Bytes>),
45     Chan {
46         content_length: DecodedLength,
47         want_tx: watch::Sender,
48         data_rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
49         trailers_rx: oneshot::Receiver<HeaderMap>,
50     },
51     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
52     H2 {
53         ping: ping::Recorder,
54         content_length: DecodedLength,
55         recv: h2::RecvStream,
56     },
57     #[cfg(feature = "ffi")]
58     Ffi(crate::ffi::UserBody),
59     #[cfg(feature = "stream")]
60     Wrapped(
61         SyncWrapper<
62             Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
63         >,
64     ),
65 }
66 
67 struct Extra {
68     /// Allow the client to pass a future to delay the `Body` from returning
69     /// EOF. This allows the `Client` to try to put the idle connection
70     /// back into the pool before the body is "finished".
71     ///
72     /// The reason for this is so that creating a new request after finishing
73     /// streaming the body of a response could sometimes result in creating
74     /// a brand new connection, since the pool didn't know about the idle
75     /// connection yet.
76     delayed_eof: Option<DelayEof>,
77 }
78 
79 #[cfg(all(feature = "client", any(feature = "http1", feature = "http2")))]
80 type DelayEofUntil = oneshot::Receiver<Never>;
81 
82 enum DelayEof {
83     /// Initial state, stream hasn't seen EOF yet.
84     #[cfg(any(feature = "http1", feature = "http2"))]
85     #[cfg(feature = "client")]
86     NotEof(DelayEofUntil),
87     /// Transitions to this state once we've seen `poll` try to
88     /// return EOF (`None`). This future is then polled, and
89     /// when it completes, the Body finally returns EOF (`None`).
90     #[cfg(any(feature = "http1", feature = "http2"))]
91     #[cfg(feature = "client")]
92     Eof(DelayEofUntil),
93 }
94 
95 /// A sender half created through [`Body::channel()`].
96 ///
97 /// Useful when wanting to stream chunks from another thread.
98 ///
99 /// ## Body Closing
100 ///
101 /// Note that the request body will always be closed normally when the sender is dropped (meaning
102 /// that the empty terminating chunk will be sent to the remote). If you desire to close the
103 /// connection with an incomplete response (e.g. in the case of an error during asynchronous
104 /// processing), call the [`Sender::abort()`] method to abort the body in an abnormal fashion.
105 ///
106 /// [`Body::channel()`]: struct.Body.html#method.channel
107 /// [`Sender::abort()`]: struct.Sender.html#method.abort
108 #[must_use = "Sender does nothing unless sent on"]
109 pub struct Sender {
110     want_rx: watch::Receiver,
111     data_tx: BodySender,
112     trailers_tx: Option<TrailersSender>,
113 }
114 
115 const WANT_PENDING: usize = 1;
116 const WANT_READY: usize = 2;
117 
118 impl Body {
119     /// Create an empty `Body` stream.
120     ///
121     /// # Example
122     ///
123     /// ```
124     /// use hyper::{Body, Request};
125     ///
126     /// // create a `GET /` request
127     /// let get = Request::new(Body::empty());
128     /// ```
129     #[inline]
empty() -> Body130     pub fn empty() -> Body {
131         Body::new(Kind::Once(None))
132     }
133 
134     /// Create a `Body` stream with an associated sender half.
135     ///
136     /// Useful when wanting to stream chunks from another thread.
137     #[inline]
channel() -> (Sender, Body)138     pub fn channel() -> (Sender, Body) {
139         Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
140     }
141 
new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)142     pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
143         let (data_tx, data_rx) = mpsc::channel(0);
144         let (trailers_tx, trailers_rx) = oneshot::channel();
145 
146         // If wanter is true, `Sender::poll_ready()` won't becoming ready
147         // until the `Body` has been polled for data once.
148         let want = if wanter { WANT_PENDING } else { WANT_READY };
149 
150         let (want_tx, want_rx) = watch::channel(want);
151 
152         let tx = Sender {
153             want_rx,
154             data_tx,
155             trailers_tx: Some(trailers_tx),
156         };
157         let rx = Body::new(Kind::Chan {
158             content_length,
159             want_tx,
160             data_rx,
161             trailers_rx,
162         });
163 
164         (tx, rx)
165     }
166 
167     /// Wrap a futures `Stream` in a box inside `Body`.
168     ///
169     /// # Example
170     ///
171     /// ```
172     /// # use hyper::Body;
173     /// let chunks: Vec<Result<_, std::io::Error>> = vec![
174     ///     Ok("hello"),
175     ///     Ok(" "),
176     ///     Ok("world"),
177     /// ];
178     ///
179     /// let stream = futures_util::stream::iter(chunks);
180     ///
181     /// let body = Body::wrap_stream(stream);
182     /// ```
183     ///
184     /// # Optional
185     ///
186     /// This function requires enabling the `stream` feature in your
187     /// `Cargo.toml`.
188     #[cfg(feature = "stream")]
189     #[cfg_attr(docsrs, doc(cfg(feature = "stream")))]
wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,190     pub fn wrap_stream<S, O, E>(stream: S) -> Body
191     where
192         S: Stream<Item = Result<O, E>> + Send + 'static,
193         O: Into<Bytes> + 'static,
194         E: Into<Box<dyn StdError + Send + Sync>> + 'static,
195     {
196         let mapped = stream.map_ok(Into::into).map_err(Into::into);
197         Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
198     }
199 
new(kind: Kind) -> Body200     fn new(kind: Kind) -> Body {
201         Body { kind, extra: None }
202     }
203 
204     #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
h2( recv: h2::RecvStream, mut content_length: DecodedLength, ping: ping::Recorder, ) -> Self205     pub(crate) fn h2(
206         recv: h2::RecvStream,
207         mut content_length: DecodedLength,
208         ping: ping::Recorder,
209     ) -> Self {
210         // If the stream is already EOS, then the "unknown length" is clearly
211         // actually ZERO.
212         if !content_length.is_exact() && recv.is_end_stream() {
213             content_length = DecodedLength::ZERO;
214         }
215         let body = Body::new(Kind::H2 {
216             ping,
217             content_length,
218             recv,
219         });
220 
221         body
222     }
223 
224     #[cfg(any(feature = "http1", feature = "http2"))]
225     #[cfg(feature = "client")]
delayed_eof(&mut self, fut: DelayEofUntil)226     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
227         self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
228     }
229 
take_delayed_eof(&mut self) -> Option<DelayEof>230     fn take_delayed_eof(&mut self) -> Option<DelayEof> {
231         self.extra
232             .as_mut()
233             .and_then(|extra| extra.delayed_eof.take())
234     }
235 
236     #[cfg(any(feature = "http1", feature = "http2"))]
extra_mut(&mut self) -> &mut Extra237     fn extra_mut(&mut self) -> &mut Extra {
238         self.extra
239             .get_or_insert_with(|| Box::new(Extra { delayed_eof: None }))
240     }
241 
poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>242     fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
243         match self.take_delayed_eof() {
244             #[cfg(any(feature = "http1", feature = "http2"))]
245             #[cfg(feature = "client")]
246             Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
247                 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
248                     self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
249                     ok
250                 }
251                 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
252                     Poll::Ready(Ok(never)) => match never {},
253                     Poll::Pending => {
254                         self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
255                         Poll::Pending
256                     }
257                     Poll::Ready(Err(_done)) => Poll::Ready(None),
258                 },
259                 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
260             },
261             #[cfg(any(feature = "http1", feature = "http2"))]
262             #[cfg(feature = "client")]
263             Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
264                 Poll::Ready(Ok(never)) => match never {},
265                 Poll::Pending => {
266                     self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
267                     Poll::Pending
268                 }
269                 Poll::Ready(Err(_done)) => Poll::Ready(None),
270             },
271             #[cfg(any(
272                 not(any(feature = "http1", feature = "http2")),
273                 not(feature = "client")
274             ))]
275             Some(delay_eof) => match delay_eof {},
276             None => self.poll_inner(cx),
277         }
278     }
279 
280     #[cfg(feature = "ffi")]
as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody281     pub(crate) fn as_ffi_mut(&mut self) -> &mut crate::ffi::UserBody {
282         match self.kind {
283             Kind::Ffi(ref mut body) => return body,
284             _ => {
285                 self.kind = Kind::Ffi(crate::ffi::UserBody::new());
286             }
287         }
288 
289         match self.kind {
290             Kind::Ffi(ref mut body) => body,
291             _ => unreachable!(),
292         }
293     }
294 
poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>295     fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
296         match self.kind {
297             Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
298             Kind::Chan {
299                 content_length: ref mut len,
300                 ref mut data_rx,
301                 ref mut want_tx,
302                 ..
303             } => {
304                 want_tx.send(WANT_READY);
305 
306                 match ready!(Pin::new(data_rx).poll_next(cx)?) {
307                     Some(chunk) => {
308                         len.sub_if(chunk.len() as u64);
309                         Poll::Ready(Some(Ok(chunk)))
310                     }
311                     None => Poll::Ready(None),
312                 }
313             }
314             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
315             Kind::H2 {
316                 ref ping,
317                 recv: ref mut h2,
318                 content_length: ref mut len,
319             } => match ready!(h2.poll_data(cx)) {
320                 Some(Ok(bytes)) => {
321                     let _ = h2.flow_control().release_capacity(bytes.len());
322                     len.sub_if(bytes.len() as u64);
323                     ping.record_data(bytes.len());
324                     Poll::Ready(Some(Ok(bytes)))
325                 }
326                 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
327                 None => Poll::Ready(None),
328             },
329 
330             #[cfg(feature = "ffi")]
331             Kind::Ffi(ref mut body) => body.poll_data(cx),
332 
333             #[cfg(feature = "stream")]
334             Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
335                 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
336                 None => Poll::Ready(None),
337             },
338         }
339     }
340 
341     #[cfg(feature = "http1")]
take_full_data(&mut self) -> Option<Bytes>342     pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
343         if let Kind::Once(ref mut chunk) = self.kind {
344             chunk.take()
345         } else {
346             None
347         }
348     }
349 }
350 
351 impl Default for Body {
352     /// Returns `Body::empty()`.
353     #[inline]
default() -> Body354     fn default() -> Body {
355         Body::empty()
356     }
357 }
358 
359 impl HttpBody for Body {
360     type Data = Bytes;
361     type Error = crate::Error;
362 
poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>363     fn poll_data(
364         mut self: Pin<&mut Self>,
365         cx: &mut task::Context<'_>,
366     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
367         self.poll_eof(cx)
368     }
369 
poll_trailers( #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>, #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>370     fn poll_trailers(
371         #[cfg_attr(not(feature = "http2"), allow(unused_mut))] mut self: Pin<&mut Self>,
372         #[cfg_attr(not(feature = "http2"), allow(unused))] cx: &mut task::Context<'_>,
373     ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
374         match self.kind {
375             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
376             Kind::H2 {
377                 recv: ref mut h2,
378                 ref ping,
379                 ..
380             } => match ready!(h2.poll_trailers(cx)) {
381                 Ok(t) => {
382                     ping.record_non_data();
383                     Poll::Ready(Ok(t))
384                 }
385                 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
386             },
387             Kind::Chan {
388                 ref mut trailers_rx,
389                 ..
390             } => match ready!(Pin::new(trailers_rx).poll(cx)) {
391                 Ok(t) => Poll::Ready(Ok(Some(t))),
392                 Err(_) => Poll::Ready(Ok(None)),
393             },
394             #[cfg(feature = "ffi")]
395             Kind::Ffi(ref mut body) => body.poll_trailers(cx),
396             _ => Poll::Ready(Ok(None)),
397         }
398     }
399 
is_end_stream(&self) -> bool400     fn is_end_stream(&self) -> bool {
401         match self.kind {
402             Kind::Once(ref val) => val.is_none(),
403             Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
404             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
405             Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
406             #[cfg(feature = "ffi")]
407             Kind::Ffi(..) => false,
408             #[cfg(feature = "stream")]
409             Kind::Wrapped(..) => false,
410         }
411     }
412 
size_hint(&self) -> SizeHint413     fn size_hint(&self) -> SizeHint {
414         macro_rules! opt_len {
415             ($content_length:expr) => {{
416                 let mut hint = SizeHint::default();
417 
418                 if let Some(content_length) = $content_length.into_opt() {
419                     hint.set_exact(content_length);
420                 }
421 
422                 hint
423             }};
424         }
425 
426         match self.kind {
427             Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
428             Kind::Once(None) => SizeHint::with_exact(0),
429             #[cfg(feature = "stream")]
430             Kind::Wrapped(..) => SizeHint::default(),
431             Kind::Chan { content_length, .. } => opt_len!(content_length),
432             #[cfg(all(feature = "http2", any(feature = "client", feature = "server")))]
433             Kind::H2 { content_length, .. } => opt_len!(content_length),
434             #[cfg(feature = "ffi")]
435             Kind::Ffi(..) => SizeHint::default(),
436         }
437     }
438 }
439 
440 impl fmt::Debug for Body {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result441     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
442         #[derive(Debug)]
443         struct Streaming;
444         #[derive(Debug)]
445         struct Empty;
446         #[derive(Debug)]
447         struct Full<'a>(&'a Bytes);
448 
449         let mut builder = f.debug_tuple("Body");
450         match self.kind {
451             Kind::Once(None) => builder.field(&Empty),
452             Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
453             _ => builder.field(&Streaming),
454         };
455 
456         builder.finish()
457     }
458 }
459 
460 /// # Optional
461 ///
462 /// This function requires enabling the `stream` feature in your
463 /// `Cargo.toml`.
464 #[cfg(feature = "stream")]
465 impl Stream for Body {
466     type Item = crate::Result<Bytes>;
467 
poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>>468     fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
469         HttpBody::poll_data(self, cx)
470     }
471 }
472 
473 /// # Optional
474 ///
475 /// This function requires enabling the `stream` feature in your
476 /// `Cargo.toml`.
477 #[cfg(feature = "stream")]
478 impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
479     #[inline]
from( stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>, ) -> Body480     fn from(
481         stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
482     ) -> Body {
483         Body::new(Kind::Wrapped(SyncWrapper::new(stream.into())))
484     }
485 }
486 
487 impl From<Bytes> for Body {
488     #[inline]
from(chunk: Bytes) -> Body489     fn from(chunk: Bytes) -> Body {
490         if chunk.is_empty() {
491             Body::empty()
492         } else {
493             Body::new(Kind::Once(Some(chunk)))
494         }
495     }
496 }
497 
498 impl From<Vec<u8>> for Body {
499     #[inline]
from(vec: Vec<u8>) -> Body500     fn from(vec: Vec<u8>) -> Body {
501         Body::from(Bytes::from(vec))
502     }
503 }
504 
505 impl From<&'static [u8]> for Body {
506     #[inline]
from(slice: &'static [u8]) -> Body507     fn from(slice: &'static [u8]) -> Body {
508         Body::from(Bytes::from(slice))
509     }
510 }
511 
512 impl From<Cow<'static, [u8]>> for Body {
513     #[inline]
from(cow: Cow<'static, [u8]>) -> Body514     fn from(cow: Cow<'static, [u8]>) -> Body {
515         match cow {
516             Cow::Borrowed(b) => Body::from(b),
517             Cow::Owned(o) => Body::from(o),
518         }
519     }
520 }
521 
522 impl From<String> for Body {
523     #[inline]
from(s: String) -> Body524     fn from(s: String) -> Body {
525         Body::from(Bytes::from(s.into_bytes()))
526     }
527 }
528 
529 impl From<&'static str> for Body {
530     #[inline]
from(slice: &'static str) -> Body531     fn from(slice: &'static str) -> Body {
532         Body::from(Bytes::from(slice.as_bytes()))
533     }
534 }
535 
536 impl From<Cow<'static, str>> for Body {
537     #[inline]
from(cow: Cow<'static, str>) -> Body538     fn from(cow: Cow<'static, str>) -> Body {
539         match cow {
540             Cow::Borrowed(b) => Body::from(b),
541             Cow::Owned(o) => Body::from(o),
542         }
543     }
544 }
545 
546 impl Sender {
547     /// Check to see if this `Sender` can send more data.
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>548     pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
549         // Check if the receiver end has tried polling for the body yet
550         ready!(self.poll_want(cx)?);
551         self.data_tx
552             .poll_ready(cx)
553             .map_err(|_| crate::Error::new_closed())
554     }
555 
poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>556     fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
557         match self.want_rx.load(cx) {
558             WANT_READY => Poll::Ready(Ok(())),
559             WANT_PENDING => Poll::Pending,
560             watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
561             unexpected => unreachable!("want_rx value: {}", unexpected),
562         }
563     }
564 
ready(&mut self) -> crate::Result<()>565     async fn ready(&mut self) -> crate::Result<()> {
566         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
567     }
568 
569     /// Send data on data channel when it is ready.
send_data(&mut self, chunk: Bytes) -> crate::Result<()>570     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
571         self.ready().await?;
572         self.data_tx
573             .try_send(Ok(chunk))
574             .map_err(|_| crate::Error::new_closed())
575     }
576 
577     /// Send trailers on trailers channel.
send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()>578     pub async fn send_trailers(&mut self, trailers: HeaderMap) -> crate::Result<()> {
579         let tx = match self.trailers_tx.take() {
580             Some(tx) => tx,
581             None => return Err(crate::Error::new_closed()),
582         };
583         tx.send(trailers).map_err(|_| crate::Error::new_closed())
584     }
585 
586     /// Try to send data on this channel.
587     ///
588     /// # Errors
589     ///
590     /// Returns `Err(Bytes)` if the channel could not (currently) accept
591     /// another `Bytes`.
592     ///
593     /// # Note
594     ///
595     /// This is mostly useful for when trying to send from some other thread
596     /// that doesn't have an async context. If in an async context, prefer
597     /// `send_data()` instead.
try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>598     pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
599         self.data_tx
600             .try_send(Ok(chunk))
601             .map_err(|err| err.into_inner().expect("just sent Ok"))
602     }
603 
604     /// Aborts the body in an abnormal fashion.
abort(self)605     pub fn abort(self) {
606         let _ = self
607             .data_tx
608             // clone so the send works even if buffer is full
609             .clone()
610             .try_send(Err(crate::Error::new_body_write_aborted()));
611     }
612 
613     #[cfg(feature = "http1")]
send_error(&mut self, err: crate::Error)614     pub(crate) fn send_error(&mut self, err: crate::Error) {
615         let _ = self.data_tx.try_send(Err(err));
616     }
617 }
618 
619 impl fmt::Debug for Sender {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result620     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
621         #[derive(Debug)]
622         struct Open;
623         #[derive(Debug)]
624         struct Closed;
625 
626         let mut builder = f.debug_tuple("Sender");
627         match self.want_rx.peek() {
628             watch::CLOSED => builder.field(&Closed),
629             _ => builder.field(&Open),
630         };
631 
632         builder.finish()
633     }
634 }
635 
636 #[cfg(test)]
637 mod tests {
638     use std::mem;
639     use std::task::Poll;
640 
641     use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};
642 
643     #[test]
test_size_of()644     fn test_size_of() {
645         // These are mostly to help catch *accidentally* increasing
646         // the size by too much.
647 
648         let body_size = mem::size_of::<Body>();
649         let body_expected_size = mem::size_of::<u64>() * 6;
650         assert!(
651             body_size <= body_expected_size,
652             "Body size = {} <= {}",
653             body_size,
654             body_expected_size,
655         );
656 
657         assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");
658 
659         assert_eq!(
660             mem::size_of::<Sender>(),
661             mem::size_of::<usize>() * 5,
662             "Sender"
663         );
664 
665         assert_eq!(
666             mem::size_of::<Sender>(),
667             mem::size_of::<Option<Sender>>(),
668             "Option<Sender>"
669         );
670     }
671 
672     #[test]
size_hint()673     fn size_hint() {
674         fn eq(body: Body, b: SizeHint, note: &str) {
675             let a = body.size_hint();
676             assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
677             assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
678         }
679 
680         eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");
681 
682         eq(Body::empty(), SizeHint::with_exact(0), "empty");
683 
684         eq(Body::channel().1, SizeHint::new(), "channel");
685 
686         eq(
687             Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
688             SizeHint::with_exact(4),
689             "channel with length",
690         );
691     }
692 
693     #[tokio::test]
channel_abort()694     async fn channel_abort() {
695         let (tx, mut rx) = Body::channel();
696 
697         tx.abort();
698 
699         let err = rx.data().await.unwrap().unwrap_err();
700         assert!(err.is_body_write_aborted(), "{:?}", err);
701     }
702 
703     #[tokio::test]
channel_abort_when_buffer_is_full()704     async fn channel_abort_when_buffer_is_full() {
705         let (mut tx, mut rx) = Body::channel();
706 
707         tx.try_send_data("chunk 1".into()).expect("send 1");
708         // buffer is full, but can still send abort
709         tx.abort();
710 
711         let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
712         assert_eq!(chunk1, "chunk 1");
713 
714         let err = rx.data().await.unwrap().unwrap_err();
715         assert!(err.is_body_write_aborted(), "{:?}", err);
716     }
717 
718     #[test]
channel_buffers_one()719     fn channel_buffers_one() {
720         let (mut tx, _rx) = Body::channel();
721 
722         tx.try_send_data("chunk 1".into()).expect("send 1");
723 
724         // buffer is now full
725         let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
726         assert_eq!(chunk2, "chunk 2");
727     }
728 
729     #[tokio::test]
channel_empty()730     async fn channel_empty() {
731         let (_, mut rx) = Body::channel();
732 
733         assert!(rx.data().await.is_none());
734     }
735 
736     #[test]
channel_ready()737     fn channel_ready() {
738         let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);
739 
740         let mut tx_ready = tokio_test::task::spawn(tx.ready());
741 
742         assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
743     }
744 
745     #[test]
channel_wanter()746     fn channel_wanter() {
747         let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
748 
749         let mut tx_ready = tokio_test::task::spawn(tx.ready());
750         let mut rx_data = tokio_test::task::spawn(rx.data());
751 
752         assert!(
753             tx_ready.poll().is_pending(),
754             "tx isn't ready before rx has been polled"
755         );
756 
757         assert!(rx_data.poll().is_pending(), "poll rx.data");
758         assert!(tx_ready.is_woken(), "rx poll wakes tx");
759 
760         assert!(
761             tx_ready.poll().is_ready(),
762             "tx is ready after rx has been polled"
763         );
764     }
765 
766     #[test]
channel_notices_closure()767     fn channel_notices_closure() {
768         let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);
769 
770         let mut tx_ready = tokio_test::task::spawn(tx.ready());
771 
772         assert!(
773             tx_ready.poll().is_pending(),
774             "tx isn't ready before rx has been polled"
775         );
776 
777         drop(rx);
778         assert!(tx_ready.is_woken(), "dropping rx wakes tx");
779 
780         match tx_ready.poll() {
781             Poll::Ready(Err(ref e)) if e.is_closed() => (),
782             unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
783         }
784     }
785 }
786