1 use std::fmt;
2 use std::future::Future;
3 use std::pin::Pin;
4 use std::task::{Context, Poll};
5 
6 #[cfg(feature = "gzip")]
7 use async_compression::stream::GzipDecoder;
8 
9 #[cfg(feature = "brotli")]
10 use async_compression::stream::BrotliDecoder;
11 
12 use bytes::Bytes;
13 use futures_core::Stream;
14 use futures_util::stream::Peekable;
15 use http::HeaderMap;
16 use hyper::body::HttpBody;
17 
18 use super::super::Body;
19 use crate::error;
20 
21 #[derive(Clone, Copy, Debug)]
22 pub(super) struct Accepts {
23     #[cfg(feature = "gzip")]
24     pub(super) gzip: bool,
25     #[cfg(feature = "brotli")]
26     pub(super) brotli: bool,
27 }
28 
29 /// A response decompressor over a non-blocking stream of chunks.
30 ///
31 /// The inner decoder may be constructed asynchronously.
32 pub(crate) struct Decoder {
33     inner: Inner,
34 }
35 
36 enum Inner {
37     /// A `PlainText` decoder just returns the response content as is.
38     PlainText(super::body::ImplStream),
39 
40     /// A `Gzip` decoder will uncompress the gzipped response content before returning it.
41     #[cfg(feature = "gzip")]
42     Gzip(GzipDecoder<Peekable<IoStream>>),
43 
44     /// A `Brotli` decoder will uncompress the brotlied response content before returning it.
45     #[cfg(feature = "brotli")]
46     Brotli(BrotliDecoder<Peekable<IoStream>>),
47 
48     /// A decoder that doesn't have a value yet.
49     #[cfg(any(feature = "brotli", feature = "gzip"))]
50     Pending(Pending),
51 }
52 
53 /// A future attempt to poll the response body for EOF so we know whether to use gzip or not.
54 struct Pending(Peekable<IoStream>, DecoderType);
55 
56 struct IoStream(super::body::ImplStream);
57 
/// Which decompressor a `Pending` future should resolve to.
///
/// Variants are feature-gated, so this enum has no variants when neither
/// `gzip` nor `brotli` is enabled.
enum DecoderType {
    /// Resolve to `Inner::Gzip`.
    #[cfg(feature = "gzip")]
    Gzip,
    /// Resolve to `Inner::Brotli`.
    #[cfg(feature = "brotli")]
    Brotli,
}
64 
65 impl fmt::Debug for Decoder {
fmt(&self, f: &mut fmt::Formatter) -> fmt::Result66     fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
67         f.debug_struct("Decoder").finish()
68     }
69 }
70 
71 impl Decoder {
72     #[cfg(feature = "blocking")]
empty() -> Decoder73     pub(crate) fn empty() -> Decoder {
74         Decoder {
75             inner: Inner::PlainText(Body::empty().into_stream()),
76         }
77     }
78 
79     /// A plain text decoder.
80     ///
81     /// This decoder will emit the underlying chunks as-is.
plain_text(body: Body) -> Decoder82     fn plain_text(body: Body) -> Decoder {
83         Decoder {
84             inner: Inner::PlainText(body.into_stream()),
85         }
86     }
87 
88     /// A gzip decoder.
89     ///
90     /// This decoder will buffer and decompress chunks that are gzipped.
91     #[cfg(feature = "gzip")]
gzip(body: Body) -> Decoder92     fn gzip(body: Body) -> Decoder {
93         use futures_util::StreamExt;
94 
95         Decoder {
96             inner: Inner::Pending(Pending(
97                 IoStream(body.into_stream()).peekable(),
98                 DecoderType::Gzip,
99             )),
100         }
101     }
102 
103     /// A brotli decoder.
104     ///
105     /// This decoder will buffer and decompress chunks that are brotlied.
106     #[cfg(feature = "brotli")]
brotli(body: Body) -> Decoder107     fn brotli(body: Body) -> Decoder {
108         use futures_util::StreamExt;
109 
110         Decoder {
111             inner: Inner::Pending(Pending(
112                 IoStream(body.into_stream()).peekable(),
113                 DecoderType::Brotli,
114             )),
115         }
116     }
117 
118     #[cfg(feature = "gzip")]
detect_gzip(headers: &mut HeaderMap) -> bool119     fn detect_gzip(headers: &mut HeaderMap) -> bool {
120         use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
121         use log::warn;
122 
123         let content_encoding_gzip: bool;
124         let mut is_gzip = {
125             content_encoding_gzip = headers
126                 .get_all(CONTENT_ENCODING)
127                 .iter()
128                 .any(|enc| enc == "gzip");
129             content_encoding_gzip
130                 || headers
131                     .get_all(TRANSFER_ENCODING)
132                     .iter()
133                     .any(|enc| enc == "gzip")
134         };
135         if is_gzip {
136             if let Some(content_length) = headers.get(CONTENT_LENGTH) {
137                 if content_length == "0" {
138                     warn!("gzip response with content-length of 0");
139                     is_gzip = false;
140                 }
141             }
142         }
143         if is_gzip {
144             headers.remove(CONTENT_ENCODING);
145             headers.remove(CONTENT_LENGTH);
146         }
147         is_gzip
148     }
149 
150     #[cfg(feature = "brotli")]
detect_brotli(headers: &mut HeaderMap) -> bool151     fn detect_brotli(headers: &mut HeaderMap) -> bool {
152         use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING};
153         use log::warn;
154 
155         let content_encoding_gzip: bool;
156         let mut is_brotli = {
157             content_encoding_gzip = headers
158                 .get_all(CONTENT_ENCODING)
159                 .iter()
160                 .any(|enc| enc == "br");
161             content_encoding_gzip
162                 || headers
163                     .get_all(TRANSFER_ENCODING)
164                     .iter()
165                     .any(|enc| enc == "br")
166         };
167         if is_brotli {
168             if let Some(content_length) = headers.get(CONTENT_LENGTH) {
169                 if content_length == "0" {
170                     warn!("brotli response with content-length of 0");
171                     is_brotli = false;
172                 }
173             }
174         }
175         if is_brotli {
176             headers.remove(CONTENT_ENCODING);
177             headers.remove(CONTENT_LENGTH);
178         }
179         is_brotli
180     }
181 
182     /// Constructs a Decoder from a hyper request.
183     ///
184     /// A decoder is just a wrapper around the hyper request that knows
185     /// how to decode the content body of the request.
186     ///
187     /// Uses the correct variant by inspecting the Content-Encoding header.
detect( _headers: &mut HeaderMap, body: Body, _accepts: Accepts, ) -> Decoder188     pub(super) fn detect(
189         _headers: &mut HeaderMap,
190         body: Body,
191         _accepts: Accepts,
192     ) -> Decoder {
193         #[cfg(feature = "gzip")]
194         {
195             if _accepts.gzip && Decoder::detect_gzip(_headers) {
196                 return Decoder::gzip(body);
197             }
198         }
199 
200         #[cfg(feature = "brotli")]
201         {
202             if _accepts.brotli && Decoder::detect_brotli(_headers) {
203                 return Decoder::brotli(body);
204             }
205         }
206 
207         Decoder::plain_text(body)
208     }
209 }
210 
211 impl Stream for Decoder {
212     type Item = Result<Bytes, error::Error>;
213 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>214     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
215         // Do a read or poll for a pending decoder value.
216         match self.inner {
217             #[cfg(any(feature = "brotli", feature = "gzip"))]
218             Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) {
219                 Poll::Ready(Ok(inner)) => {
220                     self.inner = inner;
221                     return self.poll_next(cx);
222                 }
223                 Poll::Ready(Err(e)) => {
224                     return Poll::Ready(Some(Err(crate::error::decode_io(e))));
225                 }
226                 Poll::Pending => return Poll::Pending,
227             },
228             Inner::PlainText(ref mut body) => return Pin::new(body).poll_next(cx),
229             #[cfg(feature = "gzip")]
230             Inner::Gzip(ref mut decoder) => {
231                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
232                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
233                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
234                     None => Poll::Ready(None),
235                 };
236             }
237             #[cfg(feature = "brotli")]
238             Inner::Brotli(ref mut decoder) => {
239                 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) {
240                     Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))),
241                     Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))),
242                     None => Poll::Ready(None),
243                 };
244             }
245         };
246     }
247 }
248 
249 impl HttpBody for Decoder {
250     type Data = Bytes;
251     type Error = crate::Error;
252 
poll_data( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Option<Result<Self::Data, Self::Error>>>253     fn poll_data(
254         self: Pin<&mut Self>,
255         cx: &mut Context,
256     ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
257         self.poll_next(cx)
258     }
259 
poll_trailers( self: Pin<&mut Self>, _cx: &mut Context, ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>260     fn poll_trailers(
261         self: Pin<&mut Self>,
262         _cx: &mut Context,
263     ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
264         Poll::Ready(Ok(None))
265     }
266 
size_hint(&self) -> http_body::SizeHint267     fn size_hint(&self) -> http_body::SizeHint {
268         match self.inner {
269             Inner::PlainText(ref body) => HttpBody::size_hint(body),
270             // the rest are "unknown", so default
271             #[cfg(any(feature = "brotli", feature = "gzip"))]
272             _ => http_body::SizeHint::default(),
273         }
274     }
275 }
276 
277 impl Future for Pending {
278     type Output = Result<Inner, std::io::Error>;
279 
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>280     fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
281         use futures_util::StreamExt;
282 
283         match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) {
284             Some(Ok(_)) => {
285                 // fallthrough
286             }
287             Some(Err(_e)) => {
288                 // error was just a ref, so we need to really poll to move it
289                 return Poll::Ready(Err(futures_core::ready!(
290                     Pin::new(&mut self.0).poll_next(cx)
291                 )
292                 .expect("just peeked Some")
293                 .unwrap_err()));
294             }
295             None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))),
296         };
297 
298         let _body = std::mem::replace(
299             &mut self.0,
300             IoStream(Body::empty().into_stream()).peekable(),
301         );
302 
303         match self.1 {
304             #[cfg(feature = "brotli")]
305             DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))),
306             #[cfg(feature = "gzip")]
307             DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))),
308         }
309     }
310 }
311 
312 impl Stream for IoStream {
313     type Item = Result<Bytes, std::io::Error>;
314 
poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>315     fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
316         match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) {
317             Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))),
318             Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))),
319             None => Poll::Ready(None),
320         }
321     }
322 }
323 
324 // ===== impl Accepts =====
325 
326 impl Accepts {
none() -> Self327     pub(super) fn none() -> Self {
328         Accepts {
329             #[cfg(feature = "gzip")]
330             gzip: false,
331             #[cfg(feature = "brotli")]
332             brotli: false,
333         }
334     }
335 
as_str(&self) -> Option<&'static str>336     pub(super) fn as_str(&self) -> Option<&'static str> {
337         match (self.is_gzip(), self.is_brotli()) {
338             (true, true) => Some("gzip, br"),
339             (true, false) => Some("gzip"),
340             (false, true) => Some("br"),
341             _ => None,
342         }
343     }
344 
is_gzip(&self) -> bool345     fn is_gzip(&self) -> bool {
346         #[cfg(feature = "gzip")]
347         {
348             self.gzip
349         }
350 
351         #[cfg(not(feature = "gzip"))]
352         {
353             false
354         }
355     }
356 
is_brotli(&self) -> bool357     fn is_brotli(&self) -> bool {
358         #[cfg(feature = "brotli")]
359         {
360             self.brotli
361         }
362 
363         #[cfg(not(feature = "brotli"))]
364         {
365             false
366         }
367     }
368 }
369 
370 impl Default for Accepts {
default() -> Accepts371     fn default() -> Accepts {
372         Accepts {
373             #[cfg(feature = "gzip")]
374             gzip: true,
375             #[cfg(feature = "brotli")]
376             brotli: true,
377         }
378     }
379 }
380