1 use std::fmt; 2 use std::future::Future; 3 use std::pin::Pin; 4 use std::task::{Context, Poll}; 5 6 #[cfg(feature = "gzip")] 7 use async_compression::tokio::bufread::GzipDecoder; 8 9 #[cfg(feature = "brotli")] 10 use async_compression::tokio::bufread::BrotliDecoder; 11 12 #[cfg(feature = "deflate")] 13 use async_compression::tokio::bufread::ZlibDecoder; 14 15 use bytes::Bytes; 16 use futures_core::Stream; 17 use futures_util::stream::Peekable; 18 use http::HeaderMap; 19 use hyper::body::HttpBody; 20 21 #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))] 22 use tokio_util::codec::{BytesCodec, FramedRead}; 23 #[cfg(any(feature = "gzip", feature = "brotli", feature = "deflate"))] 24 use tokio_util::io::StreamReader; 25 26 use super::super::Body; 27 use crate::error; 28 29 #[derive(Clone, Copy, Debug)] 30 pub(super) struct Accepts { 31 #[cfg(feature = "gzip")] 32 pub(super) gzip: bool, 33 #[cfg(feature = "brotli")] 34 pub(super) brotli: bool, 35 #[cfg(feature = "deflate")] 36 pub(super) deflate: bool, 37 } 38 39 /// A response decompressor over a non-blocking stream of chunks. 40 /// 41 /// The inner decoder may be constructed asynchronously. 42 pub(crate) struct Decoder { 43 inner: Inner, 44 } 45 46 enum Inner { 47 /// A `PlainText` decoder just returns the response content as is. 48 PlainText(super::body::ImplStream), 49 50 /// A `Gzip` decoder will uncompress the gzipped response content before returning it. 51 #[cfg(feature = "gzip")] 52 Gzip(FramedRead<GzipDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>), 53 54 /// A `Brotli` decoder will uncompress the brotlied response content before returning it. 55 #[cfg(feature = "brotli")] 56 Brotli(FramedRead<BrotliDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>), 57 58 /// A `Deflate` decoder will uncompress the deflated response content before returning it. 59 #[cfg(feature = "deflate")] 60 Deflate(FramedRead<ZlibDecoder<StreamReader<Peekable<IoStream>, Bytes>>, BytesCodec>), 61 62 /// A decoder that doesn't have a value yet. 63 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] 64 Pending(Pending), 65 } 66 67 /// A future attempt to poll the response body for EOF so we know whether to use gzip or not. 68 struct Pending(Peekable<IoStream>, DecoderType); 69 70 struct IoStream(super::body::ImplStream); 71 72 enum DecoderType { 73 #[cfg(feature = "gzip")] 74 Gzip, 75 #[cfg(feature = "brotli")] 76 Brotli, 77 #[cfg(feature = "deflate")] 78 Deflate, 79 } 80 81 impl fmt::Debug for Decoder { fmt(&self, f: &mut fmt::Formatter) -> fmt::Result82 fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { 83 f.debug_struct("Decoder").finish() 84 } 85 } 86 87 impl Decoder { 88 #[cfg(feature = "blocking")] empty() -> Decoder89 pub(crate) fn empty() -> Decoder { 90 Decoder { 91 inner: Inner::PlainText(Body::empty().into_stream()), 92 } 93 } 94 95 /// A plain text decoder. 96 /// 97 /// This decoder will emit the underlying chunks as-is. plain_text(body: Body) -> Decoder98 fn plain_text(body: Body) -> Decoder { 99 Decoder { 100 inner: Inner::PlainText(body.into_stream()), 101 } 102 } 103 104 /// A gzip decoder. 105 /// 106 /// This decoder will buffer and decompress chunks that are gzipped. 107 #[cfg(feature = "gzip")] gzip(body: Body) -> Decoder108 fn gzip(body: Body) -> Decoder { 109 use futures_util::StreamExt; 110 111 Decoder { 112 inner: Inner::Pending(Pending( 113 IoStream(body.into_stream()).peekable(), 114 DecoderType::Gzip, 115 )), 116 } 117 } 118 119 /// A brotli decoder. 120 /// 121 /// This decoder will buffer and decompress chunks that are brotlied. 122 #[cfg(feature = "brotli")] brotli(body: Body) -> Decoder123 fn brotli(body: Body) -> Decoder { 124 use futures_util::StreamExt; 125 126 Decoder { 127 inner: Inner::Pending(Pending( 128 IoStream(body.into_stream()).peekable(), 129 DecoderType::Brotli, 130 )), 131 } 132 } 133 134 /// A deflate decoder. 135 /// 136 /// This decoder will buffer and decompress chunks that are deflated. 137 #[cfg(feature = "deflate")] deflate(body: Body) -> Decoder138 fn deflate(body: Body) -> Decoder { 139 use futures_util::StreamExt; 140 141 Decoder { 142 inner: Inner::Pending(Pending( 143 IoStream(body.into_stream()).peekable(), 144 DecoderType::Deflate, 145 )), 146 } 147 } 148 149 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool150 fn detect_encoding(headers: &mut HeaderMap, encoding_str: &str) -> bool { 151 use http::header::{CONTENT_ENCODING, CONTENT_LENGTH, TRANSFER_ENCODING}; 152 use log::warn; 153 154 let mut is_content_encoded = { 155 headers 156 .get_all(CONTENT_ENCODING) 157 .iter() 158 .any(|enc| enc == encoding_str) 159 || headers 160 .get_all(TRANSFER_ENCODING) 161 .iter() 162 .any(|enc| enc == encoding_str) 163 }; 164 if is_content_encoded { 165 if let Some(content_length) = headers.get(CONTENT_LENGTH) { 166 if content_length == "0" { 167 warn!("{} response with content-length of 0", encoding_str); 168 is_content_encoded = false; 169 } 170 } 171 } 172 if is_content_encoded { 173 headers.remove(CONTENT_ENCODING); 174 headers.remove(CONTENT_LENGTH); 175 } 176 is_content_encoded 177 } 178 179 /// Constructs a Decoder from a hyper request. 180 /// 181 /// A decoder is just a wrapper around the hyper request that knows 182 /// how to decode the content body of the request. 183 /// 184 /// Uses the correct variant by inspecting the Content-Encoding header. detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder185 pub(super) fn detect(_headers: &mut HeaderMap, body: Body, _accepts: Accepts) -> Decoder { 186 #[cfg(feature = "gzip")] 187 { 188 if _accepts.gzip && Decoder::detect_encoding(_headers, "gzip") { 189 return Decoder::gzip(body); 190 } 191 } 192 193 #[cfg(feature = "brotli")] 194 { 195 if _accepts.brotli && Decoder::detect_encoding(_headers, "br") { 196 return Decoder::brotli(body); 197 } 198 } 199 200 #[cfg(feature = "deflate")] 201 { 202 if _accepts.deflate && Decoder::detect_encoding(_headers, "deflate") { 203 return Decoder::deflate(body); 204 } 205 } 206 207 Decoder::plain_text(body) 208 } 209 } 210 211 impl Stream for Decoder { 212 type Item = Result<Bytes, error::Error>; 213 poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>214 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { 215 // Do a read or poll for a pending decoder value. 216 match self.inner { 217 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] 218 Inner::Pending(ref mut future) => match Pin::new(future).poll(cx) { 219 Poll::Ready(Ok(inner)) => { 220 self.inner = inner; 221 return self.poll_next(cx); 222 } 223 Poll::Ready(Err(e)) => { 224 return Poll::Ready(Some(Err(crate::error::decode_io(e)))); 225 } 226 Poll::Pending => return Poll::Pending, 227 }, 228 Inner::PlainText(ref mut body) => Pin::new(body).poll_next(cx), 229 #[cfg(feature = "gzip")] 230 Inner::Gzip(ref mut decoder) => { 231 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { 232 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), 233 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), 234 None => Poll::Ready(None), 235 }; 236 } 237 #[cfg(feature = "brotli")] 238 Inner::Brotli(ref mut decoder) => { 239 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { 240 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), 241 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), 242 None => Poll::Ready(None), 243 }; 244 } 245 #[cfg(feature = "deflate")] 246 Inner::Deflate(ref mut decoder) => { 247 return match futures_core::ready!(Pin::new(decoder).poll_next(cx)) { 248 Some(Ok(bytes)) => Poll::Ready(Some(Ok(bytes.freeze()))), 249 Some(Err(err)) => Poll::Ready(Some(Err(crate::error::decode_io(err)))), 250 None => Poll::Ready(None), 251 }; 252 } 253 } 254 } 255 } 256 257 impl HttpBody for Decoder { 258 type Data = Bytes; 259 type Error = crate::Error; 260 poll_data( self: Pin<&mut Self>, cx: &mut Context, ) -> Poll<Option<Result<Self::Data, Self::Error>>>261 fn poll_data( 262 self: Pin<&mut Self>, 263 cx: &mut Context, 264 ) -> Poll<Option<Result<Self::Data, Self::Error>>> { 265 self.poll_next(cx) 266 } 267 poll_trailers( self: Pin<&mut Self>, _cx: &mut Context, ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>>268 fn poll_trailers( 269 self: Pin<&mut Self>, 270 _cx: &mut Context, 271 ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> { 272 Poll::Ready(Ok(None)) 273 } 274 size_hint(&self) -> http_body::SizeHint275 fn size_hint(&self) -> http_body::SizeHint { 276 match self.inner { 277 Inner::PlainText(ref body) => HttpBody::size_hint(body), 278 // the rest are "unknown", so default 279 #[cfg(any(feature = "brotli", feature = "gzip", feature = "deflate"))] 280 _ => http_body::SizeHint::default(), 281 } 282 } 283 } 284 285 impl Future for Pending { 286 type Output = Result<Inner, std::io::Error>; 287 poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>288 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { 289 use futures_util::StreamExt; 290 291 match futures_core::ready!(Pin::new(&mut self.0).poll_peek(cx)) { 292 Some(Ok(_)) => { 293 // fallthrough 294 } 295 Some(Err(_e)) => { 296 // error was just a ref, so we need to really poll to move it 297 return Poll::Ready(Err(futures_core::ready!( 298 Pin::new(&mut self.0).poll_next(cx) 299 ) 300 .expect("just peeked Some") 301 .unwrap_err())); 302 } 303 None => return Poll::Ready(Ok(Inner::PlainText(Body::empty().into_stream()))), 304 }; 305 306 let _body = std::mem::replace( 307 &mut self.0, 308 IoStream(Body::empty().into_stream()).peekable(), 309 ); 310 311 match self.1 { 312 #[cfg(feature = "brotli")] 313 DecoderType::Brotli => Poll::Ready(Ok(Inner::Brotli(FramedRead::new( 314 BrotliDecoder::new(StreamReader::new(_body)), 315 BytesCodec::new(), 316 )))), 317 #[cfg(feature = "gzip")] 318 DecoderType::Gzip => Poll::Ready(Ok(Inner::Gzip(FramedRead::new( 319 GzipDecoder::new(StreamReader::new(_body)), 320 BytesCodec::new(), 321 )))), 322 #[cfg(feature = "deflate")] 323 DecoderType::Deflate => Poll::Ready(Ok(Inner::Deflate(FramedRead::new( 324 ZlibDecoder::new(StreamReader::new(_body)), 325 BytesCodec::new(), 326 )))), 327 } 328 } 329 } 330 331 impl Stream for IoStream { 332 type Item = Result<Bytes, std::io::Error>; 333 poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>>334 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> { 335 match futures_core::ready!(Pin::new(&mut self.0).poll_next(cx)) { 336 Some(Ok(chunk)) => Poll::Ready(Some(Ok(chunk))), 337 Some(Err(err)) => Poll::Ready(Some(Err(err.into_io()))), 338 None => Poll::Ready(None), 339 } 340 } 341 } 342 343 // ===== impl Accepts ===== 344 345 impl Accepts { none() -> Self346 pub(super) fn none() -> Self { 347 Accepts { 348 #[cfg(feature = "gzip")] 349 gzip: false, 350 #[cfg(feature = "brotli")] 351 brotli: false, 352 #[cfg(feature = "deflate")] 353 deflate: false, 354 } 355 } 356 as_str(&self) -> Option<&'static str>357 pub(super) fn as_str(&self) -> Option<&'static str> { 358 match (self.is_gzip(), self.is_brotli(), self.is_deflate()) { 359 (true, true, true) => Some("gzip, br, deflate"), 360 (true, true, false) => Some("gzip, br"), 361 (true, false, true) => Some("gzip, deflate"), 362 (false, true, true) => Some("br, deflate"), 363 (true, false, false) => Some("gzip"), 364 (false, true, false) => Some("br"), 365 (false, false, true) => Some("deflate"), 366 (false, false, false) => None, 367 } 368 } 369 is_gzip(&self) -> bool370 fn is_gzip(&self) -> bool { 371 #[cfg(feature = "gzip")] 372 { 373 self.gzip 374 } 375 376 #[cfg(not(feature = "gzip"))] 377 { 378 false 379 } 380 } 381 is_brotli(&self) -> bool382 fn is_brotli(&self) -> bool { 383 #[cfg(feature = "brotli")] 384 { 385 self.brotli 386 } 387 388 #[cfg(not(feature = "brotli"))] 389 { 390 false 391 } 392 } 393 is_deflate(&self) -> bool394 fn is_deflate(&self) -> bool { 395 #[cfg(feature = "deflate")] 396 { 397 self.deflate 398 } 399 400 #[cfg(not(feature = "deflate"))] 401 { 402 false 403 } 404 } 405 } 406 407 impl Default for Accepts { default() -> Accepts408 fn default() -> Accepts { 409 Accepts { 410 #[cfg(feature = "gzip")] 411 gzip: true, 412 #[cfg(feature = "brotli")] 413 brotli: true, 414 #[cfg(feature = "deflate")] 415 deflate: true, 416 } 417 } 418 } 419