1 //! Asynchronous I/O.
2 //!
3 //! This module is the asynchronous version of `std::io`. It defines four
4 //! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5 //! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6 //! standard library. However, these traits integrate with the asynchronous
7 //! task system, so that if an I/O object isn't ready for reading (or writing),
8 //! the thread is not blocked, and instead the current task is queued to be
9 //! woken when I/O is ready.
10 //!
11 //! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12 //! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13 //! for operating with asynchronous I/O objects, including ways to work with
14 //! them using futures, streams and sinks.
15 //!
16 //! This module is only available when the `std` feature of this
17 //! library is activated, and it is activated by default.
18
19 #[cfg(feature = "io-compat")]
20 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21 use crate::compat::Compat;
22 use crate::future::assert_future;
23 use crate::stream::assert_stream;
24 use std::{ptr, pin::Pin};
25
26 // Re-export some types from `std::io` so that users don't have to deal
27 // with conflicts when `use`ing `futures::io` and `std::io`.
28 #[doc(no_inline)]
29 pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30 #[doc(no_inline)]
31 #[cfg(feature = "read-initializer")]
32 #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
33 pub use std::io::Initializer;
34
35 pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
36
37 // used by `BufReader` and `BufWriter`
38 // https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
39 const DEFAULT_BUF_SIZE: usize = 8 * 1024;
40
41 /// Initializes a buffer if necessary.
42 ///
43 /// A buffer is always initialized if `read-initializer` feature is disabled.
44 #[inline]
initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8])45 unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
46 #[cfg(feature = "read-initializer")]
47 {
48 if !_reader.initializer().should_initialize() {
49 return;
50 }
51 }
52 ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
53 }
54
55 mod allow_std;
56 pub use self::allow_std::AllowStdIo;
57
58 mod buf_reader;
59 pub use self::buf_reader::BufReader;
60
61 mod buf_writer;
62 pub use self::buf_writer::BufWriter;
63
64 mod chain;
65 pub use self::chain::Chain;
66
67 mod close;
68 pub use self::close::Close;
69
70 mod copy;
71 pub use self::copy::{copy, Copy};
72
73 mod copy_buf;
74 pub use self::copy_buf::{copy_buf, CopyBuf};
75
76 mod cursor;
77 pub use self::cursor::Cursor;
78
79 mod empty;
80 pub use self::empty::{empty, Empty};
81
82 mod fill_buf;
83 pub use self::fill_buf::FillBuf;
84
85 mod flush;
86 pub use self::flush::Flush;
87
88 #[cfg(feature = "sink")]
89 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
90 mod into_sink;
91 #[cfg(feature = "sink")]
92 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
93 pub use self::into_sink::IntoSink;
94
95 mod lines;
96 pub use self::lines::Lines;
97
98 mod read;
99 pub use self::read::Read;
100
101 mod read_vectored;
102 pub use self::read_vectored::ReadVectored;
103
104 mod read_exact;
105 pub use self::read_exact::ReadExact;
106
107 mod read_line;
108 pub use self::read_line::ReadLine;
109
110 mod read_to_end;
111 pub use self::read_to_end::ReadToEnd;
112
113 mod read_to_string;
114 pub use self::read_to_string::ReadToString;
115
116 mod read_until;
117 pub use self::read_until::ReadUntil;
118
119 mod repeat;
120 pub use self::repeat::{repeat, Repeat};
121
122 mod seek;
123 pub use self::seek::Seek;
124
125 mod sink;
126 pub use self::sink::{sink, Sink};
127
128 mod split;
129 pub use self::split::{ReadHalf, WriteHalf, ReuniteError};
130
131 mod take;
132 pub use self::take::Take;
133
134 mod window;
135 pub use self::window::Window;
136
137 mod write;
138 pub use self::write::Write;
139
140 mod write_vectored;
141 pub use self::write_vectored::WriteVectored;
142
143 mod write_all;
144 pub use self::write_all::WriteAll;
145
146 #[cfg(feature = "write-all-vectored")]
147 mod write_all_vectored;
148 #[cfg(feature = "write-all-vectored")]
149 pub use self::write_all_vectored::WriteAllVectored;
150
151 /// An extension trait which adds utility methods to `AsyncRead` types.
152 pub trait AsyncReadExt: AsyncRead {
/// Creates an adaptor which will chain this reader with another.
154 ///
155 /// The returned `AsyncRead` instance will first read all bytes from this object
156 /// until EOF is encountered. Afterwards the output is equivalent to the
157 /// output of `next`.
158 ///
159 /// # Examples
160 ///
161 /// ```
162 /// # futures::executor::block_on(async {
163 /// use futures::io::{AsyncReadExt, Cursor};
164 ///
165 /// let reader1 = Cursor::new([1, 2, 3, 4]);
166 /// let reader2 = Cursor::new([5, 6, 7, 8]);
167 ///
168 /// let mut reader = reader1.chain(reader2);
169 /// let mut buffer = Vec::new();
170 ///
171 /// // read the value into a Vec.
172 /// reader.read_to_end(&mut buffer).await?;
173 /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
174 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
175 /// ```
chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead,176 fn chain<R>(self, next: R) -> Chain<Self, R>
177 where
178 Self: Sized,
179 R: AsyncRead,
180 {
181 assert_read(Chain::new(self, next))
182 }
183
184 /// Tries to read some bytes directly into the given `buf` in asynchronous
185 /// manner, returning a future type.
186 ///
187 /// The returned future will resolve to the number of bytes read once the read
188 /// operation is completed.
189 ///
190 /// # Examples
191 ///
192 /// ```
193 /// # futures::executor::block_on(async {
194 /// use futures::io::{AsyncReadExt, Cursor};
195 ///
196 /// let mut reader = Cursor::new([1, 2, 3, 4]);
197 /// let mut output = [0u8; 5];
198 ///
199 /// let bytes = reader.read(&mut output[..]).await?;
200 ///
/// // This is only guaranteed to be 4 because `Cursor` is a synchronous
/// // reader. In a real system you could get anywhere from 1 to
/// // `output.len()` bytes in a single read.
204 /// assert_eq!(bytes, 4);
205 /// assert_eq!(output, [1, 2, 3, 4, 0]);
206 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
207 /// ```
read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,208 fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
209 where Self: Unpin,
210 {
211 assert_future::<Result<usize>, _>(Read::new(self, buf))
212 }
213
214 /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
215 /// IO operations.
216 ///
217 /// The returned future will resolve to the number of bytes read once the read
218 /// operation is completed.
read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin,219 fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
220 where Self: Unpin,
221 {
222 assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
223 }
224
225 /// Creates a future which will read exactly enough bytes to fill `buf`,
226 /// returning an error if end of file (EOF) is hit sooner.
227 ///
228 /// The returned future will resolve once the read operation is completed.
229 ///
230 /// In the case of an error the buffer and the object will be discarded, with
231 /// the error yielded.
232 ///
233 /// # Examples
234 ///
235 /// ```
236 /// # futures::executor::block_on(async {
237 /// use futures::io::{AsyncReadExt, Cursor};
238 ///
239 /// let mut reader = Cursor::new([1, 2, 3, 4]);
240 /// let mut output = [0u8; 4];
241 ///
242 /// reader.read_exact(&mut output).await?;
243 ///
244 /// assert_eq!(output, [1, 2, 3, 4]);
245 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
246 /// ```
247 ///
248 /// ## EOF is hit before `buf` is filled
249 ///
250 /// ```
251 /// # futures::executor::block_on(async {
252 /// use futures::io::{self, AsyncReadExt, Cursor};
253 ///
254 /// let mut reader = Cursor::new([1, 2, 3, 4]);
255 /// let mut output = [0u8; 5];
256 ///
257 /// let result = reader.read_exact(&mut output).await;
258 ///
259 /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
260 /// # });
261 /// ```
read_exact<'a>( &'a mut self, buf: &'a mut [u8], ) -> ReadExact<'a, Self> where Self: Unpin,262 fn read_exact<'a>(
263 &'a mut self,
264 buf: &'a mut [u8],
265 ) -> ReadExact<'a, Self>
266 where Self: Unpin,
267 {
268 assert_future::<Result<()>, _>(ReadExact::new(self, buf))
269 }
270
271 /// Creates a future which will read all the bytes from this `AsyncRead`.
272 ///
273 /// On success the total number of bytes read is returned.
274 ///
275 /// # Examples
276 ///
277 /// ```
278 /// # futures::executor::block_on(async {
279 /// use futures::io::{AsyncReadExt, Cursor};
280 ///
281 /// let mut reader = Cursor::new([1, 2, 3, 4]);
282 /// let mut output = Vec::with_capacity(4);
283 ///
284 /// let bytes = reader.read_to_end(&mut output).await?;
285 ///
286 /// assert_eq!(bytes, 4);
287 /// assert_eq!(output, vec![1, 2, 3, 4]);
288 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
289 /// ```
read_to_end<'a>( &'a mut self, buf: &'a mut Vec<u8>, ) -> ReadToEnd<'a, Self> where Self: Unpin,290 fn read_to_end<'a>(
291 &'a mut self,
292 buf: &'a mut Vec<u8>,
293 ) -> ReadToEnd<'a, Self>
294 where Self: Unpin,
295 {
296 assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
297 }
298
299 /// Creates a future which will read all the bytes from this `AsyncRead`.
300 ///
301 /// On success the total number of bytes read is returned.
302 ///
303 /// # Examples
304 ///
305 /// ```
306 /// # futures::executor::block_on(async {
307 /// use futures::io::{AsyncReadExt, Cursor};
308 ///
309 /// let mut reader = Cursor::new(&b"1234"[..]);
310 /// let mut buffer = String::with_capacity(4);
311 ///
312 /// let bytes = reader.read_to_string(&mut buffer).await?;
313 ///
314 /// assert_eq!(bytes, 4);
315 /// assert_eq!(buffer, String::from("1234"));
316 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
317 /// ```
read_to_string<'a>( &'a mut self, buf: &'a mut String, ) -> ReadToString<'a, Self> where Self: Unpin,318 fn read_to_string<'a>(
319 &'a mut self,
320 buf: &'a mut String,
321 ) -> ReadToString<'a, Self>
322 where Self: Unpin,
323 {
324 assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
325 }
326
327 /// Helper method for splitting this read/write object into two halves.
328 ///
329 /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
330 /// traits, respectively.
331 ///
332 /// # Examples
333 ///
334 /// ```
335 /// # futures::executor::block_on(async {
336 /// use futures::io::{self, AsyncReadExt, Cursor};
337 ///
338 /// // Note that for `Cursor` the read and write halves share a single
339 /// // seek position. This may or may not be true for other types that
340 /// // implement both `AsyncRead` and `AsyncWrite`.
341 ///
342 /// let reader = Cursor::new([1, 2, 3, 4]);
343 /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
344 /// let mut writer = Cursor::new(vec![0u8; 5]);
345 ///
346 /// {
347 /// let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
348 /// io::copy(reader, &mut buffer_writer).await?;
349 /// io::copy(buffer_reader, &mut writer).await?;
350 /// }
351 ///
352 /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
353 /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
354 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
355 /// ```
split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized,356 fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
357 where Self: AsyncWrite + Sized,
358 {
359 let (r, w) = split::split(self);
360 (assert_read(r), assert_write(w))
361 }
362
363 /// Creates an AsyncRead adapter which will read at most `limit` bytes
364 /// from the underlying reader.
365 ///
366 /// # Examples
367 ///
368 /// ```
369 /// # futures::executor::block_on(async {
370 /// use futures::io::{AsyncReadExt, Cursor};
371 ///
372 /// let reader = Cursor::new(&b"12345678"[..]);
373 /// let mut buffer = [0; 5];
374 ///
375 /// let mut take = reader.take(4);
376 /// let n = take.read(&mut buffer).await?;
377 ///
378 /// assert_eq!(n, 4);
379 /// assert_eq!(&buffer, b"1234\0");
380 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
381 /// ```
take(self, limit: u64) -> Take<Self> where Self: Sized382 fn take(self, limit: u64) -> Take<Self>
383 where Self: Sized
384 {
385 assert_read(Take::new(self, limit))
386 }
387
388 /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
389 /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
390 /// implements [`AsyncWrite`] as well, the result will also implement the
391 /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
392 ///
393 /// Requires the `io-compat` feature to enable.
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
fn compat(self) -> Compat<Self>
where
    Self: Sized + Unpin,
{
    // Consumes `self` and hands ownership to the compatibility wrapper.
    Compat::<Self>::new(self)
}
401 }
402
403 impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
404
405 /// An extension trait which adds utility methods to `AsyncWrite` types.
406 pub trait AsyncWriteExt: AsyncWrite {
407 /// Creates a future which will entirely flush this `AsyncWrite`.
408 ///
409 /// # Examples
410 ///
411 /// ```
412 /// # futures::executor::block_on(async {
413 /// use futures::io::{AllowStdIo, AsyncWriteExt};
414 /// use std::io::{BufWriter, Cursor};
415 ///
416 /// let mut output = vec![0u8; 5];
417 ///
418 /// {
419 /// let writer = Cursor::new(&mut output);
420 /// let mut buffered = AllowStdIo::new(BufWriter::new(writer));
421 /// buffered.write_all(&[1, 2]).await?;
422 /// buffered.write_all(&[3, 4]).await?;
423 /// buffered.flush().await?;
424 /// }
425 ///
426 /// assert_eq!(output, [1, 2, 3, 4, 0]);
427 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
428 /// ```
flush(&mut self) -> Flush<'_, Self> where Self: Unpin,429 fn flush(&mut self) -> Flush<'_, Self>
430 where Self: Unpin,
431 {
432 assert_future::<Result<()>, _>(Flush::new(self))
433 }
434
435 /// Creates a future which will entirely close this `AsyncWrite`.
close(&mut self) -> Close<'_, Self> where Self: Unpin,436 fn close(&mut self) -> Close<'_, Self>
437 where Self: Unpin,
438 {
439 assert_future::<Result<()>, _>(Close::new(self))
440 }
441
442 /// Creates a future which will write bytes from `buf` into the object.
443 ///
444 /// The returned future will resolve to the number of bytes written once the write
445 /// operation is completed.
write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin,446 fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
447 where Self: Unpin,
448 {
449 assert_future::<Result<usize>, _>(Write::new(self, buf))
450 }
451
452 /// Creates a future which will write bytes from `bufs` into the object using vectored
453 /// IO operations.
454 ///
455 /// The returned future will resolve to the number of bytes written once the write
456 /// operation is completed.
write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin,457 fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
458 where Self: Unpin,
459 {
460 assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
461 }
462
463 /// Write data into this object.
464 ///
465 /// Creates a future that will write the entire contents of the buffer `buf` into
466 /// this `AsyncWrite`.
467 ///
468 /// The returned future will not complete until all the data has been written.
469 ///
470 /// # Examples
471 ///
472 /// ```
473 /// # futures::executor::block_on(async {
474 /// use futures::io::{AsyncWriteExt, Cursor};
475 ///
476 /// let mut writer = Cursor::new(vec![0u8; 5]);
477 ///
478 /// writer.write_all(&[1, 2, 3, 4]).await?;
479 ///
480 /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
481 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
482 /// ```
write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin,483 fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
484 where Self: Unpin,
485 {
486 assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487 }
488
489 /// Attempts to write multiple buffers into this writer.
490 ///
491 /// Creates a future that will write the entire contents of `bufs` into this
492 /// `AsyncWrite` using [vectored writes].
493 ///
494 /// The returned future will not complete until all the data has been
495 /// written.
496 ///
497 /// [vectored writes]: std::io::Write::write_vectored
498 ///
499 /// # Notes
500 ///
501 /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502 /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503 /// modify the slice to keep track of the bytes already written.
504 ///
/// Once this future returns, the contents of `bufs` are unspecified, as
506 /// this depends on how many calls to `write_vectored` were necessary. It is
507 /// best to understand this function as taking ownership of `bufs` and to
508 /// not use `bufs` afterwards. The underlying buffers, to which the
509 /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510 /// can be reused.
511 ///
512 /// # Examples
513 ///
514 /// ```
515 /// # futures::executor::block_on(async {
516 /// use futures::io::AsyncWriteExt;
517 /// use futures_util::io::Cursor;
518 /// use std::io::IoSlice;
519 ///
520 /// let mut writer = Cursor::new(Vec::new());
521 /// let bufs = &mut [
522 /// IoSlice::new(&[1]),
523 /// IoSlice::new(&[2, 3]),
524 /// IoSlice::new(&[4, 5, 6]),
525 /// ];
526 ///
527 /// writer.write_all_vectored(bufs).await?;
528 /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529 ///
530 /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532 /// ```
#[cfg(feature = "write-all-vectored")]
fn write_all_vectored<'a>(&'a mut self, bufs: &'a mut [IoSlice<'a>]) -> WriteAllVectored<'a, Self>
where
    Self: Unpin,
{
    let fut = WriteAllVectored::new(self, bufs);
    assert_future::<Result<()>, _>(fut)
}
543
544 /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545 /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546 /// Requires the `io-compat` feature to enable.
#[cfg(feature = "io-compat")]
#[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
fn compat_write(self) -> Compat<Self>
where
    Self: Sized + Unpin,
{
    // Consumes `self` and hands ownership to the compatibility wrapper.
    Compat::<Self>::new(self)
}
554
555 /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
556 ///
557 /// This adapter produces a sink that will write each value passed to it
558 /// into the underlying writer.
559 ///
560 /// Note that this function consumes the given writer, returning a wrapped
561 /// version.
562 ///
563 /// # Examples
564 ///
565 /// ```
566 /// # futures::executor::block_on(async {
567 /// use futures::io::AsyncWriteExt;
568 /// use futures::stream::{self, StreamExt};
569 ///
570 /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
571 ///
572 /// let mut writer = vec![];
573 ///
574 /// stream.forward((&mut writer).into_sink()).await?;
575 ///
576 /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
577 /// # Ok::<(), Box<dyn std::error::Error>>(())
578 /// # })?;
579 /// # Ok::<(), Box<dyn std::error::Error>>(())
580 /// ```
#[cfg(feature = "sink")]
#[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
where
    Self: Sized,
{
    let sink = IntoSink::new(self);
    crate::sink::assert_sink::<Item, Error, _>(sink)
}
588 }
589
590 impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
591
592 /// An extension trait which adds utility methods to `AsyncSeek` types.
593 pub trait AsyncSeekExt: AsyncSeek {
594 /// Creates a future which will seek an IO object, and then yield the
595 /// new position in the object and the object itself.
596 ///
597 /// In the case of an error the buffer and the object will be discarded, with
598 /// the error yielded.
seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin,599 fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
600 where Self: Unpin,
601 {
602 assert_future::<Result<u64>, _>(Seek::new(self, pos))
603 }
604 }
605
606 impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
607
608 /// An extension trait which adds utility methods to `AsyncBufRead` types.
609 pub trait AsyncBufReadExt: AsyncBufRead {
610 /// Creates a future which will wait for a non-empty buffer to be available from this I/O
611 /// object or EOF to be reached.
612 ///
613 /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
614 ///
615 /// ```rust
616 /// # futures::executor::block_on(async {
617 /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
618 ///
619 /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
620 ///
621 /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
622 /// stream.consume_unpin(2);
623 ///
624 /// assert_eq!(stream.fill_buf().await?, vec![3]);
625 /// stream.consume_unpin(1);
626 ///
627 /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
628 /// stream.consume_unpin(3);
629 ///
630 /// assert_eq!(stream.fill_buf().await?, vec![]);
631 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
632 /// ```
fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin,633 fn fill_buf(&mut self) -> FillBuf<'_, Self>
634 where Self: Unpin,
635 {
636 assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
637 }
638
639 /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
640 ///
641 /// ```rust
642 /// # futures::executor::block_on(async {
643 /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
644 ///
645 /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
646 ///
647 /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
648 /// stream.consume_unpin(2);
649 ///
650 /// assert_eq!(stream.fill_buf().await?, vec![3]);
651 /// stream.consume_unpin(1);
652 ///
653 /// assert_eq!(stream.fill_buf().await?, vec![]);
654 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
655 /// ```
consume_unpin(&mut self, amt: usize) where Self: Unpin,656 fn consume_unpin(&mut self, amt: usize)
657 where Self: Unpin,
658 {
659 Pin::new(self).consume(amt)
660 }
661
662 /// Creates a future which will read all the bytes associated with this I/O
663 /// object into `buf` until the delimiter `byte` or EOF is reached.
664 /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
665 ///
666 /// This function will read bytes from the underlying stream until the
667 /// delimiter or EOF is found. Once found, all bytes up to, and including,
668 /// the delimiter (if found) will be appended to `buf`.
669 ///
670 /// The returned future will resolve to the number of bytes read once the read
671 /// operation is completed.
672 ///
673 /// In the case of an error the buffer and the object will be discarded, with
674 /// the error yielded.
675 ///
676 /// # Examples
677 ///
678 /// ```
679 /// # futures::executor::block_on(async {
680 /// use futures::io::{AsyncBufReadExt, Cursor};
681 ///
682 /// let mut cursor = Cursor::new(b"lorem-ipsum");
683 /// let mut buf = vec![];
684 ///
685 /// // cursor is at 'l'
686 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
687 /// assert_eq!(num_bytes, 6);
688 /// assert_eq!(buf, b"lorem-");
689 /// buf.clear();
690 ///
691 /// // cursor is at 'i'
692 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
693 /// assert_eq!(num_bytes, 5);
694 /// assert_eq!(buf, b"ipsum");
695 /// buf.clear();
696 ///
697 /// // cursor is at EOF
698 /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
699 /// assert_eq!(num_bytes, 0);
700 /// assert_eq!(buf, b"");
701 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
702 /// ```
read_until<'a>( &'a mut self, byte: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, Self> where Self: Unpin,703 fn read_until<'a>(
704 &'a mut self,
705 byte: u8,
706 buf: &'a mut Vec<u8>,
707 ) -> ReadUntil<'a, Self>
708 where Self: Unpin,
709 {
710 assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
711 }
712
713 /// Creates a future which will read all the bytes associated with this I/O
/// object into `buf` until a newline (the 0xA byte) or EOF is reached.
715 /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
716 ///
717 /// This function will read bytes from the underlying stream until the
718 /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
719 /// up to, and including, the delimiter (if found) will be appended to
720 /// `buf`.
721 ///
722 /// The returned future will resolve to the number of bytes read once the read
723 /// operation is completed.
724 ///
725 /// In the case of an error the buffer and the object will be discarded, with
726 /// the error yielded.
727 ///
728 /// # Errors
729 ///
730 /// This function has the same error semantics as [`read_until`] and will
731 /// also return an error if the read bytes are not valid UTF-8. If an I/O
732 /// error is encountered then `buf` may contain some bytes already read in
733 /// the event that all data read so far was valid UTF-8.
734 ///
735 /// [`read_until`]: AsyncBufReadExt::read_until
736 ///
737 /// # Examples
738 ///
739 /// ```
740 /// # futures::executor::block_on(async {
741 /// use futures::io::{AsyncBufReadExt, Cursor};
742 ///
743 /// let mut cursor = Cursor::new(b"foo\nbar");
744 /// let mut buf = String::new();
745 ///
746 /// // cursor is at 'f'
747 /// let num_bytes = cursor.read_line(&mut buf).await?;
748 /// assert_eq!(num_bytes, 4);
749 /// assert_eq!(buf, "foo\n");
750 /// buf.clear();
751 ///
752 /// // cursor is at 'b'
753 /// let num_bytes = cursor.read_line(&mut buf).await?;
754 /// assert_eq!(num_bytes, 3);
755 /// assert_eq!(buf, "bar");
756 /// buf.clear();
757 ///
758 /// // cursor is at EOF
759 /// let num_bytes = cursor.read_line(&mut buf).await?;
760 /// assert_eq!(num_bytes, 0);
761 /// assert_eq!(buf, "");
762 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
763 /// ```
read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin,764 fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
765 where Self: Unpin,
766 {
767 assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
768 }
769
770 /// Returns a stream over the lines of this reader.
771 /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
772 ///
773 /// The stream returned from this function will yield instances of
774 /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
775 /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
776 ///
777 /// [`io::Result`]: std::io::Result
778 /// [`String`]: String
779 ///
780 /// # Errors
781 ///
782 /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
783 ///
784 /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
785 ///
786 /// # Examples
787 ///
788 /// ```
789 /// # futures::executor::block_on(async {
790 /// use futures::io::{AsyncBufReadExt, Cursor};
791 /// use futures::stream::StreamExt;
792 ///
793 /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
794 ///
795 /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
796 /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
797 /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
798 /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
799 /// assert_eq!(lines_stream.next().await, None);
800 /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
801 /// ```
lines(self) -> Lines<Self> where Self: Sized,802 fn lines(self) -> Lines<Self>
803 where Self: Sized,
804 {
805 assert_stream::<Result<String>, _>(Lines::new(self))
806 }
807 }
808
809 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
810
811 // Just a helper function to ensure the reader we're returning all have the
812 // right implementations.
assert_read<R>(reader: R) -> R where R: AsyncRead,813 pub(crate) fn assert_read<R>(reader: R) -> R
814 where
815 R: AsyncRead,
816 {
817 reader
818 }
819 // Just a helper function to ensure the writer we're returning all have the
820 // right implementations.
assert_write<W>(writer: W) -> W where W: AsyncWrite,821 pub(crate) fn assert_write<W>(writer: W) -> W
822 where
823 W: AsyncWrite,
824 {
825 writer
826 }
827