1 use std::borrow::Cow;
2 #[cfg(feature = "stream")]
3 use std::error::Error as StdError;
4 use std::fmt;
5 
6 use bytes::Bytes;
7 use futures_channel::{mpsc, oneshot};
8 use futures_core::Stream; // for mpsc::Receiver
9 #[cfg(feature = "stream")]
10 use futures_util::TryStreamExt;
11 use http::HeaderMap;
12 use http_body::{Body as HttpBody, SizeHint};
13 
14 use crate::common::sync_wrapper::SyncWrapper;
15 use crate::common::{task, watch, Future, Never, Pin, Poll};
16 use crate::proto::h2::ping;
17 use crate::proto::DecodedLength;
18 use crate::upgrade::OnUpgrade;
19 
/// Sending half of the data channel behind `Body::channel()`; every item is
/// either a chunk of bytes or an error.
type BodySender = mpsc::Sender<Result<Bytes, crate::Error>>;
21 
/// A stream of `Bytes`, used when receiving bodies.
///
/// A good default [`HttpBody`](crate::body::HttpBody) to use in many
/// applications.
///
/// A `Body` can be created empty ([`Body::empty`]), from in-memory data (the
/// `From` implementations), or paired with a [`Sender`] through
/// [`Body::channel`].
#[must_use = "streams do nothing unless polled"]
pub struct Body {
    /// Where the data of this body comes from.
    kind: Kind,
    /// Keep the extra bits in an `Option<Box<Extra>>`, so that
    /// Body stays small in the common case (no extras needed).
    extra: Option<Box<Extra>>,
}
33 
/// The source a `Body` reads its data from.
enum Kind {
    /// At most one in-memory chunk; `None` when there is no data, or when the
    /// chunk has already been taken.
    Once(Option<Bytes>),
    /// Data pushed by a `Sender` through a channel.
    Chan {
        /// Length bookkeeping for the body: adjusted with `sub_if` as chunks
        /// are yielded, and consulted by `size_hint` and `is_end_stream`.
        content_length: DecodedLength,
        /// Set to `WANT_READY` every time the body is polled, so the `Sender`
        /// can learn that data is wanted.
        want_tx: watch::Sender,
        /// Receiving half of the data channel.
        rx: mpsc::Receiver<Result<Bytes, crate::Error>>,
    },
    /// Data received on an `h2` stream.
    H2 {
        /// Notified through `record_data` for each chunk and through
        /// `record_non_data` once trailers have been polled successfully.
        ping: ping::Recorder,
        /// Length bookkeeping, handled the same way as for `Kind::Chan`.
        content_length: DecodedLength,
        /// The underlying `h2` receive stream.
        recv: h2::RecvStream,
    },
    /// A boxed stream of chunks, as built by `Body::wrap_stream` or the
    /// `From<Box<dyn Stream<..>>>` conversion.
    #[cfg(feature = "stream")]
    Wrapped(
        SyncWrapper<
            Pin<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>>,
        >,
    ),
}
53 
/// Rarely needed state of a `Body`, stored behind `Option<Box<Extra>>` in
/// `Body::extra`.
struct Extra {
    /// Allow the client to pass a future to delay the `Body` from returning
    /// EOF. This allows the `Client` to try to put the idle connection
    /// back into the pool before the body is "finished".
    ///
    /// The reason for this is so that creating a new request after finishing
    /// streaming the body of a response could sometimes result in creating
    /// a brand new connection, since the pool didn't know about the idle
    /// connection yet.
    delayed_eof: Option<DelayEof>,
    /// Upgrade future handed out by `Body::on_upgrade`; starts out as
    /// `OnUpgrade::none()` until `set_on_upgrade` stores a real one.
    on_upgrade: OnUpgrade,
}
66 
/// The future a `Body` waits on before reporting EOF. `poll_eof` treats its
/// `Ok` case as impossible (the value is a `Never`) and reports EOF once the
/// future resolves with an error.
type DelayEofUntil = oneshot::Receiver<Never>;
68 
/// State of the delayed-EOF mechanism kept in `Extra::delayed_eof`.
enum DelayEof {
    /// Initial state, stream hasn't seen EOF yet.
    NotEof(DelayEofUntil),
    /// Transitions to this state once we've seen `poll` try to
    /// return EOF (`None`). This future is then polled, and
    /// when it completes, the Body finally returns EOF (`None`).
    Eof(DelayEofUntil),
}
77 
/// A sender half used with `Body::channel()`.
///
/// Useful when wanting to stream chunks from another thread. See
/// [`Body::channel`](Body::channel) for more.
#[must_use = "Sender does nothing unless sent on"]
pub struct Sender {
    /// Watches the "want" state published by the `Body` (`WANT_PENDING`,
    /// `WANT_READY` or `watch::CLOSED`); read by `poll_want` and `Debug`.
    want_rx: watch::Receiver,
    /// Channel used to push chunks, or an error, to the `Body`.
    tx: BodySender,
}
87 
/// Watch value for which `Sender::poll_want` returns `Poll::Pending`; used as
/// the initial state when `new_channel` is called with `wanter == true`.
const WANT_PENDING: usize = 1;
/// Watch value for which `Sender::poll_want` reports readiness; sent by
/// `Body::poll_inner` on every poll of a channel body, and used as the initial
/// state when `wanter == false`.
const WANT_READY: usize = 2;
90 
91 impl Body {
92     /// Create an empty `Body` stream.
93     ///
94     /// # Example
95     ///
96     /// ```
97     /// use hyper::{Body, Request};
98     ///
99     /// // create a `GET /` request
100     /// let get = Request::new(Body::empty());
101     /// ```
102     #[inline]
empty() -> Body103     pub fn empty() -> Body {
104         Body::new(Kind::Once(None))
105     }
106 
107     /// Create a `Body` stream with an associated sender half.
108     ///
109     /// Useful when wanting to stream chunks from another thread.
110     #[inline]
channel() -> (Sender, Body)111     pub fn channel() -> (Sender, Body) {
112         Self::new_channel(DecodedLength::CHUNKED, /*wanter =*/ false)
113     }
114 
new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body)115     pub(crate) fn new_channel(content_length: DecodedLength, wanter: bool) -> (Sender, Body) {
116         let (tx, rx) = mpsc::channel(0);
117 
118         // If wanter is true, `Sender::poll_ready()` won't becoming ready
119         // until the `Body` has been polled for data once.
120         let want = if wanter { WANT_PENDING } else { WANT_READY };
121 
122         let (want_tx, want_rx) = watch::channel(want);
123 
124         let tx = Sender { want_rx, tx };
125         let rx = Body::new(Kind::Chan {
126             content_length,
127             want_tx,
128             rx,
129         });
130 
131         (tx, rx)
132     }
133 
134     /// Wrap a futures `Stream` in a box inside `Body`.
135     ///
136     /// # Example
137     ///
138     /// ```
139     /// # use hyper::Body;
140     /// let chunks: Vec<Result<_, std::io::Error>> = vec![
141     ///     Ok("hello"),
142     ///     Ok(" "),
143     ///     Ok("world"),
144     /// ];
145     ///
146     /// let stream = futures_util::stream::iter(chunks);
147     ///
148     /// let body = Body::wrap_stream(stream);
149     /// ```
150     ///
151     /// # Optional
152     ///
153     /// This function requires enabling the `stream` feature in your
154     /// `Cargo.toml`.
155     #[cfg(feature = "stream")]
wrap_stream<S, O, E>(stream: S) -> Body where S: Stream<Item = Result<O, E>> + Send + 'static, O: Into<Bytes> + 'static, E: Into<Box<dyn StdError + Send + Sync>> + 'static,156     pub fn wrap_stream<S, O, E>(stream: S) -> Body
157     where
158         S: Stream<Item = Result<O, E>> + Send + 'static,
159         O: Into<Bytes> + 'static,
160         E: Into<Box<dyn StdError + Send + Sync>> + 'static,
161     {
162         let mapped = stream.map_ok(Into::into).map_err(Into::into);
163         Body::new(Kind::Wrapped(SyncWrapper::new(Box::pin(mapped))))
164     }
165 
166     /// Converts this `Body` into a `Future` of a pending HTTP upgrade.
167     ///
168     /// See [the `upgrade` module](crate::upgrade) for more.
on_upgrade(self) -> OnUpgrade169     pub fn on_upgrade(self) -> OnUpgrade {
170         self.extra
171             .map(|ex| ex.on_upgrade)
172             .unwrap_or_else(OnUpgrade::none)
173     }
174 
new(kind: Kind) -> Body175     fn new(kind: Kind) -> Body {
176         Body { kind, extra: None }
177     }
178 
h2( recv: h2::RecvStream, content_length: DecodedLength, ping: ping::Recorder, ) -> Self179     pub(crate) fn h2(
180         recv: h2::RecvStream,
181         content_length: DecodedLength,
182         ping: ping::Recorder,
183     ) -> Self {
184         let body = Body::new(Kind::H2 {
185             ping,
186             content_length,
187             recv,
188         });
189 
190         body
191     }
192 
set_on_upgrade(&mut self, upgrade: OnUpgrade)193     pub(crate) fn set_on_upgrade(&mut self, upgrade: OnUpgrade) {
194         debug_assert!(!upgrade.is_none(), "set_on_upgrade with empty upgrade");
195         let extra = self.extra_mut();
196         debug_assert!(extra.on_upgrade.is_none(), "set_on_upgrade twice");
197         extra.on_upgrade = upgrade;
198     }
199 
delayed_eof(&mut self, fut: DelayEofUntil)200     pub(crate) fn delayed_eof(&mut self, fut: DelayEofUntil) {
201         self.extra_mut().delayed_eof = Some(DelayEof::NotEof(fut));
202     }
203 
take_delayed_eof(&mut self) -> Option<DelayEof>204     fn take_delayed_eof(&mut self) -> Option<DelayEof> {
205         self.extra
206             .as_mut()
207             .and_then(|extra| extra.delayed_eof.take())
208     }
209 
extra_mut(&mut self) -> &mut Extra210     fn extra_mut(&mut self) -> &mut Extra {
211         self.extra.get_or_insert_with(|| {
212             Box::new(Extra {
213                 delayed_eof: None,
214                 on_upgrade: OnUpgrade::none(),
215             })
216         })
217     }
218 
poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>219     fn poll_eof(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
220         match self.take_delayed_eof() {
221             Some(DelayEof::NotEof(mut delay)) => match self.poll_inner(cx) {
222                 ok @ Poll::Ready(Some(Ok(..))) | ok @ Poll::Pending => {
223                     self.extra_mut().delayed_eof = Some(DelayEof::NotEof(delay));
224                     ok
225                 }
226                 Poll::Ready(None) => match Pin::new(&mut delay).poll(cx) {
227                     Poll::Ready(Ok(never)) => match never {},
228                     Poll::Pending => {
229                         self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
230                         Poll::Pending
231                     }
232                     Poll::Ready(Err(_done)) => Poll::Ready(None),
233                 },
234                 Poll::Ready(Some(Err(e))) => Poll::Ready(Some(Err(e))),
235             },
236             Some(DelayEof::Eof(mut delay)) => match Pin::new(&mut delay).poll(cx) {
237                 Poll::Ready(Ok(never)) => match never {},
238                 Poll::Pending => {
239                     self.extra_mut().delayed_eof = Some(DelayEof::Eof(delay));
240                     Poll::Pending
241                 }
242                 Poll::Ready(Err(_done)) => Poll::Ready(None),
243             },
244             None => self.poll_inner(cx),
245         }
246     }
247 
poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>>248     fn poll_inner(&mut self, cx: &mut task::Context<'_>) -> Poll<Option<crate::Result<Bytes>>> {
249         match self.kind {
250             Kind::Once(ref mut val) => Poll::Ready(val.take().map(Ok)),
251             Kind::Chan {
252                 content_length: ref mut len,
253                 ref mut rx,
254                 ref mut want_tx,
255             } => {
256                 want_tx.send(WANT_READY);
257 
258                 match ready!(Pin::new(rx).poll_next(cx)?) {
259                     Some(chunk) => {
260                         len.sub_if(chunk.len() as u64);
261                         Poll::Ready(Some(Ok(chunk)))
262                     }
263                     None => Poll::Ready(None),
264                 }
265             }
266             Kind::H2 {
267                 ref ping,
268                 recv: ref mut h2,
269                 content_length: ref mut len,
270             } => match ready!(h2.poll_data(cx)) {
271                 Some(Ok(bytes)) => {
272                     let _ = h2.flow_control().release_capacity(bytes.len());
273                     len.sub_if(bytes.len() as u64);
274                     ping.record_data(bytes.len());
275                     Poll::Ready(Some(Ok(bytes)))
276                 }
277                 Some(Err(e)) => Poll::Ready(Some(Err(crate::Error::new_body(e)))),
278                 None => Poll::Ready(None),
279             },
280 
281             #[cfg(feature = "stream")]
282             Kind::Wrapped(ref mut s) => match ready!(s.get_mut().as_mut().poll_next(cx)) {
283                 Some(res) => Poll::Ready(Some(res.map_err(crate::Error::new_body))),
284                 None => Poll::Ready(None),
285             },
286         }
287     }
288 
take_full_data(&mut self) -> Option<Bytes>289     pub(super) fn take_full_data(&mut self) -> Option<Bytes> {
290         if let Kind::Once(ref mut chunk) = self.kind {
291             chunk.take()
292         } else {
293             None
294         }
295     }
296 }
297 
298 impl Default for Body {
299     /// Returns `Body::empty()`.
300     #[inline]
default() -> Body301     fn default() -> Body {
302         Body::empty()
303     }
304 }
305 
306 impl HttpBody for Body {
307     type Data = Bytes;
308     type Error = crate::Error;
309 
poll_data( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Option<Result<Self::Data, Self::Error>>>310     fn poll_data(
311         mut self: Pin<&mut Self>,
312         cx: &mut task::Context<'_>,
313     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
314         self.poll_eof(cx)
315     }
316 
poll_trailers( mut self: Pin<&mut Self>, cx: &mut task::Context<'_>, ) -> Poll<Result<Option<HeaderMap>, Self::Error>>317     fn poll_trailers(
318         mut self: Pin<&mut Self>,
319         cx: &mut task::Context<'_>,
320     ) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
321         match self.kind {
322             Kind::H2 {
323                 recv: ref mut h2,
324                 ref ping,
325                 ..
326             } => match ready!(h2.poll_trailers(cx)) {
327                 Ok(t) => {
328                     ping.record_non_data();
329                     Poll::Ready(Ok(t))
330                 }
331                 Err(e) => Poll::Ready(Err(crate::Error::new_h2(e))),
332             },
333             _ => Poll::Ready(Ok(None)),
334         }
335     }
336 
is_end_stream(&self) -> bool337     fn is_end_stream(&self) -> bool {
338         match self.kind {
339             Kind::Once(ref val) => val.is_none(),
340             Kind::Chan { content_length, .. } => content_length == DecodedLength::ZERO,
341             Kind::H2 { recv: ref h2, .. } => h2.is_end_stream(),
342             #[cfg(feature = "stream")]
343             Kind::Wrapped(..) => false,
344         }
345     }
346 
size_hint(&self) -> SizeHint347     fn size_hint(&self) -> SizeHint {
348         match self.kind {
349             Kind::Once(Some(ref val)) => SizeHint::with_exact(val.len() as u64),
350             Kind::Once(None) => SizeHint::with_exact(0),
351             #[cfg(feature = "stream")]
352             Kind::Wrapped(..) => SizeHint::default(),
353             Kind::Chan { content_length, .. } | Kind::H2 { content_length, .. } => {
354                 let mut hint = SizeHint::default();
355 
356                 if let Some(content_length) = content_length.into_opt() {
357                     hint.set_exact(content_length);
358                 }
359 
360                 hint
361             }
362         }
363     }
364 }
365 
366 impl fmt::Debug for Body {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result367     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
368         #[derive(Debug)]
369         struct Streaming;
370         #[derive(Debug)]
371         struct Empty;
372         #[derive(Debug)]
373         struct Full<'a>(&'a Bytes);
374 
375         let mut builder = f.debug_tuple("Body");
376         match self.kind {
377             Kind::Once(None) => builder.field(&Empty),
378             Kind::Once(Some(ref chunk)) => builder.field(&Full(chunk)),
379             _ => builder.field(&Streaming),
380         };
381 
382         builder.finish()
383     }
384 }
385 
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl Stream for Body {
    type Item = crate::Result<Bytes>;

    /// Yields the same items as `HttpBody::poll_data`.
    fn poll_next(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll<Option<Self::Item>> {
        <Self as HttpBody>::poll_data(self, cx)
    }
}
398 
/// # Optional
///
/// This implementation requires enabling the `stream` feature in your
/// `Cargo.toml`.
#[cfg(feature = "stream")]
impl From<Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>> for Body {
    /// Pins the already boxed stream and uses it as the body's data source.
    #[inline]
    fn from(
        stream: Box<dyn Stream<Item = Result<Bytes, Box<dyn StdError + Send + Sync>>> + Send>,
    ) -> Body {
        let pinned = Pin::from(stream);
        Body::new(Kind::Wrapped(SyncWrapper::new(pinned)))
    }
}
412 
413 impl From<Bytes> for Body {
414     #[inline]
from(chunk: Bytes) -> Body415     fn from(chunk: Bytes) -> Body {
416         if chunk.is_empty() {
417             Body::empty()
418         } else {
419             Body::new(Kind::Once(Some(chunk)))
420         }
421     }
422 }
423 
424 impl From<Vec<u8>> for Body {
425     #[inline]
from(vec: Vec<u8>) -> Body426     fn from(vec: Vec<u8>) -> Body {
427         Body::from(Bytes::from(vec))
428     }
429 }
430 
431 impl From<&'static [u8]> for Body {
432     #[inline]
from(slice: &'static [u8]) -> Body433     fn from(slice: &'static [u8]) -> Body {
434         Body::from(Bytes::from(slice))
435     }
436 }
437 
438 impl From<Cow<'static, [u8]>> for Body {
439     #[inline]
from(cow: Cow<'static, [u8]>) -> Body440     fn from(cow: Cow<'static, [u8]>) -> Body {
441         match cow {
442             Cow::Borrowed(b) => Body::from(b),
443             Cow::Owned(o) => Body::from(o),
444         }
445     }
446 }
447 
448 impl From<String> for Body {
449     #[inline]
from(s: String) -> Body450     fn from(s: String) -> Body {
451         Body::from(Bytes::from(s.into_bytes()))
452     }
453 }
454 
455 impl From<&'static str> for Body {
456     #[inline]
from(slice: &'static str) -> Body457     fn from(slice: &'static str) -> Body {
458         Body::from(Bytes::from(slice.as_bytes()))
459     }
460 }
461 
462 impl From<Cow<'static, str>> for Body {
463     #[inline]
from(cow: Cow<'static, str>) -> Body464     fn from(cow: Cow<'static, str>) -> Body {
465         match cow {
466             Cow::Borrowed(b) => Body::from(b),
467             Cow::Owned(o) => Body::from(o),
468         }
469     }
470 }
471 
472 impl Sender {
473     /// Check to see if this `Sender` can send more data.
poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>474     pub fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
475         // Check if the receiver end has tried polling for the body yet
476         ready!(self.poll_want(cx)?);
477         self.tx
478             .poll_ready(cx)
479             .map_err(|_| crate::Error::new_closed())
480     }
481 
poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>>482     fn poll_want(&mut self, cx: &mut task::Context<'_>) -> Poll<crate::Result<()>> {
483         match self.want_rx.load(cx) {
484             WANT_READY => Poll::Ready(Ok(())),
485             WANT_PENDING => Poll::Pending,
486             watch::CLOSED => Poll::Ready(Err(crate::Error::new_closed())),
487             unexpected => unreachable!("want_rx value: {}", unexpected),
488         }
489     }
490 
ready(&mut self) -> crate::Result<()>491     async fn ready(&mut self) -> crate::Result<()> {
492         futures_util::future::poll_fn(|cx| self.poll_ready(cx)).await
493     }
494 
495     /// Send data on this channel when it is ready.
send_data(&mut self, chunk: Bytes) -> crate::Result<()>496     pub async fn send_data(&mut self, chunk: Bytes) -> crate::Result<()> {
497         self.ready().await?;
498         self.tx
499             .try_send(Ok(chunk))
500             .map_err(|_| crate::Error::new_closed())
501     }
502 
503     /// Try to send data on this channel.
504     ///
505     /// # Errors
506     ///
507     /// Returns `Err(Bytes)` if the channel could not (currently) accept
508     /// another `Bytes`.
509     ///
510     /// # Note
511     ///
512     /// This is mostly useful for when trying to send from some other thread
513     /// that doesn't have an async context. If in an async context, prefer
514     /// `send_data()` instead.
try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes>515     pub fn try_send_data(&mut self, chunk: Bytes) -> Result<(), Bytes> {
516         self.tx
517             .try_send(Ok(chunk))
518             .map_err(|err| err.into_inner().expect("just sent Ok"))
519     }
520 
521     /// Aborts the body in an abnormal fashion.
abort(self)522     pub fn abort(self) {
523         let _ = self
524             .tx
525             // clone so the send works even if buffer is full
526             .clone()
527             .try_send(Err(crate::Error::new_body_write_aborted()));
528     }
529 
send_error(&mut self, err: crate::Error)530     pub(crate) fn send_error(&mut self, err: crate::Error) {
531         let _ = self.tx.try_send(Err(err));
532     }
533 }
534 
535 impl fmt::Debug for Sender {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result536     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
537         #[derive(Debug)]
538         struct Open;
539         #[derive(Debug)]
540         struct Closed;
541 
542         let mut builder = f.debug_tuple("Sender");
543         match self.want_rx.peek() {
544             watch::CLOSED => builder.field(&Closed),
545             _ => builder.field(&Open),
546         };
547 
548         builder.finish()
549     }
550 }
551 
// Unit tests for `Body` and `Sender`: type sizes, size hints, and the
// behaviour of the channel pair.
#[cfg(test)]
mod tests {
    use std::mem;
    use std::task::Poll;

    use super::{Body, DecodedLength, HttpBody, Sender, SizeHint};

    /// Guards the in-memory size of `Body` and `Sender`, and checks that
    /// wrapping either in `Option` does not make it larger.
    #[test]
    fn test_size_of() {
        // These are mostly to help catch *accidentally* increasing
        // the size by too much.

        let body_size = mem::size_of::<Body>();
        let body_expected_size = mem::size_of::<u64>() * 6;
        assert!(
            body_size <= body_expected_size,
            "Body size = {} <= {}",
            body_size,
            body_expected_size,
        );

        assert_eq!(body_size, mem::size_of::<Option<Body>>(), "Option<Body>");

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<usize>() * 4,
            "Sender"
        );

        assert_eq!(
            mem::size_of::<Sender>(),
            mem::size_of::<Option<Sender>>(),
            "Option<Sender>"
        );
    }

    /// Checks `size_hint` for in-memory, empty and channel bodies.
    #[test]
    fn size_hint() {
        // Compares both bounds of the body's hint against the expected hint.
        fn eq(body: Body, b: SizeHint, note: &str) {
            let a = body.size_hint();
            assert_eq!(a.lower(), b.lower(), "lower for {:?}", note);
            assert_eq!(a.upper(), b.upper(), "upper for {:?}", note);
        }

        eq(Body::from("Hello"), SizeHint::with_exact(5), "from str");

        eq(Body::empty(), SizeHint::with_exact(0), "empty");

        // A plain channel has no known length.
        eq(Body::channel().1, SizeHint::new(), "channel");

        eq(
            Body::new_channel(DecodedLength::new(4), /*wanter =*/ false).1,
            SizeHint::with_exact(4),
            "channel with length",
        );
    }

    /// Aborting the sender makes the body yield a "body write aborted" error.
    #[tokio::test]
    async fn channel_abort() {
        let (tx, mut rx) = Body::channel();

        tx.abort();

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// The abort error is still delivered, after the buffered chunk, when the
    /// sender already has a chunk waiting in the channel.
    #[tokio::test]
    async fn channel_abort_when_buffer_is_full() {
        let (mut tx, mut rx) = Body::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");
        // buffer is full, but can still send abort
        tx.abort();

        let chunk1 = rx.data().await.expect("item 1").expect("chunk 1");
        assert_eq!(chunk1, "chunk 1");

        let err = rx.data().await.unwrap().unwrap_err();
        assert!(err.is_body_write_aborted(), "{:?}", err);
    }

    /// A sender can queue exactly one chunk without the body being polled;
    /// the second `try_send_data` hands its chunk back.
    #[test]
    fn channel_buffers_one() {
        let (mut tx, _rx) = Body::channel();

        tx.try_send_data("chunk 1".into()).expect("send 1");

        // buffer is now full
        let chunk2 = tx.try_send_data("chunk 2".into()).expect_err("send 2");
        assert_eq!(chunk2, "chunk 2");
    }

    /// Dropping the sender without sending anything ends the body.
    #[tokio::test]
    async fn channel_empty() {
        let (_, mut rx) = Body::channel();

        assert!(rx.data().await.is_none());
    }

    /// Without the "wanter" behaviour the sender is ready right away.
    #[test]
    fn channel_ready() {
        let (mut tx, _rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ false);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(tx_ready.poll().is_ready(), "tx is ready immediately");
    }

    /// With the "wanter" behaviour the sender only becomes ready after the
    /// body has been polled once, and that poll wakes the sender's task.
    #[test]
    fn channel_wanter() {
        let (mut tx, mut rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());
        let mut rx_data = tokio_test::task::spawn(rx.data());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        assert!(rx_data.poll().is_pending(), "poll rx.data");
        assert!(tx_ready.is_woken(), "rx poll wakes tx");

        assert!(
            tx_ready.poll().is_ready(),
            "tx is ready after rx has been polled"
        );
    }

    /// Dropping the body wakes a waiting sender, which then sees a "closed"
    /// error.
    #[test]
    fn channel_notices_closure() {
        let (mut tx, rx) = Body::new_channel(DecodedLength::CHUNKED, /*wanter = */ true);

        let mut tx_ready = tokio_test::task::spawn(tx.ready());

        assert!(
            tx_ready.poll().is_pending(),
            "tx isn't ready before rx has been polled"
        );

        drop(rx);
        assert!(tx_ready.is_woken(), "dropping rx wakes tx");

        match tx_ready.poll() {
            Poll::Ready(Err(ref e)) if e.is_closed() => (),
            unexpected => panic!("tx poll ready unexpected: {:?}", unexpected),
        }
    }
}
702