1 pub mod sync {
2     use std::io::Read;
3 
to_vec(mut read: impl Read) -> Vec<u8>4     pub fn to_vec(mut read: impl Read) -> Vec<u8> {
5         let mut output = vec![];
6         read.read_to_end(&mut output).unwrap();
7         output
8     }
9 }
10 
11 #[cfg(feature = "futures-io")]
12 pub mod futures {
13     pub mod bufread {
14         pub use futures::io::AsyncBufRead;
15 
16         use crate::utils::InputStream;
17         use futures::stream::{StreamExt as _, TryStreamExt as _};
18 
from(input: &InputStream) -> impl AsyncBufRead19         pub fn from(input: &InputStream) -> impl AsyncBufRead {
20             // By using the stream here we ensure that each chunk will require a separate
21             // read/poll_fill_buf call to process to help test reading multiple chunks.
22             input.stream().map(Ok).into_async_read()
23         }
24     }
25 
26     pub mod read {
27         use crate::utils::{block_on, pin_mut};
28         use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor};
29 
to_vec(read: impl AsyncRead) -> Vec<u8>30         pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
31             // TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510
32             // All current test cases are < 100kB
33             let mut output = Cursor::new(vec![0; 102_400]);
34             pin_mut!(read);
35             let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
36             let mut output = output.into_inner();
37             output.truncate(len as usize);
38             output
39         }
40 
poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>41         pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
42             pin_mut!(reader);
43             block_on(reader.read(output))
44         }
45     }
46 
47     pub mod write {
48         use crate::utils::{block_on, Pin, TrackClosed};
49         use futures::io::{AsyncWrite, AsyncWriteExt as _};
50         use futures_test::io::AsyncWriteTestExt as _;
51 
to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>52         pub fn to_vec(
53             input: &[Vec<u8>],
54             create_writer: impl for<'a> FnOnce(
55                 &'a mut (dyn AsyncWrite + Unpin),
56             ) -> Pin<Box<dyn AsyncWrite + 'a>>,
57             limit: usize,
58         ) -> Vec<u8> {
59             let mut output = Vec::new();
60             {
61                 let mut test_writer = TrackClosed::new(
62                     (&mut output)
63                         .limited_write(limit)
64                         .interleave_pending_write(),
65                 );
66                 {
67                     let mut writer = create_writer(&mut test_writer);
68                     for chunk in input {
69                         block_on(writer.write_all(chunk)).unwrap();
70                         block_on(writer.flush()).unwrap();
71                     }
72                     block_on(writer.close()).unwrap();
73                 }
74                 assert!(test_writer.is_closed());
75             }
76             output
77         }
78     }
79 }
80 
81 #[cfg(feature = "stream")]
82 #[allow(deprecated)]
83 pub mod stream {
84     use crate::utils::{block_on, pin_mut, Result};
85     use bytes_05::Bytes;
86     use futures::stream::{Stream, TryStreamExt as _};
87 
to_vec(stream: impl Stream<Item = Result<Bytes>>) -> Vec<u8>88     pub fn to_vec(stream: impl Stream<Item = Result<Bytes>>) -> Vec<u8> {
89         pin_mut!(stream);
90         block_on(stream.try_collect::<Vec<_>>())
91             .unwrap()
92             .into_iter()
93             .flatten()
94             .collect()
95     }
96 }
97 
98 #[cfg(feature = "tokio-02")]
99 pub mod tokio_02 {
100     pub mod bufread {
101         pub use tokio_02::io::AsyncBufRead;
102 
103         use crate::utils::InputStream;
104         use tokio_02::io::stream_reader;
105 
from(input: &InputStream) -> impl AsyncBufRead106         pub fn from(input: &InputStream) -> impl AsyncBufRead {
107             // By using the stream here we ensure that each chunk will require a separate
108             // read/poll_fill_buf call to process to help test reading multiple chunks.
109             stream_reader(input.bytes_05_stream())
110         }
111     }
112 
113     pub mod read {
114         use crate::utils::{block_on, pin_mut, tokio_02_ext::copy_buf};
115         use std::io::Cursor;
116         use tokio_02::io::{AsyncRead, AsyncReadExt, BufReader};
117 
to_vec(read: impl AsyncRead) -> Vec<u8>118         pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
119             let mut output = Cursor::new(vec![0; 102_400]);
120             pin_mut!(read);
121             let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
122             let mut output = output.into_inner();
123             output.truncate(len as usize);
124             output
125         }
126 
poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>127         pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
128             pin_mut!(reader);
129             block_on(reader.read(output))
130         }
131     }
132 
133     pub mod write {
134         use crate::utils::{
135             block_on, tokio_02_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
136         };
137         use std::io::Cursor;
138         use tokio_02::io::{AsyncWrite, AsyncWriteExt as _};
139 
to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>140         pub fn to_vec(
141             input: &[Vec<u8>],
142             create_writer: impl for<'a> FnOnce(
143                 &'a mut (dyn AsyncWrite + Unpin),
144             ) -> Pin<Box<dyn AsyncWrite + 'a>>,
145             limit: usize,
146         ) -> Vec<u8> {
147             let mut output = Cursor::new(Vec::new());
148             {
149                 let mut test_writer = TrackClosed::new(
150                     (&mut output)
151                         .limited_write(limit)
152                         .interleave_pending_write(),
153                 );
154                 {
155                     let mut writer = create_writer(&mut test_writer);
156                     for chunk in input {
157                         block_on(writer.write_all(chunk)).unwrap();
158                         block_on(writer.flush()).unwrap();
159                     }
160                     block_on(writer.shutdown()).unwrap();
161                 }
162                 assert!(test_writer.is_closed());
163             }
164             output.into_inner()
165         }
166     }
167 }
168 
169 #[cfg(feature = "tokio-03")]
170 pub mod tokio_03 {
171     pub mod bufread {
172         use crate::utils::InputStream;
173         pub use tokio_03::io::AsyncBufRead;
174         use tokio_util_04::io::StreamReader;
175 
from(input: &InputStream) -> impl AsyncBufRead176         pub fn from(input: &InputStream) -> impl AsyncBufRead {
177             // By using the stream here we ensure that each chunk will require a separate
178             // read/poll_fill_buf call to process to help test reading multiple chunks.
179             StreamReader::new(input.bytes_05_stream())
180         }
181     }
182 
183     pub mod read {
184         use crate::utils::{block_on, pin_mut, tokio_03_ext::copy_buf};
185         use std::io::Cursor;
186         use tokio_03::io::{AsyncRead, AsyncReadExt, BufReader};
187 
to_vec(read: impl AsyncRead) -> Vec<u8>188         pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
189             let mut output = Cursor::new(vec![0; 102_400]);
190             pin_mut!(read);
191             let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
192             let mut output = output.into_inner();
193             output.truncate(len as usize);
194             output
195         }
196 
poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>197         pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
198             pin_mut!(reader);
199             block_on(reader.read(output))
200         }
201     }
202 
203     pub mod write {
204         use crate::utils::{
205             block_on, tokio_03_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
206         };
207         use std::io::Cursor;
208         use tokio_03::io::{AsyncWrite, AsyncWriteExt as _};
209 
to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>210         pub fn to_vec(
211             input: &[Vec<u8>],
212             create_writer: impl for<'a> FnOnce(
213                 &'a mut (dyn AsyncWrite + Unpin),
214             ) -> Pin<Box<dyn AsyncWrite + 'a>>,
215             limit: usize,
216         ) -> Vec<u8> {
217             let mut output = Cursor::new(Vec::new());
218             {
219                 let mut test_writer = TrackClosed::new(
220                     (&mut output)
221                         .limited_write(limit)
222                         .interleave_pending_write(),
223                 );
224                 {
225                     let mut writer = create_writer(&mut test_writer);
226                     for chunk in input {
227                         block_on(writer.write_all(chunk)).unwrap();
228                         block_on(writer.flush()).unwrap();
229                     }
230                     block_on(writer.shutdown()).unwrap();
231                 }
232                 assert!(test_writer.is_closed());
233             }
234             output.into_inner()
235         }
236     }
237 }
238 
239 #[cfg(feature = "tokio")]
240 pub mod tokio {
241     pub mod bufread {
242         use crate::utils::InputStream;
243         use bytes::Bytes;
244         use futures::stream::StreamExt;
245         pub use tokio::io::AsyncBufRead;
246         use tokio_util_06::io::StreamReader;
247 
from(input: &InputStream) -> impl AsyncBufRead248         pub fn from(input: &InputStream) -> impl AsyncBufRead {
249             // By using the stream here we ensure that each chunk will require a separate
250             // read/poll_fill_buf call to process to help test reading multiple chunks.
251             StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok))
252         }
253     }
254 
255     pub mod read {
256         use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf};
257         use std::io::Cursor;
258         use tokio::io::{AsyncRead, AsyncReadExt, BufReader};
259 
to_vec(read: impl AsyncRead) -> Vec<u8>260         pub fn to_vec(read: impl AsyncRead) -> Vec<u8> {
261             let mut output = Cursor::new(vec![0; 102_400]);
262             pin_mut!(read);
263             let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap();
264             let mut output = output.into_inner();
265             output.truncate(len as usize);
266             output
267         }
268 
poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>269         pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> {
270             pin_mut!(reader);
271             block_on(reader.read(output))
272         }
273     }
274 
275     pub mod write {
276         use crate::utils::{
277             block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin,
278         };
279         use std::io::Cursor;
280         use tokio::io::{AsyncWrite, AsyncWriteExt as _};
281 
to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>282         pub fn to_vec(
283             input: &[Vec<u8>],
284             create_writer: impl for<'a> FnOnce(
285                 &'a mut (dyn AsyncWrite + Unpin),
286             ) -> Pin<Box<dyn AsyncWrite + 'a>>,
287             limit: usize,
288         ) -> Vec<u8> {
289             let mut output = Cursor::new(Vec::new());
290             {
291                 let mut test_writer = TrackClosed::new(
292                     (&mut output)
293                         .limited_write(limit)
294                         .interleave_pending_write(),
295                 );
296                 {
297                     let mut writer = create_writer(&mut test_writer);
298                     for chunk in input {
299                         block_on(writer.write_all(chunk)).unwrap();
300                         block_on(writer.flush()).unwrap();
301                     }
302                     block_on(writer.shutdown()).unwrap();
303                 }
304                 assert!(test_writer.is_closed());
305             }
306             output.into_inner()
307         }
308     }
309 }
310