1 use crate::io::AsyncRead;
2
3 use std::future::Future;
4 use std::io;
5 use std::mem::MaybeUninit;
6 use std::pin::Pin;
7 use std::task::{Context, Poll};
8
9 #[derive(Debug)]
10 #[must_use = "futures do nothing unless you `.await` or poll them"]
11 #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
12 pub struct ReadToEnd<'a, R: ?Sized> {
13 reader: &'a mut R,
14 buf: &'a mut Vec<u8>,
15 start_len: usize,
16 }
17
read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R> where R: AsyncRead + Unpin + ?Sized,18 pub(crate) fn read_to_end<'a, R>(reader: &'a mut R, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, R>
19 where
20 R: AsyncRead + Unpin + ?Sized,
21 {
22 let start_len = buf.len();
23 ReadToEnd {
24 reader,
25 buf,
26 start_len,
27 }
28 }
29
30 struct Guard<'a> {
31 buf: &'a mut Vec<u8>,
32 len: usize,
33 }
34
35 impl Drop for Guard<'_> {
drop(&mut self)36 fn drop(&mut self) {
37 unsafe {
38 self.buf.set_len(self.len);
39 }
40 }
41 }
42
43 // This uses an adaptive system to extend the vector when it fills. We want to
44 // avoid paying to allocate and zero a huge chunk of memory if the reader only
45 // has 4 bytes while still making large reads if the reader does have a ton
46 // of data to return. Simply tacking on an extra DEFAULT_BUF_SIZE space every
47 // time is 4,500 times (!) slower than this if the reader has a very small
48 // amount of data to return.
49 //
50 // Because we're extending the buffer with uninitialized data for trusted
51 // readers, we need to make sure to truncate that if any of this panics.
read_to_end_internal<R: AsyncRead + ?Sized>( mut rd: Pin<&mut R>, cx: &mut Context<'_>, buf: &mut Vec<u8>, start_len: usize, ) -> Poll<io::Result<usize>>52 pub(super) fn read_to_end_internal<R: AsyncRead + ?Sized>(
53 mut rd: Pin<&mut R>,
54 cx: &mut Context<'_>,
55 buf: &mut Vec<u8>,
56 start_len: usize,
57 ) -> Poll<io::Result<usize>> {
58 let mut g = Guard {
59 len: buf.len(),
60 buf,
61 };
62 let ret;
63 loop {
64 if g.len == g.buf.len() {
65 unsafe {
66 g.buf.reserve(32);
67 let capacity = g.buf.capacity();
68 g.buf.set_len(capacity);
69
70 let b = &mut *(&mut g.buf[g.len..] as *mut [u8] as *mut [MaybeUninit<u8>]);
71
72 rd.prepare_uninitialized_buffer(b);
73 }
74 }
75
76 match ready!(rd.as_mut().poll_read(cx, &mut g.buf[g.len..])) {
77 Ok(0) => {
78 ret = Poll::Ready(Ok(g.len - start_len));
79 break;
80 }
81 Ok(n) => g.len += n,
82 Err(e) => {
83 ret = Poll::Ready(Err(e));
84 break;
85 }
86 }
87 }
88
89 ret
90 }
91
92 impl<A> Future for ReadToEnd<'_, A>
93 where
94 A: AsyncRead + ?Sized + Unpin,
95 {
96 type Output = io::Result<usize>;
97
poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>98 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
99 let this = &mut *self;
100 read_to_end_internal(Pin::new(&mut this.reader), cx, this.buf, this.start_len)
101 }
102 }
103
104 #[cfg(test)]
105 mod tests {
106 use super::*;
107
108 #[test]
assert_unpin()109 fn assert_unpin() {
110 use std::marker::PhantomPinned;
111 crate::is_unpin::<ReadToEnd<'_, PhantomPinned>>();
112 }
113 }
114