use std::cell::Cell;
use std::cmp;
use std::fmt;
use std::io::{self, IoSlice};

use bytes::{Buf, BufMut, Bytes, BytesMut};
use tokio::io::{AsyncRead, AsyncWrite};

use super::{Http1Transaction, ParseContext, ParsedMessage};
use crate::common::buf::BufList;
use crate::common::{task, Pin, Poll, Unpin};

/// The initial buffer size allocated before trying to read from IO.
pub(crate) const INIT_BUFFER_SIZE: usize = 8192;

/// The minimum value that can be set to max buffer size.
pub const MINIMUM_MAX_BUFFER_SIZE: usize = INIT_BUFFER_SIZE;

/// The default maximum read buffer size. If the buffer gets this big and
/// a message is still not complete, a `TooLarge` error is triggered.
// Note: if this changes, update server::conn::Http::max_buf_size docs.
pub(crate) const DEFAULT_MAX_BUFFER_SIZE: usize = 8192 + 4096 * 100;

/// The maximum number of distinct `Buf`s to hold in a list before requiring
/// a flush. Only affects when the buffer strategy is to queue buffers.
///
/// Note that a flush can happen before reaching the maximum. This simply
/// forces a flush if the queue gets this big.
const MAX_BUF_LIST_BUFFERS: usize = 16;

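/// A buffering wrapper around the connection IO.
///
/// Incoming bytes accumulate in `read_buf` until a full message head can be
/// parsed, and outgoing bytes are staged in `write_buf` until flushed.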
pub struct Buffered<T, B> {
    flush_pipeline: bool,
    io: T,
    read_blocked: bool,
    read_buf: BytesMut,
    read_buf_strategy: ReadStrategy,
    write_buf: WriteBuf<B>,
}

impl<T, B> fmt::Debug for Buffered<T, B>
where
    B: Buf,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Buffered")
            .field("read_buf", &self.read_buf)
            .field("write_buf", &self.write_buf)
            .finish()
    }
}

impl<T, B> Buffered<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    pub fn new(io: T) -> Buffered<T, B> {
        Buffered {
            flush_pipeline: false,
            io,
            read_blocked: false,
            read_buf: BytesMut::with_capacity(0),
            read_buf_strategy: ReadStrategy::default(),
            write_buf: WriteBuf::new(),
        }
    }

    pub fn set_flush_pipeline(&mut self, enabled: bool) {
        debug_assert!(!self.write_buf.has_remaining());
        self.flush_pipeline = enabled;
        if enabled {
            self.set_write_strategy_flatten();
        }
    }

    pub fn set_max_buf_size(&mut self, max: usize) {
        assert!(
            max >= MINIMUM_MAX_BUFFER_SIZE,
            "The max_buf_size cannot be smaller than {}.",
            MINIMUM_MAX_BUFFER_SIZE,
        );
        self.read_buf_strategy = ReadStrategy::with_max(max);
        self.write_buf.max_buf_size = max;
    }

    pub fn set_read_buf_exact_size(&mut self, sz: usize) {
        self.read_buf_strategy = ReadStrategy::Exact(sz);
    }

    pub fn set_write_strategy_flatten(&mut self) {
        // This should only be called at construction time, so this assert
        // is here to catch misuse.
        debug_assert!(self.write_buf.queue.bufs_cnt() == 0);
        self.write_buf.set_strategy(WriteStrategy::Flatten);
    }

    pub fn read_buf(&self) -> &[u8] {
        self.read_buf.as_ref()
    }

    #[cfg(test)]
    #[cfg(feature = "nightly")]
    pub(super) fn read_buf_mut(&mut self) -> &mut BytesMut {
        &mut self.read_buf
    }

    /// Return the "allocated" available space, not the potential space
    /// that could be allocated in the future.
    fn read_buf_remaining_mut(&self) -> usize {
        self.read_buf.capacity() - self.read_buf.len()
    }

    pub fn headers_buf(&mut self) -> &mut Vec<u8> {
        let buf = self.write_buf.headers_mut();
        &mut buf.bytes
    }

    pub(super) fn write_buf(&mut self) -> &mut WriteBuf<B> {
        &mut self.write_buf
    }

    pub fn buffer<BB: Buf + Into<B>>(&mut self, buf: BB) {
        self.write_buf.buffer(buf)
    }

    pub fn can_buffer(&self) -> bool {
        self.flush_pipeline || self.write_buf.can_buffer()
    }

    pub fn consume_leading_lines(&mut self) {
        if !self.read_buf.is_empty() {
            let mut i = 0;
            while i < self.read_buf.len() {
                match self.read_buf[i] {
                    b'\r' | b'\n' => i += 1,
                    _ => break,
                }
            }
            self.read_buf.advance(i);
        }
    }

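    /// Parse a message head (`S::Incoming`) out of the read buffer, reading
    /// more bytes from the IO as needed. Fails if the buffer grows past the
    /// configured maximum, or if EOF arrives before a complete head.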
    pub(super) fn parse<S>(
        &mut self,
        cx: &mut task::Context<'_>,
        parse_ctx: ParseContext<'_>,
    ) -> Poll<crate::Result<ParsedMessage<S::Incoming>>>
    where
        S: Http1Transaction,
    {
        loop {
            match S::parse(
                &mut self.read_buf,
                ParseContext {
                    cached_headers: parse_ctx.cached_headers,
                    req_method: parse_ctx.req_method,
                },
            )? {
                Some(msg) => {
                    debug!("parsed {} headers", msg.head.headers.len());
                    return Poll::Ready(Ok(msg));
                }
                None => {
                    let max = self.read_buf_strategy.max();
                    if self.read_buf.len() >= max {
                        debug!("max_buf_size ({}) reached, closing", max);
                        return Poll::Ready(Err(crate::Error::new_too_large()));
                    }
                }
            }
            if ready!(self.poll_read_from_io(cx)).map_err(crate::Error::new_io)? == 0 {
                trace!("parse eof");
                return Poll::Ready(Err(crate::Error::new_incomplete()));
            }
        }
    }

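    /// Read once from the IO into the read buffer, reserving space according
    /// to the current `ReadStrategy` and recording how many bytes were read
    /// so the strategy can adapt. Marks the connection as read-blocked when
    /// the IO returns `Pending`.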
    pub fn poll_read_from_io(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<usize>> {
        self.read_blocked = false;
        let next = self.read_buf_strategy.next();
        if self.read_buf_remaining_mut() < next {
            self.read_buf.reserve(next);
        }
        match Pin::new(&mut self.io).poll_read_buf(cx, &mut self.read_buf) {
            Poll::Ready(Ok(n)) => {
                debug!("read {} bytes", n);
                self.read_buf_strategy.record(n);
                Poll::Ready(Ok(n))
            }
            Poll::Pending => {
                self.read_blocked = true;
                Poll::Pending
            }
            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
        }
    }

    pub fn into_inner(self) -> (T, Bytes) {
        (self.io, self.read_buf.freeze())
    }

    pub fn io_mut(&mut self) -> &mut T {
        &mut self.io
    }

    pub fn is_read_blocked(&self) -> bool {
        self.read_blocked
    }

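    /// Write any buffered bytes to the IO and then flush it.
    ///
    /// When pipeline flushing is enabled and unread input is still buffered,
    /// this returns `Ready(Ok(()))` without writing, deferring the flush until
    /// the pipelined input has been handled.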
    pub fn poll_flush(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        if self.flush_pipeline && !self.read_buf.is_empty() {
            Poll::Ready(Ok(()))
        } else if self.write_buf.remaining() == 0 {
            Pin::new(&mut self.io).poll_flush(cx)
        } else {
            if let WriteStrategy::Flatten = self.write_buf.strategy {
                return self.poll_flush_flattened(cx);
            }
            loop {
                let n =
                    ready!(Pin::new(&mut self.io).poll_write_buf(cx, &mut self.write_buf.auto()))?;
                debug!("flushed {} bytes", n);
                if self.write_buf.remaining() == 0 {
                    break;
                } else if n == 0 {
                    trace!(
                        "write returned zero, but {} bytes remaining",
                        self.write_buf.remaining()
                    );
                    return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
                }
            }
            Pin::new(&mut self.io).poll_flush(cx)
        }
    }

    /// Specialized version of `flush` used when the strategy is `Flatten`.
    ///
    /// Since all buffered bytes are flattened into the single headers buffer,
    /// this skips some bookkeeping around using multiple buffers.
    fn poll_flush_flattened(&mut self, cx: &mut task::Context<'_>) -> Poll<io::Result<()>> {
        loop {
            let n = ready!(Pin::new(&mut self.io).poll_write(cx, self.write_buf.headers.bytes()))?;
            debug!("flushed {} bytes", n);
            self.write_buf.headers.advance(n);
            if self.write_buf.headers.remaining() == 0 {
                self.write_buf.headers.reset();
                break;
            } else if n == 0 {
                trace!(
                    "write returned zero, but {} bytes remaining",
                    self.write_buf.remaining()
                );
                return Poll::Ready(Err(io::ErrorKind::WriteZero.into()));
            }
        }
        Pin::new(&mut self.io).poll_flush(cx)
    }

    #[cfg(test)]
    fn flush<'a>(&'a mut self) -> impl std::future::Future<Output = io::Result<()>> + 'a {
        futures_util::future::poll_fn(move |cx| self.poll_flush(cx))
    }
}

// The `B` is a `Buf`; we never project a `Pin` to it.
impl<T: Unpin, B> Unpin for Buffered<T, B> {}

// TODO: This trait is old... at least rename to PollBytes or something...
pub trait MemRead {
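    /// Poll for up to `len` bytes, returned as a `Bytes`.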
    fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>>;
}

impl<T, B> MemRead for Buffered<T, B>
where
    T: AsyncRead + AsyncWrite + Unpin,
    B: Buf,
{
    fn read_mem(&mut self, cx: &mut task::Context<'_>, len: usize) -> Poll<io::Result<Bytes>> {
        if !self.read_buf.is_empty() {
            let n = std::cmp::min(len, self.read_buf.len());
            Poll::Ready(Ok(self.read_buf.split_to(n).freeze()))
        } else {
            let n = ready!(self.poll_read_from_io(cx))?;
            Poll::Ready(Ok(self.read_buf.split_to(::std::cmp::min(len, n)).freeze()))
        }
    }
}

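/// How much capacity to reserve before each read from the IO.
///
/// `Adaptive` starts at `INIT_BUFFER_SIZE` and doubles `next` (up to `max`)
/// whenever a read fills the current size, shrinking again only after two
/// consecutive reads come in below the previous power of two. `Exact` always
/// reserves the same fixed amount.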
#[derive(Clone, Copy, Debug)]
enum ReadStrategy {
    Adaptive {
        decrease_now: bool,
        next: usize,
        max: usize,
    },
    Exact(usize),
}

impl ReadStrategy {
    fn with_max(max: usize) -> ReadStrategy {
        ReadStrategy::Adaptive {
            decrease_now: false,
            next: INIT_BUFFER_SIZE,
            max,
        }
    }

    fn next(&self) -> usize {
        match *self {
            ReadStrategy::Adaptive { next, .. } => next,
            ReadStrategy::Exact(exact) => exact,
        }
    }

    fn max(&self) -> usize {
        match *self {
            ReadStrategy::Adaptive { max, .. } => max,
            ReadStrategy::Exact(exact) => exact,
        }
    }

    fn record(&mut self, bytes_read: usize) {
        if let ReadStrategy::Adaptive {
            ref mut decrease_now,
            ref mut next,
            max,
            ..
        } = *self
        {
            if bytes_read >= *next {
                *next = cmp::min(incr_power_of_two(*next), max);
                *decrease_now = false;
            } else {
                let decr_to = prev_power_of_two(*next);
                if bytes_read < decr_to {
                    if *decrease_now {
                        *next = cmp::max(decr_to, INIT_BUFFER_SIZE);
                        *decrease_now = false;
                    } else {
                        // Decreasing is a two "record" process.
                        *decrease_now = true;
                    }
                } else {
                    // A read within the current range should cancel
                    // a potential decrease, since we just saw proof
                    // that we still need this size.
                    *decrease_now = false;
                }
            }
        }
    }
}

fn incr_power_of_two(n: usize) -> usize {
    n.saturating_mul(2)
}

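// Returns the power of two one step below `n`'s highest set bit, i.e. `n / 2`
// when `n` is itself a power of two.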
fn prev_power_of_two(n: usize) -> usize {
    // The only way this shift can overflow is if n is less than 4.
    // (That would mean `usize::MAX >> 64`!)
    debug_assert!(n >= 4);
    (::std::usize::MAX >> (n.leading_zeros() + 2)) + 1
}

impl Default for ReadStrategy {
    fn default() -> ReadStrategy {
        ReadStrategy::with_max(DEFAULT_MAX_BUFFER_SIZE)
    }
}

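/// A cursor over an owned byte container, tracking how far it has been
/// consumed so it can be written out incrementally.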
#[derive(Clone)]
pub struct Cursor<T> {
    bytes: T,
    pos: usize,
}

impl<T: AsRef<[u8]>> Cursor<T> {
    #[inline]
    pub(crate) fn new(bytes: T) -> Cursor<T> {
        Cursor { bytes, pos: 0 }
    }
}

impl Cursor<Vec<u8>> {
    fn reset(&mut self) {
        self.pos = 0;
        self.bytes.clear();
    }
}

impl<T: AsRef<[u8]>> fmt::Debug for Cursor<T> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("Cursor")
            .field("pos", &self.pos)
            .field("len", &self.bytes.as_ref().len())
            .finish()
    }
}

impl<T: AsRef<[u8]>> Buf for Cursor<T> {
    #[inline]
    fn remaining(&self) -> usize {
        self.bytes.as_ref().len() - self.pos
    }

    #[inline]
    fn bytes(&self) -> &[u8] {
        &self.bytes.as_ref()[self.pos..]
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        debug_assert!(self.pos + cnt <= self.bytes.as_ref().len());
        self.pos += cnt;
    }
}

// an internal buffer to collect writes before flushes
pub(super) struct WriteBuf<B> {
    /// Re-usable buffer that holds message headers
    headers: Cursor<Vec<u8>>,
    max_buf_size: usize,
    /// Deque of user buffers if strategy is Queue
    queue: BufList<B>,
    strategy: WriteStrategy,
}

impl<B: Buf> WriteBuf<B> {
    fn new() -> WriteBuf<B> {
        WriteBuf {
            headers: Cursor::new(Vec::with_capacity(INIT_BUFFER_SIZE)),
            max_buf_size: DEFAULT_MAX_BUFFER_SIZE,
            queue: BufList::new(),
            strategy: WriteStrategy::Auto,
        }
    }
}

impl<B> WriteBuf<B>
where
    B: Buf,
{
    fn set_strategy(&mut self, strategy: WriteStrategy) {
        self.strategy = strategy;
    }

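    // Wrap self in a `WriteBufAuto`, which records whether the IO ends up
    // using vectored writes so the `Auto` strategy can settle on `Queue` or
    // `Flatten` when the wrapper is dropped.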
    #[inline]
    fn auto(&mut self) -> WriteBufAuto<'_, B> {
        WriteBufAuto::new(self)
    }

    pub(super) fn buffer<BB: Buf + Into<B>>(&mut self, mut buf: BB) {
        debug_assert!(buf.has_remaining());
        match self.strategy {
            WriteStrategy::Flatten => {
                let head = self.headers_mut();
                // perf: This is a little faster than <Vec as BufMut>::put,
                // but accomplishes the same result.
                loop {
                    let adv = {
                        let slice = buf.bytes();
                        if slice.is_empty() {
                            return;
                        }
                        head.bytes.extend_from_slice(slice);
                        slice.len()
                    };
                    buf.advance(adv);
                }
            }
            WriteStrategy::Auto | WriteStrategy::Queue => {
                self.queue.push(buf.into());
            }
        }
    }

    fn can_buffer(&self) -> bool {
        match self.strategy {
            WriteStrategy::Flatten => self.remaining() < self.max_buf_size,
            WriteStrategy::Auto | WriteStrategy::Queue => {
                self.queue.bufs_cnt() < MAX_BUF_LIST_BUFFERS && self.remaining() < self.max_buf_size
            }
        }
    }

    fn headers_mut(&mut self) -> &mut Cursor<Vec<u8>> {
        debug_assert!(!self.queue.has_remaining());
        &mut self.headers
    }
}

impl<B: Buf> fmt::Debug for WriteBuf<B> {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.debug_struct("WriteBuf")
            .field("remaining", &self.remaining())
            .field("strategy", &self.strategy)
            .finish()
    }
}

impl<B: Buf> Buf for WriteBuf<B> {
    #[inline]
    fn remaining(&self) -> usize {
        self.headers.remaining() + self.queue.remaining()
    }

    #[inline]
    fn bytes(&self) -> &[u8] {
        let headers = self.headers.bytes();
        if !headers.is_empty() {
            headers
        } else {
            self.queue.bytes()
        }
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        let hrem = self.headers.remaining();

        match hrem.cmp(&cnt) {
            cmp::Ordering::Equal => self.headers.reset(),
            cmp::Ordering::Greater => self.headers.advance(cnt),
            cmp::Ordering::Less => {
                let qcnt = cnt - hrem;
                self.headers.reset();
                self.queue.advance(qcnt);
            }
        }
    }

    #[inline]
    fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
        let n = self.headers.bytes_vectored(dst);
        self.queue.bytes_vectored(&mut dst[n..]) + n
    }
}

/// Detects when wrapped `WriteBuf` is used for vectored IO, and
/// adjusts the `WriteBuf` strategy if not.
struct WriteBufAuto<'a, B: Buf> {
    bytes_called: Cell<bool>,
    bytes_vec_called: Cell<bool>,
    inner: &'a mut WriteBuf<B>,
}

impl<'a, B: Buf> WriteBufAuto<'a, B> {
    fn new(inner: &'a mut WriteBuf<B>) -> WriteBufAuto<'a, B> {
        WriteBufAuto {
            bytes_called: Cell::new(false),
            bytes_vec_called: Cell::new(false),
            inner,
        }
    }
}

impl<'a, B: Buf> Buf for WriteBufAuto<'a, B> {
    #[inline]
    fn remaining(&self) -> usize {
        self.inner.remaining()
    }

    #[inline]
    fn bytes(&self) -> &[u8] {
        self.bytes_called.set(true);
        self.inner.bytes()
    }

    #[inline]
    fn advance(&mut self, cnt: usize) {
        self.inner.advance(cnt)
    }

    #[inline]
    fn bytes_vectored<'t>(&'t self, dst: &mut [IoSlice<'t>]) -> usize {
        self.bytes_vec_called.set(true);
        self.inner.bytes_vectored(dst)
    }
}

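// On drop, commit what was observed during the write: if `bytes_vectored` was
// called, the IO supports vectored writes, so keep queuing; if only `bytes`
// was called, switch to `Flatten` and fold any queued buffers into the
// headers buffer.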
impl<'a, B: Buf + 'a> Drop for WriteBufAuto<'a, B> {
    fn drop(&mut self) {
        if let WriteStrategy::Auto = self.inner.strategy {
            if self.bytes_vec_called.get() {
                self.inner.strategy = WriteStrategy::Queue;
            } else if self.bytes_called.get() {
                trace!("detected no usage of vectored write, flattening");
                self.inner.strategy = WriteStrategy::Flatten;
                self.inner.headers.bytes.put(&mut self.inner.queue);
            }
        }
    }
}

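/// How buffered writes are staged before flushing.
///
/// `Auto` queues buffers but settles on `Queue` or `Flatten` based on what
/// `WriteBufAuto` observes; `Flatten` copies every buffer into the headers
/// `Vec` for a single write; `Queue` keeps buffers separate for vectored
/// writes.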
#[derive(Debug)]
enum WriteStrategy {
    Auto,
    Flatten,
    Queue,
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::time::Duration;

    use tokio_test::io::Builder as Mock;

    #[cfg(feature = "nightly")]
    use test::Bencher;

    /*
    impl<T: Read> MemRead for AsyncIo<T> {
        fn read_mem(&mut self, len: usize) -> Poll<Bytes, io::Error> {
            let mut v = vec![0; len];
            let n = try_nb!(self.read(v.as_mut_slice()));
            Ok(Async::Ready(BytesMut::from(&v[..n]).freeze()))
        }
    }
    */

    #[tokio::test]
    async fn iobuf_write_empty_slice() {
        // First, let's just check that the Mock would normally return an
        // error on an unexpected write, even if the buffer is empty...
        let mut mock = Mock::new().build();
        futures_util::future::poll_fn(|cx| {
            Pin::new(&mut mock).poll_write_buf(cx, &mut Cursor::new(&[]))
        })
        .await
        .expect_err("should be a broken pipe");

        // underlying io will return the logic error upon write,
        // so we are testing that the io_buf does not trigger a write
        // when there is nothing to flush
        let mock = Mock::new().build();
        let mut io_buf = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
        io_buf.flush().await.expect("should short-circuit flush");
    }

    #[tokio::test]
    async fn parse_reads_until_blocked() {
        use crate::proto::h1::ClientTransaction;

        let mock = Mock::new()
            // Split over multiple reads will read all of it
            .read(b"HTTP/1.1 200 OK\r\n")
            .read(b"Server: hyper\r\n")
            // missing last line ending
            .wait(Duration::from_secs(1))
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

        // We expect a `parse` to be not ready, and so can't await it directly.
        // Rather, this `poll_fn` will wrap the `Poll` result.
        futures_util::future::poll_fn(|cx| {
            let parse_ctx = ParseContext {
                cached_headers: &mut None,
                req_method: &mut None,
            };
            assert!(buffered
                .parse::<ClientTransaction>(cx, parse_ctx)
                .is_pending());
            Poll::Ready(())
        })
        .await;

        assert_eq!(
            buffered.read_buf,
            b"HTTP/1.1 200 OK\r\nServer: hyper\r\n"[..]
        );
    }

    #[test]
    fn read_strategy_adaptive_increments() {
        let mut strategy = ReadStrategy::default();
        assert_eq!(strategy.next(), 8192);

        // Grows if record == next
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(16384);
        assert_eq!(strategy.next(), 32768);

        // Enormous records still increment at the same rate
        strategy.record(::std::usize::MAX);
        assert_eq!(strategy.next(), 65536);

        let max = strategy.max();
        while strategy.next() < max {
            strategy.record(max);
        }

        assert_eq!(strategy.next(), max, "never goes over max");
        strategy.record(max + 1);
        assert_eq!(strategy.next(), max, "never goes over max");
    }

    #[test]
    fn read_strategy_adaptive_decrements() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384, "record was within range");

        strategy.record(1);
        assert_eq!(
            strategy.next(),
            16384,
            "in-range record should make this the 'first' again"
        );

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "second smaller record decrements");

        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "first doesn't decrement");
        strategy.record(1);
        assert_eq!(strategy.next(), 8192, "doesn't decrement under minimum");
    }

    #[test]
    fn read_strategy_adaptive_stays_the_same() {
        let mut strategy = ReadStrategy::default();
        strategy.record(8192);
        assert_eq!(strategy.next(), 16384);

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "first smaller record doesn't decrement yet"
        );

        strategy.record(8193);
        assert_eq!(
            strategy.next(),
            16384,
            "record within the current step does not decrement"
        );
    }

    #[test]
    fn read_strategy_adaptive_max_fuzz() {
        fn fuzz(max: usize) {
            let mut strategy = ReadStrategy::with_max(max);
            while strategy.next() < max {
                strategy.record(::std::usize::MAX);
            }
            let mut next = strategy.next();
            while next > 8192 {
                strategy.record(1);
                strategy.record(1);
                next = strategy.next();
                assert!(
                    next.is_power_of_two(),
                    "decrement should be powers of two: {} (max = {})",
                    next,
                    max,
                );
            }
        }

        let mut max = 8192;
        while max < std::usize::MAX {
            fuzz(max);
            max = (max / 2).saturating_mul(3);
        }
        fuzz(::std::usize::MAX);
    }

    #[test]
    #[should_panic]
    #[cfg(debug_assertions)] // needs to trigger a debug_assert
    fn write_buf_requires_non_empty_bufs() {
        let mock = Mock::new().build();
        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

        buffered.buffer(Cursor::new(Vec::new()));
    }

    /*
    TODO: needs tokio_test::io to allow configure write_buf calls
    #[test]
    fn write_buf_queue() {
        let _ = pretty_env_logger::try_init();

        let mock = AsyncIo::new_buf(vec![], 1024);
        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);
        buffered.flush().unwrap();

        assert_eq!(buffered.io, b"hello world, it's hyper!");
        assert_eq!(buffered.io.num_writes(), 1);
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }
    */

    #[tokio::test]
    async fn write_buf_flatten() {
        let _ = pretty_env_logger::try_init();

        let mock = Mock::new()
            // Just a single write
            .write(b"hello world, it's hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
        buffered.write_buf.set_strategy(WriteStrategy::Flatten);

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);

        buffered.flush().await.expect("flush");
    }

    #[tokio::test]
    async fn write_buf_auto_flatten() {
        let _ = pretty_env_logger::try_init();

        let mock = Mock::new()
            // Expects write_buf to only consume first buffer
            .write(b"hello ")
            // And then the Auto strategy will have flattened
            .write(b"world, it's hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);

        // we have 4 buffers, but hope to detect that vectored IO isn't
        // being used, and switch to flattening automatically,
        // resulting in only 2 writes
        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

        buffered.flush().await.expect("flush");

        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }

    #[tokio::test]
    async fn write_buf_queue_disable_auto() {
        let _ = pretty_env_logger::try_init();

        let mock = Mock::new()
            .write(b"hello ")
            .write(b"world, ")
            .write(b"it's ")
            .write(b"hyper!")
            .build();

        let mut buffered = Buffered::<_, Cursor<Vec<u8>>>::new(mock);
        buffered.write_buf.set_strategy(WriteStrategy::Queue);

        // We have 4 buffers and vectored IO is disabled, but we explicitly
        // opted out of auto-detection by setting the strategy above.

        buffered.headers_buf().extend(b"hello ");
        buffered.buffer(Cursor::new(b"world, ".to_vec()));
        buffered.buffer(Cursor::new(b"it's ".to_vec()));
        buffered.buffer(Cursor::new(b"hyper!".to_vec()));
        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 3);

        buffered.flush().await.expect("flush");

        assert_eq!(buffered.write_buf.queue.bufs_cnt(), 0);
    }

    #[cfg(feature = "nightly")]
    #[bench]
    fn bench_write_buf_flatten_buffer_chunk(b: &mut Bencher) {
        let s = "Hello, World!";
        b.bytes = s.len() as u64;

        let mut write_buf = WriteBuf::<bytes::Bytes>::new();
        write_buf.set_strategy(WriteStrategy::Flatten);
        b.iter(|| {
            let chunk = bytes::Bytes::from(s);
            write_buf.buffer(chunk);
            ::test::black_box(&write_buf);
            write_buf.headers.bytes.clear();
        })
    }
}