1 use std::error::Error as StdError;
2 use std::fmt;
3 use std::io;
4 use std::usize;
5 
6 use bytes::Bytes;
7 
8 use crate::common::{task, Poll};
9 
10 use super::io::MemRead;
11 use super::DecodedLength;
12 
13 use self::Kind::{Chunked, Eof, Length};
14 
15 /// Decoders to handle different Transfer-Encodings.
16 ///
17 /// If a message body does not include a Transfer-Encoding, it *should*
18 /// include a Content-Length header.
19 #[derive(Clone, PartialEq)]
20 pub(crate) struct Decoder {
21     kind: Kind,
22 }
23 
24 #[derive(Debug, Clone, Copy, PartialEq)]
25 enum Kind {
26     /// A Reader used when a Content-Length header is passed with a positive integer.
27     Length(u64),
28     /// A Reader used when Transfer-Encoding is `chunked`.
29     Chunked(ChunkedState, u64),
30     /// A Reader used for responses that don't indicate a length or chunked.
31     ///
32     /// The bool tracks when EOF is seen on the transport.
33     ///
34     /// Note: This should only used for `Response`s. It is illegal for a
35     /// `Request` to be made with both `Content-Length` and
36     /// `Transfer-Encoding: chunked` missing, as explained from the spec:
37     ///
38     /// > If a Transfer-Encoding header field is present in a response and
39     /// > the chunked transfer coding is not the final encoding, the
40     /// > message body length is determined by reading the connection until
41     /// > it is closed by the server.  If a Transfer-Encoding header field
42     /// > is present in a request and the chunked transfer coding is not
43     /// > the final encoding, the message body length cannot be determined
44     /// > reliably; the server MUST respond with the 400 (Bad Request)
45     /// > status code and then close the connection.
46     Eof(bool),
47 }
48 
49 #[derive(Debug, PartialEq, Clone, Copy)]
50 enum ChunkedState {
51     Size,
52     SizeLws,
53     Extension,
54     SizeLf,
55     Body,
56     BodyCr,
57     BodyLf,
58     Trailer,
59     TrailerLf,
60     EndCr,
61     EndLf,
62     End,
63 }
64 
65 impl Decoder {
66     // constructors
67 
length(x: u64) -> Decoder68     pub(crate) fn length(x: u64) -> Decoder {
69         Decoder {
70             kind: Kind::Length(x),
71         }
72     }
73 
chunked() -> Decoder74     pub(crate) fn chunked() -> Decoder {
75         Decoder {
76             kind: Kind::Chunked(ChunkedState::Size, 0),
77         }
78     }
79 
eof() -> Decoder80     pub(crate) fn eof() -> Decoder {
81         Decoder {
82             kind: Kind::Eof(false),
83         }
84     }
85 
new(len: DecodedLength) -> Self86     pub(super) fn new(len: DecodedLength) -> Self {
87         match len {
88             DecodedLength::CHUNKED => Decoder::chunked(),
89             DecodedLength::CLOSE_DELIMITED => Decoder::eof(),
90             length => Decoder::length(length.danger_len()),
91         }
92     }
93 
94     // methods
95 
is_eof(&self) -> bool96     pub(crate) fn is_eof(&self) -> bool {
97         matches!(self.kind, Length(0) | Chunked(ChunkedState::End, _) | Eof(true))
98     }
99 
decode<R: MemRead>( &mut self, cx: &mut task::Context<'_>, body: &mut R, ) -> Poll<Result<Bytes, io::Error>>100     pub(crate) fn decode<R: MemRead>(
101         &mut self,
102         cx: &mut task::Context<'_>,
103         body: &mut R,
104     ) -> Poll<Result<Bytes, io::Error>> {
105         trace!("decode; state={:?}", self.kind);
106         match self.kind {
107             Length(ref mut remaining) => {
108                 if *remaining == 0 {
109                     Poll::Ready(Ok(Bytes::new()))
110                 } else {
111                     let to_read = *remaining as usize;
112                     let buf = ready!(body.read_mem(cx, to_read))?;
113                     let num = buf.as_ref().len() as u64;
114                     if num > *remaining {
115                         *remaining = 0;
116                     } else if num == 0 {
117                         return Poll::Ready(Err(io::Error::new(
118                             io::ErrorKind::UnexpectedEof,
119                             IncompleteBody,
120                         )));
121                     } else {
122                         *remaining -= num;
123                     }
124                     Poll::Ready(Ok(buf))
125                 }
126             }
127             Chunked(ref mut state, ref mut size) => {
128                 loop {
129                     let mut buf = None;
130                     // advances the chunked state
131                     *state = ready!(state.step(cx, body, size, &mut buf))?;
132                     if *state == ChunkedState::End {
133                         trace!("end of chunked");
134                         return Poll::Ready(Ok(Bytes::new()));
135                     }
136                     if let Some(buf) = buf {
137                         return Poll::Ready(Ok(buf));
138                     }
139                 }
140             }
141             Eof(ref mut is_eof) => {
142                 if *is_eof {
143                     Poll::Ready(Ok(Bytes::new()))
144                 } else {
145                     // 8192 chosen because its about 2 packets, there probably
146                     // won't be that much available, so don't have MemReaders
147                     // allocate buffers to big
148                     body.read_mem(cx, 8192).map_ok(|slice| {
149                         *is_eof = slice.is_empty();
150                         slice
151                     })
152                 }
153             }
154         }
155     }
156 
157     #[cfg(test)]
decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error>158     async fn decode_fut<R: MemRead>(&mut self, body: &mut R) -> Result<Bytes, io::Error> {
159         futures_util::future::poll_fn(move |cx| self.decode(cx, body)).await
160     }
161 }
162 
163 impl fmt::Debug for Decoder {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result164     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
165         fmt::Debug::fmt(&self.kind, f)
166     }
167 }
168 
169 macro_rules! byte (
170     ($rdr:ident, $cx:expr) => ({
171         let buf = ready!($rdr.read_mem($cx, 1))?;
172         if !buf.is_empty() {
173             buf[0]
174         } else {
175             return Poll::Ready(Err(io::Error::new(io::ErrorKind::UnexpectedEof,
176                                       "unexpected EOF during chunk size line")));
177         }
178     })
179 );
180 
181 impl ChunkedState {
step<R: MemRead>( &self, cx: &mut task::Context<'_>, body: &mut R, size: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>182     fn step<R: MemRead>(
183         &self,
184         cx: &mut task::Context<'_>,
185         body: &mut R,
186         size: &mut u64,
187         buf: &mut Option<Bytes>,
188     ) -> Poll<Result<ChunkedState, io::Error>> {
189         use self::ChunkedState::*;
190         match *self {
191             Size => ChunkedState::read_size(cx, body, size),
192             SizeLws => ChunkedState::read_size_lws(cx, body),
193             Extension => ChunkedState::read_extension(cx, body),
194             SizeLf => ChunkedState::read_size_lf(cx, body, *size),
195             Body => ChunkedState::read_body(cx, body, size, buf),
196             BodyCr => ChunkedState::read_body_cr(cx, body),
197             BodyLf => ChunkedState::read_body_lf(cx, body),
198             Trailer => ChunkedState::read_trailer(cx, body),
199             TrailerLf => ChunkedState::read_trailer_lf(cx, body),
200             EndCr => ChunkedState::read_end_cr(cx, body),
201             EndLf => ChunkedState::read_end_lf(cx, body),
202             End => Poll::Ready(Ok(ChunkedState::End)),
203         }
204     }
read_size<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, size: &mut u64, ) -> Poll<Result<ChunkedState, io::Error>>205     fn read_size<R: MemRead>(
206         cx: &mut task::Context<'_>,
207         rdr: &mut R,
208         size: &mut u64,
209     ) -> Poll<Result<ChunkedState, io::Error>> {
210         trace!("Read chunk hex size");
211 
212         macro_rules! or_overflow {
213             ($e:expr) => (
214                 match $e {
215                     Some(val) => val,
216                     None => return Poll::Ready(Err(io::Error::new(
217                         io::ErrorKind::InvalidData,
218                         "invalid chunk size: overflow",
219                     ))),
220                 }
221             )
222         }
223 
224         let radix = 16;
225         match byte!(rdr, cx) {
226             b @ b'0'..=b'9' => {
227                 *size = or_overflow!(size.checked_mul(radix));
228                 *size = or_overflow!(size.checked_add((b - b'0') as u64));
229             }
230             b @ b'a'..=b'f' => {
231                 *size = or_overflow!(size.checked_mul(radix));
232                 *size = or_overflow!(size.checked_add((b + 10 - b'a') as u64));
233             }
234             b @ b'A'..=b'F' => {
235                 *size = or_overflow!(size.checked_mul(radix));
236                 *size = or_overflow!(size.checked_add((b + 10 - b'A') as u64));
237             }
238             b'\t' | b' ' => return Poll::Ready(Ok(ChunkedState::SizeLws)),
239             b';' => return Poll::Ready(Ok(ChunkedState::Extension)),
240             b'\r' => return Poll::Ready(Ok(ChunkedState::SizeLf)),
241             _ => {
242                 return Poll::Ready(Err(io::Error::new(
243                     io::ErrorKind::InvalidInput,
244                     "Invalid chunk size line: Invalid Size",
245                 )));
246             }
247         }
248         Poll::Ready(Ok(ChunkedState::Size))
249     }
read_size_lws<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>250     fn read_size_lws<R: MemRead>(
251         cx: &mut task::Context<'_>,
252         rdr: &mut R,
253     ) -> Poll<Result<ChunkedState, io::Error>> {
254         trace!("read_size_lws");
255         match byte!(rdr, cx) {
256             // LWS can follow the chunk size, but no more digits can come
257             b'\t' | b' ' => Poll::Ready(Ok(ChunkedState::SizeLws)),
258             b';' => Poll::Ready(Ok(ChunkedState::Extension)),
259             b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
260             _ => Poll::Ready(Err(io::Error::new(
261                 io::ErrorKind::InvalidInput,
262                 "Invalid chunk size linear white space",
263             ))),
264         }
265     }
read_extension<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>266     fn read_extension<R: MemRead>(
267         cx: &mut task::Context<'_>,
268         rdr: &mut R,
269     ) -> Poll<Result<ChunkedState, io::Error>> {
270         trace!("read_extension");
271         // We don't care about extensions really at all. Just ignore them.
272         // They "end" at the next CRLF.
273         //
274         // However, some implementations may not check for the CR, so to save
275         // them from themselves, we reject extensions containing plain LF as
276         // well.
277         match byte!(rdr, cx) {
278             b'\r' => Poll::Ready(Ok(ChunkedState::SizeLf)),
279             b'\n' => Poll::Ready(Err(io::Error::new(
280                 io::ErrorKind::InvalidData,
281                 "invalid chunk extension contains newline",
282             ))),
283             _ => Poll::Ready(Ok(ChunkedState::Extension)), // no supported extensions
284         }
285     }
read_size_lf<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, size: u64, ) -> Poll<Result<ChunkedState, io::Error>>286     fn read_size_lf<R: MemRead>(
287         cx: &mut task::Context<'_>,
288         rdr: &mut R,
289         size: u64,
290     ) -> Poll<Result<ChunkedState, io::Error>> {
291         trace!("Chunk size is {:?}", size);
292         match byte!(rdr, cx) {
293             b'\n' => {
294                 if size == 0 {
295                     Poll::Ready(Ok(ChunkedState::EndCr))
296                 } else {
297                     debug!("incoming chunked header: {0:#X} ({0} bytes)", size);
298                     Poll::Ready(Ok(ChunkedState::Body))
299                 }
300             }
301             _ => Poll::Ready(Err(io::Error::new(
302                 io::ErrorKind::InvalidInput,
303                 "Invalid chunk size LF",
304             ))),
305         }
306     }
307 
read_body<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, rem: &mut u64, buf: &mut Option<Bytes>, ) -> Poll<Result<ChunkedState, io::Error>>308     fn read_body<R: MemRead>(
309         cx: &mut task::Context<'_>,
310         rdr: &mut R,
311         rem: &mut u64,
312         buf: &mut Option<Bytes>,
313     ) -> Poll<Result<ChunkedState, io::Error>> {
314         trace!("Chunked read, remaining={:?}", rem);
315 
316         // cap remaining bytes at the max capacity of usize
317         let rem_cap = match *rem {
318             r if r > usize::MAX as u64 => usize::MAX,
319             r => r as usize,
320         };
321 
322         let to_read = rem_cap;
323         let slice = ready!(rdr.read_mem(cx, to_read))?;
324         let count = slice.len();
325 
326         if count == 0 {
327             *rem = 0;
328             return Poll::Ready(Err(io::Error::new(
329                 io::ErrorKind::UnexpectedEof,
330                 IncompleteBody,
331             )));
332         }
333         *buf = Some(slice);
334         *rem -= count as u64;
335 
336         if *rem > 0 {
337             Poll::Ready(Ok(ChunkedState::Body))
338         } else {
339             Poll::Ready(Ok(ChunkedState::BodyCr))
340         }
341     }
read_body_cr<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>342     fn read_body_cr<R: MemRead>(
343         cx: &mut task::Context<'_>,
344         rdr: &mut R,
345     ) -> Poll<Result<ChunkedState, io::Error>> {
346         match byte!(rdr, cx) {
347             b'\r' => Poll::Ready(Ok(ChunkedState::BodyLf)),
348             _ => Poll::Ready(Err(io::Error::new(
349                 io::ErrorKind::InvalidInput,
350                 "Invalid chunk body CR",
351             ))),
352         }
353     }
read_body_lf<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>354     fn read_body_lf<R: MemRead>(
355         cx: &mut task::Context<'_>,
356         rdr: &mut R,
357     ) -> Poll<Result<ChunkedState, io::Error>> {
358         match byte!(rdr, cx) {
359             b'\n' => Poll::Ready(Ok(ChunkedState::Size)),
360             _ => Poll::Ready(Err(io::Error::new(
361                 io::ErrorKind::InvalidInput,
362                 "Invalid chunk body LF",
363             ))),
364         }
365     }
366 
read_trailer<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>367     fn read_trailer<R: MemRead>(
368         cx: &mut task::Context<'_>,
369         rdr: &mut R,
370     ) -> Poll<Result<ChunkedState, io::Error>> {
371         trace!("read_trailer");
372         match byte!(rdr, cx) {
373             b'\r' => Poll::Ready(Ok(ChunkedState::TrailerLf)),
374             _ => Poll::Ready(Ok(ChunkedState::Trailer)),
375         }
376     }
read_trailer_lf<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>377     fn read_trailer_lf<R: MemRead>(
378         cx: &mut task::Context<'_>,
379         rdr: &mut R,
380     ) -> Poll<Result<ChunkedState, io::Error>> {
381         match byte!(rdr, cx) {
382             b'\n' => Poll::Ready(Ok(ChunkedState::EndCr)),
383             _ => Poll::Ready(Err(io::Error::new(
384                 io::ErrorKind::InvalidInput,
385                 "Invalid trailer end LF",
386             ))),
387         }
388     }
389 
read_end_cr<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>390     fn read_end_cr<R: MemRead>(
391         cx: &mut task::Context<'_>,
392         rdr: &mut R,
393     ) -> Poll<Result<ChunkedState, io::Error>> {
394         match byte!(rdr, cx) {
395             b'\r' => Poll::Ready(Ok(ChunkedState::EndLf)),
396             _ => Poll::Ready(Ok(ChunkedState::Trailer)),
397         }
398     }
read_end_lf<R: MemRead>( cx: &mut task::Context<'_>, rdr: &mut R, ) -> Poll<Result<ChunkedState, io::Error>>399     fn read_end_lf<R: MemRead>(
400         cx: &mut task::Context<'_>,
401         rdr: &mut R,
402     ) -> Poll<Result<ChunkedState, io::Error>> {
403         match byte!(rdr, cx) {
404             b'\n' => Poll::Ready(Ok(ChunkedState::End)),
405             _ => Poll::Ready(Err(io::Error::new(
406                 io::ErrorKind::InvalidInput,
407                 "Invalid chunk end LF",
408             ))),
409         }
410     }
411 }
412 
413 #[derive(Debug)]
414 struct IncompleteBody;
415 
416 impl fmt::Display for IncompleteBody {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result417     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
418         write!(f, "end of file before message length reached")
419     }
420 }
421 
422 impl StdError for IncompleteBody {}
423 
424 #[cfg(test)]
425 mod tests {
426     use super::*;
427     use std::pin::Pin;
428     use std::time::Duration;
429     use tokio::io::{AsyncRead, ReadBuf};
430 
431     impl<'a> MemRead for &'a [u8] {
read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>432         fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
433             let n = std::cmp::min(len, self.len());
434             if n > 0 {
435                 let (a, b) = self.split_at(n);
436                 let buf = Bytes::copy_from_slice(a);
437                 *self = b;
438                 Poll::Ready(Ok(buf))
439             } else {
440                 Poll::Ready(Ok(Bytes::new()))
441             }
442         }
443     }
444 
445     impl<'a> MemRead for &'a mut (dyn AsyncRead + Unpin) {
read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>446         fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
447             let mut v = vec![0; len];
448             let mut buf = ReadBuf::new(&mut v);
449             ready!(Pin::new(self).poll_read(cx, &mut buf)?);
450             Poll::Ready(Ok(Bytes::copy_from_slice(&buf.filled())))
451         }
452     }
453 
454     #[cfg(feature = "nightly")]
455     impl MemRead for Bytes {
read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>456         fn read_mem(&mut self, _: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
457             let n = std::cmp::min(len, self.len());
458             let ret = self.split_to(n);
459             Poll::Ready(Ok(ret))
460         }
461     }
462 
463     /*
464     use std::io;
465     use std::io::Write;
466     use super::Decoder;
467     use super::ChunkedState;
468     use futures::{Async, Poll};
469     use bytes::{BytesMut, Bytes};
470     use crate::mock::AsyncIo;
471     */
472 
473     #[tokio::test]
test_read_chunk_size()474     async fn test_read_chunk_size() {
475         use std::io::ErrorKind::{InvalidData, InvalidInput, UnexpectedEof};
476 
477         async fn read(s: &str) -> u64 {
478             let mut state = ChunkedState::Size;
479             let rdr = &mut s.as_bytes();
480             let mut size = 0;
481             loop {
482                 let result =
483                     futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None))
484                         .await;
485                 let desc = format!("read_size failed for {:?}", s);
486                 state = result.expect(desc.as_str());
487                 if state == ChunkedState::Body || state == ChunkedState::EndCr {
488                     break;
489                 }
490             }
491             size
492         }
493 
494         async fn read_err(s: &str, expected_err: io::ErrorKind) {
495             let mut state = ChunkedState::Size;
496             let rdr = &mut s.as_bytes();
497             let mut size = 0;
498             loop {
499                 let result =
500                     futures_util::future::poll_fn(|cx| state.step(cx, rdr, &mut size, &mut None))
501                         .await;
502                 state = match result {
503                     Ok(s) => s,
504                     Err(e) => {
505                         assert!(
506                             expected_err == e.kind(),
507                             "Reading {:?}, expected {:?}, but got {:?}",
508                             s,
509                             expected_err,
510                             e.kind()
511                         );
512                         return;
513                     }
514                 };
515                 if state == ChunkedState::Body || state == ChunkedState::End {
516                     panic!("Was Ok. Expected Err for {:?}", s);
517                 }
518             }
519         }
520 
521         assert_eq!(1, read("1\r\n").await);
522         assert_eq!(1, read("01\r\n").await);
523         assert_eq!(0, read("0\r\n").await);
524         assert_eq!(0, read("00\r\n").await);
525         assert_eq!(10, read("A\r\n").await);
526         assert_eq!(10, read("a\r\n").await);
527         assert_eq!(255, read("Ff\r\n").await);
528         assert_eq!(255, read("Ff   \r\n").await);
529         // Missing LF or CRLF
530         read_err("F\rF", InvalidInput).await;
531         read_err("F", UnexpectedEof).await;
532         // Invalid hex digit
533         read_err("X\r\n", InvalidInput).await;
534         read_err("1X\r\n", InvalidInput).await;
535         read_err("-\r\n", InvalidInput).await;
536         read_err("-1\r\n", InvalidInput).await;
537         // Acceptable (if not fully valid) extensions do not influence the size
538         assert_eq!(1, read("1;extension\r\n").await);
539         assert_eq!(10, read("a;ext name=value\r\n").await);
540         assert_eq!(1, read("1;extension;extension2\r\n").await);
541         assert_eq!(1, read("1;;;  ;\r\n").await);
542         assert_eq!(2, read("2; extension...\r\n").await);
543         assert_eq!(3, read("3   ; extension=123\r\n").await);
544         assert_eq!(3, read("3   ;\r\n").await);
545         assert_eq!(3, read("3   ;   \r\n").await);
546         // Invalid extensions cause an error
547         read_err("1 invalid extension\r\n", InvalidInput).await;
548         read_err("1 A\r\n", InvalidInput).await;
549         read_err("1;no CRLF", UnexpectedEof).await;
550         read_err("1;reject\nnewlines\r\n", InvalidData).await;
551         // Overflow
552         read_err("f0000000000000003\r\n", InvalidData).await;
553     }
554 
555     #[tokio::test]
test_read_sized_early_eof()556     async fn test_read_sized_early_eof() {
557         let mut bytes = &b"foo bar"[..];
558         let mut decoder = Decoder::length(10);
559         assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
560         let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
561         assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
562     }
563 
564     #[tokio::test]
test_read_chunked_early_eof()565     async fn test_read_chunked_early_eof() {
566         let mut bytes = &b"\
567             9\r\n\
568             foo bar\
569         "[..];
570         let mut decoder = Decoder::chunked();
571         assert_eq!(decoder.decode_fut(&mut bytes).await.unwrap().len(), 7);
572         let e = decoder.decode_fut(&mut bytes).await.unwrap_err();
573         assert_eq!(e.kind(), io::ErrorKind::UnexpectedEof);
574     }
575 
576     #[tokio::test]
test_read_chunked_single_read()577     async fn test_read_chunked_single_read() {
578         let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n"[..];
579         let buf = Decoder::chunked()
580             .decode_fut(&mut mock_buf)
581             .await
582             .expect("decode");
583         assert_eq!(16, buf.len());
584         let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
585         assert_eq!("1234567890abcdef", &result);
586     }
587 
588     #[tokio::test]
test_read_chunked_trailer_with_missing_lf()589     async fn test_read_chunked_trailer_with_missing_lf() {
590         let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\nbad\r\r\n"[..];
591         let mut decoder = Decoder::chunked();
592         decoder.decode_fut(&mut mock_buf).await.expect("decode");
593         let e = decoder.decode_fut(&mut mock_buf).await.unwrap_err();
594         assert_eq!(e.kind(), io::ErrorKind::InvalidInput);
595     }
596 
597     #[tokio::test]
test_read_chunked_after_eof()598     async fn test_read_chunked_after_eof() {
599         let mut mock_buf = &b"10\r\n1234567890abcdef\r\n0\r\n\r\n"[..];
600         let mut decoder = Decoder::chunked();
601 
602         // normal read
603         let buf = decoder.decode_fut(&mut mock_buf).await.unwrap();
604         assert_eq!(16, buf.len());
605         let result = String::from_utf8(buf.as_ref().to_vec()).expect("decode String");
606         assert_eq!("1234567890abcdef", &result);
607 
608         // eof read
609         let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
610         assert_eq!(0, buf.len());
611 
612         // ensure read after eof also returns eof
613         let buf = decoder.decode_fut(&mut mock_buf).await.expect("decode");
614         assert_eq!(0, buf.len());
615     }
616 
617     // perform an async read using a custom buffer size and causing a blocking
618     // read at the specified byte
read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String619     async fn read_async(mut decoder: Decoder, content: &[u8], block_at: usize) -> String {
620         let mut outs = Vec::new();
621 
622         let mut ins = if block_at == 0 {
623             tokio_test::io::Builder::new()
624                 .wait(Duration::from_millis(10))
625                 .read(content)
626                 .build()
627         } else {
628             tokio_test::io::Builder::new()
629                 .read(&content[..block_at])
630                 .wait(Duration::from_millis(10))
631                 .read(&content[block_at..])
632                 .build()
633         };
634 
635         let mut ins = &mut ins as &mut (dyn AsyncRead + Unpin);
636 
637         loop {
638             let buf = decoder
639                 .decode_fut(&mut ins)
640                 .await
641                 .expect("unexpected decode error");
642             if buf.is_empty() {
643                 break; // eof
644             }
645             outs.extend(buf.as_ref());
646         }
647 
648         String::from_utf8(outs).expect("decode String")
649     }
650 
651     // iterate over the different ways that this async read could go.
652     // tests blocking a read at each byte along the content - The shotgun approach
all_async_cases(content: &str, expected: &str, decoder: Decoder)653     async fn all_async_cases(content: &str, expected: &str, decoder: Decoder) {
654         let content_len = content.len();
655         for block_at in 0..content_len {
656             let actual = read_async(decoder.clone(), content.as_bytes(), block_at).await;
657             assert_eq!(expected, &actual) //, "Failed async. Blocking at {}", block_at);
658         }
659     }
660 
661     #[tokio::test]
test_read_length_async()662     async fn test_read_length_async() {
663         let content = "foobar";
664         all_async_cases(content, content, Decoder::length(content.len() as u64)).await;
665     }
666 
667     #[tokio::test]
test_read_chunked_async()668     async fn test_read_chunked_async() {
669         let content = "3\r\nfoo\r\n3\r\nbar\r\n0\r\n\r\n";
670         let expected = "foobar";
671         all_async_cases(content, expected, Decoder::chunked()).await;
672     }
673 
674     #[tokio::test]
test_read_eof_async()675     async fn test_read_eof_async() {
676         let content = "foobar";
677         all_async_cases(content, content, Decoder::eof()).await;
678     }
679 
680     #[cfg(feature = "nightly")]
681     #[bench]
bench_decode_chunked_1kb(b: &mut test::Bencher)682     fn bench_decode_chunked_1kb(b: &mut test::Bencher) {
683         let rt = new_runtime();
684 
685         const LEN: usize = 1024;
686         let mut vec = Vec::new();
687         vec.extend(format!("{:x}\r\n", LEN).as_bytes());
688         vec.extend(&[0; LEN][..]);
689         vec.extend(b"\r\n");
690         let content = Bytes::from(vec);
691 
692         b.bytes = LEN as u64;
693 
694         b.iter(|| {
695             let mut decoder = Decoder::chunked();
696             rt.block_on(async {
697                 let mut raw = content.clone();
698                 let chunk = decoder.decode_fut(&mut raw).await.unwrap();
699                 assert_eq!(chunk.len(), LEN);
700             });
701         });
702     }
703 
704     #[cfg(feature = "nightly")]
705     #[bench]
bench_decode_length_1kb(b: &mut test::Bencher)706     fn bench_decode_length_1kb(b: &mut test::Bencher) {
707         let rt = new_runtime();
708 
709         const LEN: usize = 1024;
710         let content = Bytes::from(&[0; LEN][..]);
711         b.bytes = LEN as u64;
712 
713         b.iter(|| {
714             let mut decoder = Decoder::length(LEN as u64);
715             rt.block_on(async {
716                 let mut raw = content.clone();
717                 let chunk = decoder.decode_fut(&mut raw).await.unwrap();
718                 assert_eq!(chunk.len(), LEN);
719             });
720         });
721     }
722 
723     #[cfg(feature = "nightly")]
new_runtime() -> tokio::runtime::Runtime724     fn new_runtime() -> tokio::runtime::Runtime {
725         tokio::runtime::Builder::new_current_thread()
726             .enable_all()
727             .build()
728             .expect("rt build")
729     }
730 }
731