1 use std::fmt;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 
6 #[cfg(feature = "gzip")]
7 use async_compression::tokio::bufread::GzipDecoder;
8 
9 #[cfg(feature = "brotli")]
10 use async_compression::tokio::bufread::BrotliDecoder;
11 
12 #[cfg(feature = "deflate")]
13 use async_compression::tokio::bufread::ZlibDecoder;
14 
15 use bytes::Bytes;
16 use futures_core::Stream;
17 use futures_util::stream::Peekable;
18 use http::HeaderMap;
19 use hyper::body::HttpBody;
20 
21 #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
22 use tokio_util::codec::{BytesCodec, FramedRead};
23 #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))]
24 use tokio_util::io::StreamReader;
25 
26 use super::super::Body;
27 use crate::error;
28 
29 #[derive(Clone, Copy, Debug)]
30 pub(super) struct Accepts {
31     #[cfg(feature = "gzip")]
32     pub(super) gzip: bool,
33     #[cfg(feature = "brotli")]
34     pub(super) brotli: bool,
35     #[cfg(feature = "deflate")]
36     pub(super) deflate: bool,
37 }
38 
39 /// A response decompressor over a non-blocking stream of chunks.
40 ///
41 /// The inner decoder may be constructed asynchronously.
42 pub(crate) struct Decoder {
43     inner: Inner,
44 }
45 
46 enum Inner {
47     /// A `PlainText` decoder just returns the response content as is.
48     PlainText(super::body::ImplStream),
49 
50     /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
51     #[cfg(feature = "gzip")]
52     Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
53 
54     /// A `Brotli` decoder will uncompress the brotlied response content before returning it.
55     #[cfg(feature = "brotli")]
56     Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
57 
58     /// A `Deflate` decoder will uncompress the deflated response content before returning it.
59     #[cfg(feature = "deflate")]
60     Deflate(FramedRead<ZlibDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>),
61 
62     /// A decoder that doesn't have a value yet.
63     #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
64     Pending(Pending),
65 }
66 
67 /// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
68 struct Pending(Peekable<IoStream>, DecoderType);
69 
70 struct IoStream(super::body::ImplStream);
71 
72 enum DecoderType {
73     #[cfg(feature = "gzip")]
74     Gzip,
75     #[cfg(feature = "brotli")]
76     Brotli,
77     #[cfg(feature = "deflate")]
78     Deflate,
79 }
80 
81 impl fmt::Debug for Decoder {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result82     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
83         f.debug_struct("Decoder").finish()
84     }
85 }
86 
87 impl Decoder {
88     #[cfg(feature = "blocking")]
empty() -> Decoder89     pub(crate) fn empty() -> Decoder {
90         Decoder {
91             inner: Inner::PlainText(Body::empty().into_stream()),
92         }
93     }
94 
95     /// A plain text decoder.
96     ///
97     /// This decoder will emit the underlying chunks as-is.
plain_text(body: Body) -> Decoder98     fn plain_text(body: Body) -> Decoder {
99         Decoder {
100             inner: Inner::PlainText(body.into_stream()),
101         }
102     }
103 
104     /// A gzip decoder.
105     ///
106     /// This decoder will buffer and decompress chunks that are gzipped.
107     #[cfg(feature = "gzip")]
gzip(body: Body) -> Decoder108     fn gzip(body: Body) -> Decoder {
109         use futures_util::StreamExt;
110 
111         Decoder {
112             inner: Inner::Pending(Pending(
113                 IoStream(body.into_stream()).peekable(),
114                 DecoderType::Gzip,
115             )),
116         }
117     }
118 
119     /// A brotli decoder.
120     ///
121     /// This decoder will buffer and decompress chunks that are brotlied.
122     #[cfg(feature = "brotli")]
brotli(body: Body) -> Decoder123     fn brotli(body: Body) -> Decoder {
124         use futures_util::StreamExt;
125 
126         Decoder {
127             inner: Inner::Pending(Pending(
128                 IoStream(body.into_stream()).peekable(),
129                 DecoderType::Brotli,
130             )),
131         }
132     }
133 
134     /// A deflate decoder.
135     ///
136     /// This decoder will buffer and decompress chunks that are deflated.
137     #[cfg(feature = "deflate")]
deflate(body: Body) -> Decoder138     fn deflate(body: Body) -> Decoder {
139         use futures_util::StreamExt;
140 
141         Decoder {
142             inner: Inner::Pending(Pending(
143                 IoStream(body.into_stream()).peekable(),
144                 DecoderType::Deflate,
145             )),
146         }
147     }
148 
149     #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool150     fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool {
151         use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
152         use log::warn;
153 
154         let mut is_content_encoded = {
155             headers
156                 .get_all(CONTENT_ENCODING)
157                 .iter()
158                 .any(|enc| enc == encoding_str)
159                 || headers
160                     .get_all(TRANSFER_ENCODING)
161                     .iter()
162                     .any(|enc| enc == encoding_str)
163         };
164         if is_content_encoded {
165             if let Some(content_length) = headers.get(CONTENT_LENGTH) {
166                 if content_length == "0" {
167                     warn!("{} response with content-length of 0", encoding_str);
168                     is_content_encoded = false;
169                 }
170             }
171         }
172         if is_content_encoded {
173             headers.remove(CONTENT_ENCODING);
174             headers.remove(CONTENT_LENGTH);
175         }
176         is_content_encoded
177     }
178 
179     /// Constructs a Decoder from a hyper request.
180     ///
181     /// A decoder is just a wrapper around the hyper request that knows
182     /// how to decode the content body of the request.
183     ///
184     /// Uses the correct variant by inspecting the Content-Encoding header.
detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder185     pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder {
186         #[cfg(feature = "gzip")]
187         {
188             if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") {
189                 return Decoder::gzip(body);
190             }
191         }
192 
193         #[cfg(feature = "brotli")]
194         {
195             if _accepts.brotli && Decoder::detect_encoding(_headers, "br") {
196                 return Decoder::brotli(body);
197             }
198         }
199 
200         #[cfg(feature = "deflate")]
201         {
202             if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") {
203                 return Decoder::deflate(body);
204             }
205         }
206 
207         Decoder::plain_text(body)
208     }
209 }
210 
211 impl Stream for Decoder {
212     type Item = Result<Bytes, error::Error>;
213 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>214     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
215         // Do a read or poll for a pending decoder value.
216         match self.inner {
217             #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
218             Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
219                 Poll::Ready(Ok(inner)) => {
220                     self.inner = inner;
221                     return self.poll_next(cx);
222                 }
223                 Poll::Ready(Err(e)) => {
224                     return Poll::Ready(Some(Err(crate::error::decode_io(e))));
225                 }
226                 Poll::Pending => return Poll::Pending,
227             },
228             Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx),
229             #[cfg(feature = "gzip")]
230             Inner::Gzip(ref mut decoder) => {
231                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
232                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
233                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
234                     None => Poll::Ready(None),
235                 };
236             }
237             #[cfg(feature = "brotli")]
238             Inner::Brotli(ref mut decoder) => {
239                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
240                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
241                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
242                     None => Poll::Ready(None),
243                 };
244             }
245             #[cfg(feature = "deflate")]
246             Inner::Deflate(ref mut decoder) => {
247                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
248                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))),
249                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
250                     None => Poll::Ready(None),
251                 };
252             }
253         }
254     }
255 }
256 
257 impl HttpBody for Decoder {
258     type Data = Bytes;
259     type Error = crate::Error;
260 
poll_data( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Option<Result<Self::Data, Self::Error>>>261     fn poll_data(
262         self: Pin<&mut Self>,
263         cx: &mut Context,
264     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
265         self.poll_next(cx)
266     }
267 
poll_trailers( self: Pin<&mut Self>, _cx: &mut Context, ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>268     fn poll_trailers(
269         self: Pin<&mut Self>,
270         _cx: &mut Context,
271     ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
272         Poll::Ready(Ok(None))
273     }
274 
size_hint(&self) -> http_body::SizeHint275     fn size_hint(&self) -> http_body::SizeHint {
276         match self.inner {
277             Inner::PlainText(ref body) => HttpBody::size_hint(body),
278             // the rest are "unknown", so default
279             #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))]
280             _ => http_body::SizeHint::default(),
281         }
282     }
283 }
284 
285 impl Future for Pending {
286     type Output = Result<Inner, std::io::Error>;
287 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>288     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
289         use futures_util::StreamExt;
290 
291         match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) {
292             Some(Ok(_)) => {
293                 // fallthrough
294             }
295             Some(Err(_e)) => {
296                 // error was just a ref, so we need to really poll to move it
297                 return Poll::Ready(Err(futures_core::ready!(
298                     Pin::new(&mut self.0).poll_next(cx)
299                 )
300                 .expect("just peeked Some")
301                 .unwrap_err()));
302             }
303             None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
304         };
305 
306         let _body = std::mem::replace(
307             &mut self.0,
308             IoStream(Body::empty().into_stream()).peekable(),
309         );
310 
311         match self.1 {
312             #[cfg(feature = "brotli")]
313             DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new(
314                 BrotliDecoder::new(StreamReader::new(_body)),
315                 BytesCodec::new(),
316             )))),
317             #[cfg(feature = "gzip")]
318             DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new(
319                 GzipDecoder::new(StreamReader::new(_body)),
320                 BytesCodec::new(),
321             )))),
322             #[cfg(feature = "deflate")]
323             DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(FramedRead::new(
324                 ZlibDecoder::new(StreamReader::new(_body)),
325                 BytesCodec::new(),
326             )))),
327         }
328     }
329 }
330 
331 impl Stream for IoStream {
332     type Item = Result<Bytes, std::io::Error>;
333 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>334     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
335         match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) {
336             Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))),
337             Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))),
338             None => Poll::Ready(None),
339         }
340     }
341 }
342 
343 // ===== impl Accepts =====
344 
345 impl Accepts {
none() -> Self346     pub(super) fn none() -> Self {
347         Accepts {
348             #[cfg(feature = "gzip")]
349             gzip: false,
350             #[cfg(feature = "brotli")]
351             brotli: false,
352             #[cfg(feature = "deflate")]
353             deflate: false,
354         }
355     }
356 
as_str(&self) -> Option<&'static str>357     pub(super) fn as_str(&self) -> Option<&'static str> {
358         match (self.is_gzip(), self.is_brotli(), self.is_deflate()) {
359             (true, true, true) => Some("gzip, br, deflate"),
360             (true, true, false) => Some("gzip, br"),
361             (true, false, true) => Some("gzip, deflate"),
362             (false, true, true) => Some("br, deflate"),
363             (true, false, false) => Some("gzip"),
364             (false, true, false) => Some("br"),
365             (false, false, true) => Some("deflate"),
366             (false, false, false) => None,
367         }
368     }
369 
is_gzip(&self) -> bool370     fn is_gzip(&self) -> bool {
371         #[cfg(feature = "gzip")]
372         {
373             self.gzip
374         }
375 
376         #[cfg(not(feature = "gzip"))]
377         {
378             false
379         }
380     }
381 
is_brotli(&self) -> bool382     fn is_brotli(&self) -> bool {
383         #[cfg(feature = "brotli")]
384         {
385             self.brotli
386         }
387 
388         #[cfg(not(feature = "brotli"))]
389         {
390             false
391         }
392     }
393 
is_deflate(&self) -> bool394     fn is_deflate(&self) -> bool {
395         #[cfg(feature = "deflate")]
396         {
397             self.deflate
398         }
399 
400         #[cfg(not(feature = "deflate"))]
401         {
402             false
403         }
404     }
405 }
406 
407 impl Default for Accepts {
default() -> Accepts408     fn default() -> Accepts {
409         Accepts {
410             #[cfg(feature = "gzip")]
411             gzip: true,
412             #[cfg(feature = "brotli")]
413             brotli: true,
414             #[cfg(feature = "deflate")]
415             deflate: true,
416         }
417     }
418 }
419