1 pub mod sync { 2 use std::io::Read; 3 to_vec(mut read: impl Read) -> Vec<u8>4 pub fn to_vec(mut read: impl Read) -> Vec<u8> { 5 let mut output = vec![]; 6 read.read_to_end(&mut output).unwrap(); 7 output 8 } 9 } 10 11 #[cfg(feature = "futures-io")] 12 pub mod futures { 13 pub mod bufread { 14 pub use futures::io::AsyncBufRead; 15 16 use crate::utils::InputStream; 17 use futures::stream::{StreamExt as _, TryStreamExt as _}; 18 from(input: &InputStream) -> impl AsyncBufRead19 pub fn from(input: &InputStream) -> impl AsyncBufRead { 20 // By using the stream here we ensure that each chunk will require a separate 21 // read/poll_fill_buf call to process to help test reading multiple chunks. 22 input.stream().map(Ok).into_async_read() 23 } 24 } 25 26 pub mod read { 27 use crate::utils::{block_on, pin_mut}; 28 use futures::io::{copy_buf, AsyncRead, AsyncReadExt, BufReader, Cursor}; 29 to_vec(read: impl AsyncRead) -> Vec<u8>30 pub fn to_vec(read: impl AsyncRead) -> Vec<u8> { 31 // TODO: https://github.com/rust-lang-nursery/futures-rs/issues/1510 32 // All current test cases are < 100kB 33 let mut output = Cursor::new(vec![0; 102_400]); 34 pin_mut!(read); 35 let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); 36 let mut output = output.into_inner(); 37 output.truncate(len as usize); 38 output 39 } 40 poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>41 pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> { 42 pin_mut!(reader); 43 block_on(reader.read(output)) 44 } 45 } 46 47 pub mod write { 48 use crate::utils::{block_on, Pin, TrackClosed}; 49 use futures::io::{AsyncWrite, AsyncWriteExt as _}; 50 use futures_test::io::AsyncWriteTestExt as _; 51 to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>52 pub fn to_vec( 53 input: &[Vec<u8>], 54 create_writer: impl for<'a> FnOnce( 55 &'a mut (dyn AsyncWrite + Unpin), 56 ) -> Pin<Box<dyn AsyncWrite + 'a>>, 57 limit: usize, 58 ) -> Vec<u8> { 59 let mut output = Vec::new(); 60 { 61 let mut test_writer = TrackClosed::new( 62 (&mut output) 63 .limited_write(limit) 64 .interleave_pending_write(), 65 ); 66 { 67 let mut writer = create_writer(&mut test_writer); 68 for chunk in input { 69 block_on(writer.write_all(chunk)).unwrap(); 70 block_on(writer.flush()).unwrap(); 71 } 72 block_on(writer.close()).unwrap(); 73 } 74 assert!(test_writer.is_closed()); 75 } 76 output 77 } 78 } 79 } 80 81 #[cfg(feature = "stream")] 82 #[allow(deprecated)] 83 pub mod stream { 84 use crate::utils::{block_on, pin_mut, Result}; 85 use bytes_05::Bytes; 86 use futures::stream::{Stream, TryStreamExt as _}; 87 to_vec(stream: impl Stream<Item = Result<Bytes>>) -> Vec<u8>88 pub fn to_vec(stream: impl Stream<Item = Result<Bytes>>) -> Vec<u8> { 89 pin_mut!(stream); 90 block_on(stream.try_collect::<Vec<_>>()) 91 .unwrap() 92 .into_iter() 93 .flatten() 94 .collect() 95 } 96 } 97 98 #[cfg(feature = "tokio-02")] 99 pub mod tokio_02 { 100 pub mod bufread { 101 pub use tokio_02::io::AsyncBufRead; 102 103 use crate::utils::InputStream; 104 use tokio_02::io::stream_reader; 105 from(input: &InputStream) -> impl AsyncBufRead106 pub fn from(input: &InputStream) -> impl AsyncBufRead { 107 // By using the stream here we ensure that each chunk will require a separate 108 // read/poll_fill_buf call to process to help test reading multiple chunks. 109 stream_reader(input.bytes_05_stream()) 110 } 111 } 112 113 pub mod read { 114 use crate::utils::{block_on, pin_mut, tokio_02_ext::copy_buf}; 115 use std::io::Cursor; 116 use tokio_02::io::{AsyncRead, AsyncReadExt, BufReader}; 117 to_vec(read: impl AsyncRead) -> Vec<u8>118 pub fn to_vec(read: impl AsyncRead) -> Vec<u8> { 119 let mut output = Cursor::new(vec![0; 102_400]); 120 pin_mut!(read); 121 let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); 122 let mut output = output.into_inner(); 123 output.truncate(len as usize); 124 output 125 } 126 poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>127 pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> { 128 pin_mut!(reader); 129 block_on(reader.read(output)) 130 } 131 } 132 133 pub mod write { 134 use crate::utils::{ 135 block_on, tokio_02_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, 136 }; 137 use std::io::Cursor; 138 use tokio_02::io::{AsyncWrite, AsyncWriteExt as _}; 139 to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>140 pub fn to_vec( 141 input: &[Vec<u8>], 142 create_writer: impl for<'a> FnOnce( 143 &'a mut (dyn AsyncWrite + Unpin), 144 ) -> Pin<Box<dyn AsyncWrite + 'a>>, 145 limit: usize, 146 ) -> Vec<u8> { 147 let mut output = Cursor::new(Vec::new()); 148 { 149 let mut test_writer = TrackClosed::new( 150 (&mut output) 151 .limited_write(limit) 152 .interleave_pending_write(), 153 ); 154 { 155 let mut writer = create_writer(&mut test_writer); 156 for chunk in input { 157 block_on(writer.write_all(chunk)).unwrap(); 158 block_on(writer.flush()).unwrap(); 159 } 160 block_on(writer.shutdown()).unwrap(); 161 } 162 assert!(test_writer.is_closed()); 163 } 164 output.into_inner() 165 } 166 } 167 } 168 169 #[cfg(feature = "tokio-03")] 170 pub mod tokio_03 { 171 pub mod bufread { 172 use crate::utils::InputStream; 173 pub use tokio_03::io::AsyncBufRead; 174 use tokio_util_04::io::StreamReader; 175 from(input: &InputStream) -> impl AsyncBufRead176 pub fn from(input: &InputStream) -> impl AsyncBufRead { 177 // By using the stream here we ensure that each chunk will require a separate 178 // read/poll_fill_buf call to process to help test reading multiple chunks. 179 StreamReader::new(input.bytes_05_stream()) 180 } 181 } 182 183 pub mod read { 184 use crate::utils::{block_on, pin_mut, tokio_03_ext::copy_buf}; 185 use std::io::Cursor; 186 use tokio_03::io::{AsyncRead, AsyncReadExt, BufReader}; 187 to_vec(read: impl AsyncRead) -> Vec<u8>188 pub fn to_vec(read: impl AsyncRead) -> Vec<u8> { 189 let mut output = Cursor::new(vec![0; 102_400]); 190 pin_mut!(read); 191 let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); 192 let mut output = output.into_inner(); 193 output.truncate(len as usize); 194 output 195 } 196 poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>197 pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> { 198 pin_mut!(reader); 199 block_on(reader.read(output)) 200 } 201 } 202 203 pub mod write { 204 use crate::utils::{ 205 block_on, tokio_03_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, 206 }; 207 use std::io::Cursor; 208 use tokio_03::io::{AsyncWrite, AsyncWriteExt as _}; 209 to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>210 pub fn to_vec( 211 input: &[Vec<u8>], 212 create_writer: impl for<'a> FnOnce( 213 &'a mut (dyn AsyncWrite + Unpin), 214 ) -> Pin<Box<dyn AsyncWrite + 'a>>, 215 limit: usize, 216 ) -> Vec<u8> { 217 let mut output = Cursor::new(Vec::new()); 218 { 219 let mut test_writer = TrackClosed::new( 220 (&mut output) 221 .limited_write(limit) 222 .interleave_pending_write(), 223 ); 224 { 225 let mut writer = create_writer(&mut test_writer); 226 for chunk in input { 227 block_on(writer.write_all(chunk)).unwrap(); 228 block_on(writer.flush()).unwrap(); 229 } 230 block_on(writer.shutdown()).unwrap(); 231 } 232 assert!(test_writer.is_closed()); 233 } 234 output.into_inner() 235 } 236 } 237 } 238 239 #[cfg(feature = "tokio")] 240 pub mod tokio { 241 pub mod bufread { 242 use crate::utils::InputStream; 243 use bytes::Bytes; 244 use futures::stream::StreamExt; 245 pub use tokio::io::AsyncBufRead; 246 use tokio_util_06::io::StreamReader; 247 from(input: &InputStream) -> impl AsyncBufRead248 pub fn from(input: &InputStream) -> impl AsyncBufRead { 249 // By using the stream here we ensure that each chunk will require a separate 250 // read/poll_fill_buf call to process to help test reading multiple chunks. 251 StreamReader::new(input.stream().map(Bytes::from).map(std::io::Result::Ok)) 252 } 253 } 254 255 pub mod read { 256 use crate::utils::{block_on, pin_mut, tokio_ext::copy_buf}; 257 use std::io::Cursor; 258 use tokio::io::{AsyncRead, AsyncReadExt, BufReader}; 259 to_vec(read: impl AsyncRead) -> Vec<u8>260 pub fn to_vec(read: impl AsyncRead) -> Vec<u8> { 261 let mut output = Cursor::new(vec![0; 102_400]); 262 pin_mut!(read); 263 let len = block_on(copy_buf(BufReader::with_capacity(2, read), &mut output)).unwrap(); 264 let mut output = output.into_inner(); 265 output.truncate(len as usize); 266 output 267 } 268 poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize>269 pub fn poll_read(reader: impl AsyncRead, output: &mut [u8]) -> std::io::Result<usize> { 270 pin_mut!(reader); 271 block_on(reader.read(output)) 272 } 273 } 274 275 pub mod write { 276 use crate::utils::{ 277 block_on, tokio_ext::AsyncWriteTestExt as _, track_closed::TrackClosed, Pin, 278 }; 279 use std::io::Cursor; 280 use tokio::io::{AsyncWrite, AsyncWriteExt as _}; 281 to_vec( input: &[Vec<u8>], create_writer: impl for<'a> FnOnce( &'a mut (dyn AsyncWrite + Unpin), ) -> Pin<Box<dyn AsyncWrite + 'a>>, limit: usize, ) -> Vec<u8>282 pub fn to_vec( 283 input: &[Vec<u8>], 284 create_writer: impl for<'a> FnOnce( 285 &'a mut (dyn AsyncWrite + Unpin), 286 ) -> Pin<Box<dyn AsyncWrite + 'a>>, 287 limit: usize, 288 ) -> Vec<u8> { 289 let mut output = Cursor::new(Vec::new()); 290 { 291 let mut test_writer = TrackClosed::new( 292 (&mut output) 293 .limited_write(limit) 294 .interleave_pending_write(), 295 ); 296 { 297 let mut writer = create_writer(&mut test_writer); 298 for chunk in input { 299 block_on(writer.write_all(chunk)).unwrap(); 300 block_on(writer.flush()).unwrap(); 301 } 302 block_on(writer.shutdown()).unwrap(); 303 } 304 assert!(test_writer.is_closed()); 305 } 306 output.into_inner() 307 } 308 } 309 } 310