1 use std::fmt; 2 use std::future::Future; 3 use std::pin::Pin; 4 use std::task::{Context, Poll}; 5 6 #[cfg(feature = "gzip")] 7 use async_compression::stream::GzipDecoder; 8 9 #[cfg(feature = "brotli")] 10 use async_compression::stream::BrotliDecoder; 11 12 use bytes::Bytes; 13 use futures_core::Stream; 14 use futures_util::stream::Peekable; 15 use http::HeaderMap; 16 use hyper::body::HttpBody; 17 18 use super::super::Body; 19 use crate::error; 20 21 #[derive(Clone, Copy, Debug)] 22 pub(super) struct Accepts { 23 #[cfg(feature = "gzip")] 24 pub(super) gzip: bool, 25 #[cfg(feature = "brotli")] 26 pub(super) brotli: bool, 27 } 28 29 /// A response decompressor over a non-blocking stream of chunks. 30 /// 31 /// The inner decoder may be constructed asynchronously. 32 pub(crate) struct Decoder { 33 inner: Inner, 34 } 35 36 enum Inner { 37 /// A `PlainText` decoder just returns the response content as is. 38 PlainText(super::body::ImplStream), 39 40 /// A `Gzip` decoder will uncompress the gzipped response content before returning it. 41 #[cfg(feature = "gzip")] 42 Gzip(GzipDecoder<Peekable<IoStream>>), 43 44 /// A `Brotli` decoder will uncompress the brotlied response content before returning it. 45 #[cfg(feature = "brotli")] 46 Brotli(BrotliDecoder<Peekable<IoStream>>), 47 48 /// A decoder that doesn't have a value yet. 49 #[cfg(any(feature = "brotli", feature = "gzip"))] 50 Pending(Pending), 51 } 52 53 /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. 54 struct Pending(Peekable<IoStream>, DecoderType); 55 56 struct IoStream(super::body::ImplStream); 57 58 enum DecoderType { 59 #[cfg(feature = "gzip")] 60 Gzip, 61 #[cfg(feature = "brotli")] 62 Brotli, 63 } 64 65 impl fmt::Debug for Decoder { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result66 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 67 f.debug_struct("Decoder").finish() 68 } 69 } 70 71 impl Decoder { 72 #[cfg(feature = "blocking")] empty() -> Decoder73 pub(crate) fn empty() -> Decoder { 74 Decoder { 75 inner: Inner::PlainText(Body::empty().into_stream()), 76 } 77 } 78 79 /// A plain text decoder. 80 /// 81 /// This decoder will emit the underlying chunks as-is. plain_text(body: Body) -> Decoder82 fn plain_text(body: Body) -> Decoder { 83 Decoder { 84 inner: Inner::PlainText(body.into_stream()), 85 } 86 } 87 88 /// A gzip decoder. 89 /// 90 /// This decoder will buffer and decompress chunks that are gzipped. 91 #[cfg(feature = "gzip")] gzip(body: Body) -> Decoder92 fn gzip(body: Body) -> Decoder { 93 use futures_util::StreamExt; 94 95 Decoder { 96 inner: Inner::Pending(Pending( 97 IoStream(body.into_stream()).peekable(), 98 DecoderType::Gzip, 99 )), 100 } 101 } 102 103 /// A brotli decoder. 104 /// 105 /// This decoder will buffer and decompress chunks that are brotlied. 106 #[cfg(feature = "brotli")] brotli(body: Body) -> Decoder107 fn brotli(body: Body) -> Decoder { 108 use futures_util::StreamExt; 109 110 Decoder { 111 inner: Inner::Pending(Pending( 112 IoStream(body.into_stream()).peekable(), 113 DecoderType::Brotli, 114 )), 115 } 116 } 117 118 #[cfg(feature = "gzip")] detect_gzip(headers: &mut HeaderMap) -> bool119 fn detect_gzip(headers: &mut HeaderMap) -> bool { 120 use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; 121 use log::warn; 122 123 let content_encoding_gzip: bool; 124 let mut is_gzip = { 125 content_encoding_gzip = headers 126 .get_all(CONTENT_ENCODING) 127 .iter() 128 .any(|enc| enc == "gzip"); 129 content_encoding_gzip 130 || headers 131 .get_all(TRANSFER_ENCODING) 132 .iter() 133 .any(|enc| enc == "gzip") 134 }; 135 if is_gzip { 136 if let Some(content_length) = headers.get(CONTENT_LENGTH) { 137 if content_length == "0" { 138 warn!("gzip response with content-length of 0"); 139 is_gzip = false; 140 } 141 } 142 } 143 if is_gzip { 144 headers.remove(CONTENT_ENCODING); 145 headers.remove(CONTENT_LENGTH); 146 } 147 is_gzip 148 } 149 150 #[cfg(feature = "brotli")] detect_brotli(headers: &mut HeaderMap) -> bool151 fn detect_brotli(headers: &mut HeaderMap) -> bool { 152 use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; 153 use log::warn; 154 155 let content_encoding_gzip: bool; 156 let mut is_brotli = { 157 content_encoding_gzip = headers 158 .get_all(CONTENT_ENCODING) 159 .iter() 160 .any(|enc| enc == "br"); 161 content_encoding_gzip 162 || headers 163 .get_all(TRANSFER_ENCODING) 164 .iter() 165 .any(|enc| enc == "br") 166 }; 167 if is_brotli { 168 if let Some(content_length) = headers.get(CONTENT_LENGTH) { 169 if content_length == "0" { 170 warn!("brotli response with content-length of 0"); 171 is_brotli = false; 172 } 173 } 174 } 175 if is_brotli { 176 headers.remove(CONTENT_ENCODING); 177 headers.remove(CONTENT_LENGTH); 178 } 179 is_brotli 180 } 181 182 /// Constructs a Decoder from a hyper request. 183 /// 184 /// A decoder is just a wrapper around the hyper request that knows 185 /// how to decode the content body of the request. 186 /// 187 /// Uses the correct variant by inspecting the Content-Encoding header. detect( _headers: &mut HeaderMap, body: Body, _accepts: Accepts, ) -> Decoder188 pub(super) fn detect( 189 _headers: &mut HeaderMap, 190 body: Body, 191 _accepts: Accepts, 192 ) -> Decoder { 193 #[cfg(feature = "gzip")] 194 { 195 if _accepts.gzip && Decoder::detect_gzip(_headers) { 196 return Decoder::gzip(body); 197 } 198 } 199 200 #[cfg(feature = "brotli")] 201 { 202 if _accepts.brotli && Decoder::detect_brotli(_headers) { 203 return Decoder::brotli(body); 204 } 205 } 206 207 Decoder::plain_text(body) 208 } 209 } 210 211 impl Stream for Decoder { 212 type Item = Result<Bytes, error::Error>; 213 poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>214 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { 215 // Do a read or poll for a pending decoder value. 216 match self.inner { 217 #[cfg(any(feature = "brotli", feature = "gzip"))] 218 Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { 219 Poll::Ready(Ok(inner)) => { 220 self.inner = inner; 221 return self.poll_next(cx); 222 } 223 Poll::Ready(Err(e)) => { 224 return Poll::Ready(Some(Err(crate::error::decode_io(e)))); 225 } 226 Poll::Pending => return Poll::Pending, 227 }, 228 Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), 229 #[cfg(feature = "gzip")] 230 Inner::Gzip(ref mut decoder) => { 231 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { 232 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), 233 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), 234 None => Poll::Ready(None), 235 }; 236 } 237 #[cfg(feature = "brotli")] 238 Inner::Brotli(ref mut decoder) => { 239 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { 240 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes))), 241 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), 242 None => Poll::Ready(None), 243 }; 244 } 245 } 246 } 247 } 248 249 impl HttpBody for Decoder { 250 type Data = Bytes; 251 type Error = crate::Error; 252 poll_data( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Option<Result<Self::Data, Self::Error>>>253 fn poll_data( 254 self: Pin<&mut Self>, 255 cx: &mut Context, 256 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 257 self.poll_next(cx) 258 } 259 poll_trailers( self: Pin<&mut Self>, _cx: &mut Context, ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>260 fn poll_trailers( 261 self: Pin<&mut Self>, 262 _cx: &mut Context, 263 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { 264 Poll::Ready(Ok(None)) 265 } 266 size_hint(&self) -> http_body::SizeHint267 fn size_hint(&self) -> http_body::SizeHint { 268 match self.inner { 269 Inner::PlainText(ref body) => HttpBody::size_hint(body), 270 // the rest are "unknown", so default 271 #[cfg(any(feature = "brotli", feature = "gzip"))] 272 _ => http_body::SizeHint::default(), 273 } 274 } 275 } 276 277 impl Future for Pending { 278 type Output = Result<Inner, std::io::Error>; 279 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>280 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 281 use futures_util::StreamExt; 282 283 match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { 284 Some(Ok(_)) => { 285 // fallthrough 286 } 287 Some(Err(_e)) => { 288 // error was just a ref, so we need to really poll to move it 289 return Poll::Ready(Err(futures_core::ready!( 290 Pin::new(&mut self.0).poll_next(cx) 291 ) 292 .expect("just peeked Some") 293 .unwrap_err())); 294 } 295 None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), 296 }; 297 298 let _body = std::mem::replace( 299 &mut self.0, 300 IoStream(Body::empty().into_stream()).peekable(), 301 ); 302 303 match self.1 { 304 #[cfg(feature = "brotli")] 305 DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(BrotliDecoder::new(_body)))), 306 #[cfg(feature = "gzip")] 307 DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(GzipDecoder::new(_body)))), 308 } 309 } 310 } 311 312 impl Stream for IoStream { 313 type Item = Result<Bytes, std::io::Error>; 314 poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>315 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { 316 match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { 317 Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), 318 Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), 319 None => Poll::Ready(None), 320 } 321 } 322 } 323 324 // ===== impl Accepts ===== 325 326 impl Accepts { none() -> Self327 pub(super) fn none() -> Self { 328 Accepts { 329 #[cfg(feature = "gzip")] 330 gzip: false, 331 #[cfg(feature = "brotli")] 332 brotli: false, 333 } 334 } 335 as_str(&self) -> Option<&'static str>336 pub(super) fn as_str(&self) -> Option<&'static str> { 337 match (self.is_gzip(), self.is_brotli()) { 338 (true, true) => Some("gzip, br"), 339 (true, false) => Some("gzip"), 340 (false, true) => Some("br"), 341 _ => None, 342 } 343 } 344 is_gzip(&self) -> bool345 fn is_gzip(&self) -> bool { 346 #[cfg(feature = "gzip")] 347 { 348 self.gzip 349 } 350 351 #[cfg(not(feature = "gzip"))] 352 { 353 false 354 } 355 } 356 is_brotli(&self) -> bool357 fn is_brotli(&self) -> bool { 358 #[cfg(feature = "brotli")] 359 { 360 self.brotli 361 } 362 363 #[cfg(not(feature = "brotli"))] 364 { 365 false 366 } 367 } 368 } 369 370 impl Default for Accepts { default() -> Accepts371 fn default() -> Accepts { 372 Accepts { 373 #[cfg(feature = "gzip")] 374 gzip: true, 375 #[cfg(feature = "brotli")] 376 brotli: true, 377 } 378 } 379 } 380