1 use std::cell::Cell;
2 use std::cmp;
3 use std::fmt;
4 use std::io::{self, IoSlice};
5 
6 use bytes::{Buf, BufMut, Bytes, BytesMut};
7 use tokio::io::{AsyncRead, AsyncWrite};
8 
9 use super::{Http1Transaction, ParseContext, ParsedMessage};
10 use crate::common::buf::BufList;
11 use crate::common::{task, Pin, Poll, Unpin};
12 
13 /// The initial buffer size allocated before trying to read from IO.
14 pub(crate) const INIT_BUFFER_SIZE: usize = 8192;
15 
16 /// The minimum value that can be set to max buffer size.
17 pub const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;
18 
19 /// The default maximum read buffer size. If the buffer gets this big and
20 /// a message is still not complete, a `TooLarge` error is triggered.
21 // Note: if this changes, update server::conn::Http::max_buf_size docs.
22 pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;
23 
24 /// The maximum number of distinct `Buf`s to hold in a list before requiring
25 /// a flush. Only affects when the buffer strategy is to queue buffers.
26 ///
27 /// Note that a flush can happen before reaching the maximum. This simply
28 /// forces a flush if the queue gets this big.
29 const MAX_BUF_LIST_BUFFERS: usize = 16;
30 
31 pub struct Buffered<T, B> {
32     flush_pipeline: bool,
33     io: T,
34     read_blocked: bool,
35     read_buf: BytesMut,
36     read_buf_strategy: ReadStrategy,
37     write_buf: WriteBuf<B>,
38 }
39 
40 impl<T, B> fmt::Debug for Buffered<T, B>
41 where
42     B: Buf,
43 {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result44     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
45         f.debug_struct("Buffered")
46             .field("read_buf", &self.read_buf)
47             .field("write_buf", &self.write_buf)
48             .finish()
49     }
50 }
51 
52 impl<T, B> Buffered<T, B>
53 where
54     T: AsyncRead + AsyncWrite + Unpin,
55     B: Buf,
56 {
new(io: T) -> Buffered<T, B>57     pub fn new(io: T) -> Buffered<T, B> {
58         Buffered {
59             flush_pipeline: false,
60             io,
61             read_blocked: false,
62             read_buf: BytesMut::with_capacity(0),
63             read_buf_strategy: ReadStrategy::default(),
64             write_buf: WriteBuf::new(),
65         }
66     }
67 
set_flush_pipeline(&mut self, enabled: bool)68     pub fn set_flush_pipeline(&mut self, enabled: bool) {
69         debug_assert!(!self.write_buf.has_remaining());
70         self.flush_pipeline = enabled;
71         if enabled {
72             self.set_write_strategy_flatten();
73         }
74     }
75 
set_max_buf_size(&mut self, max: usize)76     pub fn set_max_buf_size(&mut self, max: usize) {
77         assert!(
78             max >= MINIMUM_MAX_BUFFER_SIZE,
79             "The max_buf_size cannot be smaller than {}.",
80             MINIMUM_MAX_BUFFER_SIZE,
81         );
82         self.read_buf_strategy = ReadStrategy::with_max(max);
83         self.write_buf.max_buf_size = max;
84     }
85 
set_read_buf_exact_size(&mut self, sz: usize)86     pub fn set_read_buf_exact_size(&mut self, sz: usize) {
87         self.read_buf_strategy = ReadStrategy::Exact(sz);
88     }
89 
set_write_strategy_flatten(&mut self)90     pub fn set_write_strategy_flatten(&mut self) {
91         // this should always be called only at construction time,
92         // so this assert is here to catch myself
93         debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
94         self.write_buf.set_strategy(WriteStrategy::Flatten);
95     }
96 
read_buf(&self) -> &[u8]97     pub fn read_buf(&self) -> &[u8] {
98         self.read_buf.as_ref()
99     }
100 
101     #[cfg(test)]
102     #[cfg(feature = "nightly")]
read_buf_mut(&mut self) -> &mut BytesMut103     pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
104         &mut self.read_buf
105     }
106 
107     /// Return the "allocated" available space, not the potential space
108     /// that could be allocated in the future.
read_buf_remaining_mut(&self) -> usize109     fn read_buf_remaining_mut(&self) -> usize {
110         self.read_buf.capacity() - self.read_buf.len()
111     }
112 
headers_buf(&mut self) -> &mut Vec<u8>113     pub fn headers_buf(&mut self) -> &mut Vec<u8> {
114         let buf = self.write_buf.headers_mut();
115         &mut buf.bytes
116     }
117 
write_buf(&mut self) -> &mut WriteBuf<B>118     pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
119         &mut self.write_buf
120     }
121 
buffer<BB: Buf + Into<B>>(&mut self, buf: BB)122     pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
123         self.write_buf.buffer(buf)
124     }
125 
can_buffer(&self) -> bool126     pub fn can_buffer(&self) -> bool {
127         self.flush_pipeline || self.write_buf.can_buffer()
128     }
129 
consume_leading_lines(&mut self)130     pub fn consume_leading_lines(&mut self) {
131         if !self.read_buf.is_empty() {
132             let mut i = 0;
133             while i < self.read_buf.len() {
134                 match self.read_buf[i] {
135                     b'\r' | b'\n' => i += 1,
136                     _ => break,
137                 }
138             }
139             self.read_buf.advance(i);
140         }
141     }
142 
parse<S>( &mut self, cx: &mut task::Context<'_>, parse_ctx: ParseContext<'_>, ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>> where S: Http1Transaction,143     pub(super) fn parse<S>(
144         &mut self,
145         cx: &mut task::Context<'_>,
146         parse_ctx: ParseContext<'_>,
147     ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
148     where
149         S: Http1Transaction,
150     {
151         loop {
152             match S::parse(
153                 &mut self.read_buf,
154                 ParseContext {
155                     cached_headers: parse_ctx.cached_headers,
156                     req_method: parse_ctx.req_method,
157                 },
158             )? {
159                 Some(msg) => {
160                     debug!("parsed {} headers", msg.head.headers.len());
161                     return Poll::Ready(Ok(msg));
162                 }
163                 None => {
164                     let max = self.read_buf_strategy.max();
165                     if self.read_buf.len() >= max {
166                         debug!("max_buf_size ({}) reached, closing", max);
167                         return Poll::Ready(Err(crate::Error::new_too_large()));
168                     }
169                 }
170             }
171             if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
172                 trace!("parse eof");
173                 return Poll::Ready(Err(crate::Error::new_incomplete()));
174             }
175         }
176     }
177 
poll_read_from_io(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>>178     pub fn poll_read_from_io(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
179         self.read_blocked = false;
180         let next = self.read_buf_strategy.next();
181         if self.read_buf_remaining_mut() < next {
182             self.read_buf.reserve(next);
183         }
184         match Pin::new(&mut self.io).poll_read_buf(cx, &mut self.read_buf) {
185             Poll::Ready(Ok(n)) => {
186                 debug!("read {} bytes", n);
187                 self.read_buf_strategy.record(n);
188                 Poll::Ready(Ok(n))
189             }
190             Poll::Pending => {
191                 self.read_blocked = true;
192                 Poll::Pending
193             }
194             Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
195         }
196     }
197 
into_inner(self) -> (T, Bytes)198     pub fn into_inner(self) -> (T, Bytes) {
199         (self.io, self.read_buf.freeze())
200     }
201 
io_mut(&mut self) -> &mut T202     pub fn io_mut(&mut self) -> &mut T {
203         &mut self.io
204     }
205 
is_read_blocked(&self) -> bool206     pub fn is_read_blocked(&self) -> bool {
207         self.read_blocked
208     }
209 
poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>210     pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
211         if self.flush_pipeline && !self.read_buf.is_empty() {
212             Poll::Ready(Ok(()))
213         } else if self.write_buf.remaining() == 0 {
214             Pin::new(&mut self.io).poll_flush(cx)
215         } else {
216             if let WriteStrategy::Flatten = self.write_buf.strategy {
217                 return self.poll_flush_flattened(cx);
218             }
219             loop {
220                 let n =
221                     ready!(Pin::new(&mut self.io).poll_write_buf(cx, &mut self.write_buf.auto()))?;
222                 debug!("flushed {} bytes", n);
223                 if self.write_buf.remaining() == 0 {
224                     break;
225                 } else if n == 0 {
226                     trace!(
227                         "write returned zero, but {} bytes remaining",
228                         self.write_buf.remaining()
229                     );
230                     return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
231                 }
232             }
233             Pin::new(&mut self.io).poll_flush(cx)
234         }
235     }
236 
237     /// Specialized version of `flush` when strategy is Flatten.
238     ///
239     /// Since all buffered bytes are flattened into the single headers buffer,
240     /// that skips some bookkeeping around using multiple buffers.
poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>>241     fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
242         loop {
243             let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.bytes()))?;
244             debug!("flushed {} bytes", n);
245             self.write_buf.headers.advance(n);
246             if self.write_buf.headers.remaining() == 0 {
247                 self.write_buf.headers.reset();
248                 break;
249             } else if n == 0 {
250                 trace!(
251                     "write returned zero, but {} bytes remaining",
252                     self.write_buf.remaining()
253                 );
254                 return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
255             }
256         }
257         Pin::new(&mut self.io).poll_flush(cx)
258     }
259 
260     #[cfg(test)]
flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a261     fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
262         futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
263     }
264 }
265 
266 // The `B` is a `Buf`, we never project a pin to it
267 impl<T: Unpin, B> Unpin for Buffered<T, B> {}
268 
269 // TODO: This trait is old... at least rename to PollBytes or something...
270 pub trait MemRead {
read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>271     fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
272 }
273 
274 impl<T, B> MemRead for Buffered<T, B>
275 where
276     T: AsyncRead + AsyncWrite + Unpin,
277     B: Buf,
278 {
read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>279     fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
280         if !self.read_buf.is_empty() {
281             let n = std::cmp::min(len, self.read_buf.len());
282             Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
283         } else {
284             let n = ready!(self.poll_read_from_io(cx))?;
285             Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
286         }
287     }
288 }
289 
290 #[derive(Clone, Copy, Debug)]
291 enum ReadStrategy {
292     Adaptive {
293         decrease_now: bool,
294         next: usize,
295         max: usize,
296     },
297     Exact(usize),
298 }
299 
300 impl ReadStrategy {
with_max(max: usize) -> ReadStrategy301     fn with_max(max: usize) -> ReadStrategy {
302         ReadStrategy::Adaptive {
303             decrease_now: false,
304             next: INIT_BUFFER_SIZE,
305             max,
306         }
307     }
308 
next(&self) -> usize309     fn next(&self) -> usize {
310         match *self {
311             ReadStrategy::Adaptive { next, .. } => next,
312             ReadStrategy::Exact(exact) => exact,
313         }
314     }
315 
max(&self) -> usize316     fn max(&self) -> usize {
317         match *self {
318             ReadStrategy::Adaptive { max, .. } => max,
319             ReadStrategy::Exact(exact) => exact,
320         }
321     }
322 
record(&mut self, bytes_read: usize)323     fn record(&mut self, bytes_read: usize) {
324         if let ReadStrategy::Adaptive {
325             ref mut decrease_now,
326             ref mut next,
327             max,
328             ..
329         } = *self
330         {
331             if bytes_read >= *next {
332                 *next = cmp::min(incr_power_of_two(*next), max);
333                 *decrease_now = false;
334             } else {
335                 let decr_to = prev_power_of_two(*next);
336                 if bytes_read < decr_to {
337                     if *decrease_now {
338                         *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
339                         *decrease_now = false;
340                     } else {
341                         // Decreasing is a two "record" process.
342                         *decrease_now = true;
343                     }
344                 } else {
345                     // A read within the current range should cancel
346                     // a potential decrease, since we just saw proof
347                     // that we still need this size.
348                     *decrease_now = false;
349                 }
350             }
351         }
352     }
353 }
354 
incr_power_of_two(n: usize) -> usize355 fn incr_power_of_two(n: usize) -> usize {
356     n.saturating_mul(2)
357 }
358 
prev_power_of_two(n: usize) -> usize359 fn prev_power_of_two(n: usize) -> usize {
360     // Only way this shift can underflow is if n is less than 4.
361     // (Which would means `usize::MAX >> 64` and underflowed!)
362     debug_assert!(n >= 4);
363     (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
364 }
365 
366 impl Default for ReadStrategy {
default() -> ReadStrategy367     fn default() -> ReadStrategy {
368         ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
369     }
370 }
371 
372 #[derive(Clone)]
373 pub struct Cursor<T> {
374     bytes: T,
375     pos: usize,
376 }
377 
378 impl<T: AsRef<[u8]>> Cursor<T> {
379     #[inline]
new(bytes: T) -> Cursor<T>380     pub(crate) fn new(bytes: T) -> Cursor<T> {
381         Cursor { bytes, pos: 0 }
382     }
383 }
384 
385 impl Cursor<Vec<u8>> {
reset(&mut self)386     fn reset(&mut self) {
387         self.pos = 0;
388         self.bytes.clear();
389     }
390 }
391 
392 impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result393     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
394         f.debug_struct("Cursor")
395             .field("pos", &self.pos)
396             .field("len", &self.bytes.as_ref().len())
397             .finish()
398     }
399 }
400 
401 impl<T: AsRef<[u8]>> Buf for Cursor<T> {
402     #[inline]
remaining(&self) -> usize403     fn remaining(&self) -> usize {
404         self.bytes.as_ref().len() - self.pos
405     }
406 
407     #[inline]
bytes(&self) -> &[u8]408     fn bytes(&self) -> &[u8] {
409         &self.bytes.as_ref()[self.pos..]
410     }
411 
412     #[inline]
advance(&mut self, cnt: usize)413     fn advance(&mut self, cnt: usize) {
414         debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
415         self.pos += cnt;
416     }
417 }
418 
419 // an internal buffer to collect writes before flushes
420 pub(super) struct WriteBuf<B> {
421     /// Re-usable buffer that holds message headers
422     headers: Cursor<Vec<u8>>,
423     max_buf_size: usize,
424     /// Deque of user buffers if strategy is Queue
425     queue: BufList<B>,
426     strategy: WriteStrategy,
427 }
428 
429 impl<B: Buf> WriteBuf<B> {
new() -> WriteBuf<B>430     fn new() -> WriteBuf<B> {
431         WriteBuf {
432             headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
433             max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
434             queue: BufList::new(),
435             strategy: WriteStrategy::Auto,
436         }
437     }
438 }
439 
440 impl<B> WriteBuf<B>
441 where
442     B: Buf,
443 {
set_strategy(&mut self, strategy: WriteStrategy)444     fn set_strategy(&mut self, strategy: WriteStrategy) {
445         self.strategy = strategy;
446     }
447 
448     #[inline]
auto(&mut self) -> WriteBufAuto<'_, B>449     fn auto(&mut self) -> WriteBufAuto<'_, B> {
450         WriteBufAuto::new(self)
451     }
452 
buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB)453     pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
454         debug_assert!(buf.has_remaining());
455         match self.strategy {
456             WriteStrategy::Flatten => {
457                 let head = self.headers_mut();
458                 //perf: This is a little faster than <Vec as BufMut>>::put,
459                 //but accomplishes the same result.
460                 loop {
461                     let adv = {
462                         let slice = buf.bytes();
463                         if slice.is_empty() {
464                             return;
465                         }
466                         head.bytes.extend_from_slice(slice);
467                         slice.len()
468                     };
469                     buf.advance(adv);
470                 }
471             }
472             WriteStrategy::Auto | WriteStrategy::Queue => {
473                 self.queue.push(buf.into());
474             }
475         }
476     }
477 
can_buffer(&self) -> bool478     fn can_buffer(&self) -> bool {
479         match self.strategy {
480             WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
481             WriteStrategy::Auto | WriteStrategy::Queue => {
482                 self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
483             }
484         }
485     }
486 
headers_mut(&mut self) -> &mut Cursor<Vec<u8>>487     fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
488         debug_assert!(!self.queue.has_remaining());
489         &mut self.headers
490     }
491 }
492 
493 impl<B: Buf> fmt::Debug for WriteBuf<B> {
fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result494     fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
495         f.debug_struct("WriteBuf")
496             .field("remaining", &self.remaining())
497             .field("strategy", &self.strategy)
498             .finish()
499     }
500 }
501 
502 impl<B: Buf> Buf for WriteBuf<B> {
503     #[inline]
remaining(&self) -> usize504     fn remaining(&self) -> usize {
505         self.headers.remaining() + self.queue.remaining()
506     }
507 
508     #[inline]
bytes(&self) -> &[u8]509     fn bytes(&self) -> &[u8] {
510         let headers = self.headers.bytes();
511         if !headers.is_empty() {
512             headers
513         } else {
514             self.queue.bytes()
515         }
516     }
517 
518     #[inline]
advance(&mut self, cnt: usize)519     fn advance(&mut self, cnt: usize) {
520         let hrem = self.headers.remaining();
521 
522         match hrem.cmp(&cnt) {
523             cmp::Ordering::Equal => self.headers.reset(),
524             cmp::Ordering::Greater => self.headers.advance(cnt),
525             cmp::Ordering::Less => {
526                 let qcnt = cnt - hrem;
527                 self.headers.reset();
528                 self.queue.advance(qcnt);
529             }
530         }
531     }
532 
533     #[inline]
bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize534     fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
535         let n = self.headers.bytes_vectored(dst);
536         self.queue.bytes_vectored(&mut dst[n..]) + n
537     }
538 }
539 
540 /// Detects when wrapped `WriteBuf` is used for vectored IO, and
541 /// adjusts the `WriteBuf` strategy if not.
542 struct WriteBufAuto<'a, B: Buf> {
543     bytes_called: Cell<bool>,
544     bytes_vec_called: Cell<bool>,
545     inner: &'a mut WriteBuf<B>,
546 }
547 
548 impl<'a, B: Buf> WriteBufAuto<'a, B> {
new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B>549     fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
550         WriteBufAuto {
551             bytes_called: Cell::new(false),
552             bytes_vec_called: Cell::new(false),
553             inner,
554         }
555     }
556 }
557 
558 impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
559     #[inline]
remaining(&self) -> usize560     fn remaining(&self) -> usize {
561         self.inner.remaining()
562     }
563 
564     #[inline]
bytes(&self) -> &[u8]565     fn bytes(&self) -> &[u8] {
566         self.bytes_called.set(true);
567         self.inner.bytes()
568     }
569 
570     #[inline]
advance(&mut self, cnt: usize)571     fn advance(&mut self, cnt: usize) {
572         self.inner.advance(cnt)
573     }
574 
575     #[inline]
bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize576     fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
577         self.bytes_vec_called.set(true);
578         self.inner.bytes_vectored(dst)
579     }
580 }
581 
582 impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
drop(&mut self)583     fn drop(&mut self) {
584         if let WriteStrategy::Auto = self.inner.strategy {
585             if self.bytes_vec_called.get() {
586                 self.inner.strategy = WriteStrategy::Queue;
587             } else if self.bytes_called.get() {
588                 trace!("detected no usage of vectored write, flattening");
589                 self.inner.strategy = WriteStrategy::Flatten;
590                 self.inner.headers.bytes.put(&mut self.inner.queue);
591             }
592         }
593     }
594 }
595 
596 #[derive(Debug)]
597 enum WriteStrategy {
598     Auto,
599     Flatten,
600     Queue,
601 }
602 
603 #[cfg(test)]
604 mod tests {
605     use super::*;
606     use std::time::Duration;
607 
608     use tokio_test::io::Builder as Mock;
609 
610     #[cfg(feature = "nightly")]
611     use test::Bencher;
612 
613     /*
614     impl<T: Read> MemRead for AsyncIo<T> {
615         fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
616             let mut v = vec![0; len];
617             let n = try_nb!(self.read(v.as_mut_slice()));
618             Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
619         }
620     }
621     */
622 
623     #[tokio::test]
iobuf_write_empty_slice()624     async fn iobuf_write_empty_slice() {
625         // First, let's just check that the Mock would normally return an
626         // error on an unexpected write, even if the buffer is empty...
627         let mut mock = Mock::new().build();
628         futures_util::future::poll_fn(|cx| {
629             Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
630         })
631         .await
632         .expect_err("should be a broken pipe");
633 
634         // underlying io will return the logic error upon write,
635         // so we are testing that the io_buf does not trigger a write
636         // when there is nothing to flush
637         let mock = Mock::new().build();
638         let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
639         io_buf.flush().await.expect("should short-circuit flush");
640     }
641 
642     #[tokio::test]
parse_reads_until_blocked()643     async fn parse_reads_until_blocked() {
644         use crate::proto::h1::ClientTransaction;
645 
646         let mock = Mock::new()
647             // Split over multiple reads will read all of it
648             .read(b"HTTP/1.1 200 OK\r\n")
649             .read(b"Server: hyper\r\n")
650             // missing last line ending
651             .wait(Duration::from_secs(1))
652             .build();
653 
654         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
655 
656         // We expect a `parse` to be not ready, and so can't await it directly.
657         // Rather, this `poll_fn` will wrap the `Poll` result.
658         futures_util::future::poll_fn(|cx| {
659             let parse_ctx = ParseContext {
660                 cached_headers: &mut None,
661                 req_method: &mut None,
662             };
663             assert!(buffered
664                 .parse::<ClientTransaction>(cx, parse_ctx)
665                 .is_pending());
666             Poll::Ready(())
667         })
668         .await;
669 
670         assert_eq!(
671             buffered.read_buf,
672             b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
673         );
674     }
675 
676     #[test]
read_strategy_adaptive_increments()677     fn read_strategy_adaptive_increments() {
678         let mut strategy = ReadStrategy::default();
679         assert_eq!(strategy.next(), 8192);
680 
681         // Grows if record == next
682         strategy.record(8192);
683         assert_eq!(strategy.next(), 16384);
684 
685         strategy.record(16384);
686         assert_eq!(strategy.next(), 32768);
687 
688         // Enormous records still increment at same rate
689         strategy.record(::std::usize::MAX);
690         assert_eq!(strategy.next(), 65536);
691 
692         let max = strategy.max();
693         while strategy.next() < max {
694             strategy.record(max);
695         }
696 
697         assert_eq!(strategy.next(), max, "never goes over max");
698         strategy.record(max + 1);
699         assert_eq!(strategy.next(), max, "never goes over max");
700     }
701 
702     #[test]
read_strategy_adaptive_decrements()703     fn read_strategy_adaptive_decrements() {
704         let mut strategy = ReadStrategy::default();
705         strategy.record(8192);
706         assert_eq!(strategy.next(), 16384);
707 
708         strategy.record(1);
709         assert_eq!(
710             strategy.next(),
711             16384,
712             "first smaller record doesn't decrement yet"
713         );
714         strategy.record(8192);
715         assert_eq!(strategy.next(), 16384, "record was with range");
716 
717         strategy.record(1);
718         assert_eq!(
719             strategy.next(),
720             16384,
721             "in-range record should make this the 'first' again"
722         );
723 
724         strategy.record(1);
725         assert_eq!(strategy.next(), 8192, "second smaller record decrements");
726 
727         strategy.record(1);
728         assert_eq!(strategy.next(), 8192, "first doesn't decrement");
729         strategy.record(1);
730         assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
731     }
732 
733     #[test]
read_strategy_adaptive_stays_the_same()734     fn read_strategy_adaptive_stays_the_same() {
735         let mut strategy = ReadStrategy::default();
736         strategy.record(8192);
737         assert_eq!(strategy.next(), 16384);
738 
739         strategy.record(8193);
740         assert_eq!(
741             strategy.next(),
742             16384,
743             "first smaller record doesn't decrement yet"
744         );
745 
746         strategy.record(8193);
747         assert_eq!(
748             strategy.next(),
749             16384,
750             "with current step does not decrement"
751         );
752     }
753 
754     #[test]
read_strategy_adaptive_max_fuzz()755     fn read_strategy_adaptive_max_fuzz() {
756         fn fuzz(max: usize) {
757             let mut strategy = ReadStrategy::with_max(max);
758             while strategy.next() < max {
759                 strategy.record(::std::usize::MAX);
760             }
761             let mut next = strategy.next();
762             while next > 8192 {
763                 strategy.record(1);
764                 strategy.record(1);
765                 next = strategy.next();
766                 assert!(
767                     next.is_power_of_two(),
768                     "decrement should be powers of two: {} (max = {})",
769                     next,
770                     max,
771                 );
772             }
773         }
774 
775         let mut max = 8192;
776         while max < std::usize::MAX {
777             fuzz(max);
778             max = (max / 2).saturating_mul(3);
779         }
780         fuzz(::std::usize::MAX);
781     }
782 
783     #[test]
784     #[should_panic]
785     #[cfg(debug_assertions)] // needs to trigger a debug_assert
write_buf_requires_non_empty_bufs()786     fn write_buf_requires_non_empty_bufs() {
787         let mock = Mock::new().build();
788         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
789 
790         buffered.buffer(Cursor::new(Vec::new()));
791     }
792 
793     /*
794     TODO: needs tokio_test::io to allow configure write_buf calls
795     #[test]
796     fn write_buf_queue() {
797         let _ = pretty_env_logger::try_init();
798 
799         let mock = AsyncIo::new_buf(vec![], 1024);
800         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
801 
802 
803         buffered.headers_buf().extend(b"hello ");
804         buffered.buffer(Cursor::new(b"world, ".to_vec()));
805         buffered.buffer(Cursor::new(b"it's ".to_vec()));
806         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
807         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
808         buffered.flush().unwrap();
809 
810         assert_eq!(buffered.io, b"hello world, it's hyper!");
811         assert_eq!(buffered.io.num_writes(), 1);
812         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
813     }
814     */
815 
816     #[tokio::test]
write_buf_flatten()817     async fn write_buf_flatten() {
818         let _ = pretty_env_logger::try_init();
819 
820         let mock = Mock::new()
821             // Just a single write
822             .write(b"hello world, it's hyper!")
823             .build();
824 
825         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
826         buffered.write_buf.set_strategy(WriteStrategy::Flatten);
827 
828         buffered.headers_buf().extend(b"hello ");
829         buffered.buffer(Cursor::new(b"world, ".to_vec()));
830         buffered.buffer(Cursor::new(b"it's ".to_vec()));
831         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
832         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
833 
834         buffered.flush().await.expect("flush");
835     }
836 
837     #[tokio::test]
write_buf_auto_flatten()838     async fn write_buf_auto_flatten() {
839         let _ = pretty_env_logger::try_init();
840 
841         let mock = Mock::new()
842             // Expects write_buf to only consume first buffer
843             .write(b"hello ")
844             // And then the Auto strategy will have flattened
845             .write(b"world, it's hyper!")
846             .build();
847 
848         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
849 
850         // we have 4 buffers, but hope to detect that vectored IO isn't
851         // being used, and switch to flattening automatically,
852         // resulting in only 2 writes
853         buffered.headers_buf().extend(b"hello ");
854         buffered.buffer(Cursor::new(b"world, ".to_vec()));
855         buffered.buffer(Cursor::new(b"it's ".to_vec()));
856         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
857         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
858 
859         buffered.flush().await.expect("flush");
860 
861         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
862     }
863 
864     #[tokio::test]
write_buf_queue_disable_auto()865     async fn write_buf_queue_disable_auto() {
866         let _ = pretty_env_logger::try_init();
867 
868         let mock = Mock::new()
869             .write(b"hello ")
870             .write(b"world, ")
871             .write(b"it's ")
872             .write(b"hyper!")
873             .build();
874 
875         let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
876         buffered.write_buf.set_strategy(WriteStrategy::Queue);
877 
878         // we have 4 buffers, and vec IO disabled, but explicitly said
879         // don't try to auto detect (via setting strategy above)
880 
881         buffered.headers_buf().extend(b"hello ");
882         buffered.buffer(Cursor::new(b"world, ".to_vec()));
883         buffered.buffer(Cursor::new(b"it's ".to_vec()));
884         buffered.buffer(Cursor::new(b"hyper!".to_vec()));
885         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
886 
887         buffered.flush().await.expect("flush");
888 
889         assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
890     }
891 
892     #[cfg(feature = "nightly")]
893     #[bench]
bench_write_buf_flatten_buffer_chunk(b: &mut Bencher)894     fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
895         let s = "Hello, World!";
896         b.bytes = s.len() as u64;
897 
898         let mut write_buf = WriteBuf::<bytes::Bytes>::new();
899         write_buf.set_strategy(WriteStrategy::Flatten);
900         b.iter(|| {
901             let chunk = bytes::Bytes::from(s);
902             write_buf.buffer(chunk);
903             ::test::black_box(&write_buf);
904             write_buf.headers.bytes.clear();
905         })
906     }
907 }
908