1 //! Asynchronous I/O.
2 //!
3 //! This module is the asynchronous version of `std::io`. It defines four
4 //! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5 //! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6 //! standard library. However, these traits integrate with the asynchronous
7 //! task system, so that if an I/O object isn't ready for reading (or writing),
8 //! the thread is not blocked, and instead the current task is queued to be
9 //! woken when I/O is ready.
10 //!
11 //! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12 //! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13 //! for operating with asynchronous I/O objects, including ways to work with
14 //! them using futures, streams and sinks.
15 //!
16 //! This module is only available when the `std` feature of this
17 //! library is activated, and it is activated by default.
18 
19 #[cfg(feature = "io-compat")]
20 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21 use crate::compat::Compat;
22 use crate::future::assert_future;
23 use crate::stream::assert_stream;
24 use std::{ptr, pin::Pin};
25 
26 // Re-export some types from `std::io` so that users don't have to deal
27 // with conflicts when `use`ing `futures::io` and `std::io`.
28 #[doc(no_inline)]
29 pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
30 #[doc(no_inline)]
31 #[cfg(feature = "read-initializer")]
32 #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
33 pub use std::io::Initializer;
34 
35 pub use futures_io::{AsyncRead, AsyncWrite, AsyncSeek, AsyncBufRead};
36 
// Default buffer capacity in bytes (8 KiB); used by `BufReader` and `BufWriter`.
// Reference for the value:
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
40 
41 /// Initializes a buffer if necessary.
42 ///
43 /// A buffer is always initialized if `read-initializer` feature is disabled.
44 #[inline]
initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8])45 unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
46     #[cfg(feature = "read-initializer")]
47     {
48         if !_reader.initializer().should_initialize() {
49             return;
50         }
51     }
52     ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
53 }
54 
55 mod allow_std;
56 pub use self::allow_std::AllowStdIo;
57 
58 mod buf_reader;
59 pub use self::buf_reader::BufReader;
60 
61 mod buf_writer;
62 pub use self::buf_writer::BufWriter;
63 
64 mod chain;
65 pub use self::chain::Chain;
66 
67 mod close;
68 pub use self::close::Close;
69 
70 mod copy;
71 pub use self::copy::{copy, Copy};
72 
73 mod copy_buf;
74 pub use self::copy_buf::{copy_buf, CopyBuf};
75 
76 mod cursor;
77 pub use self::cursor::Cursor;
78 
79 mod empty;
80 pub use self::empty::{empty, Empty};
81 
82 mod fill_buf;
83 pub use self::fill_buf::FillBuf;
84 
85 mod flush;
86 pub use self::flush::Flush;
87 
88 #[cfg(feature = "sink")]
89 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
90 mod into_sink;
91 #[cfg(feature = "sink")]
92 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
93 pub use self::into_sink::IntoSink;
94 
95 mod lines;
96 pub use self::lines::Lines;
97 
98 mod read;
99 pub use self::read::Read;
100 
101 mod read_vectored;
102 pub use self::read_vectored::ReadVectored;
103 
104 mod read_exact;
105 pub use self::read_exact::ReadExact;
106 
107 mod read_line;
108 pub use self::read_line::ReadLine;
109 
110 mod read_to_end;
111 pub use self::read_to_end::ReadToEnd;
112 
113 mod read_to_string;
114 pub use self::read_to_string::ReadToString;
115 
116 mod read_until;
117 pub use self::read_until::ReadUntil;
118 
119 mod repeat;
120 pub use self::repeat::{repeat, Repeat};
121 
122 mod seek;
123 pub use self::seek::Seek;
124 
125 mod sink;
126 pub use self::sink::{sink, Sink};
127 
128 mod split;
129 pub use self::split::{ReadHalf, WriteHalf, ReuniteError};
130 
131 mod take;
132 pub use self::take::Take;
133 
134 mod window;
135 pub use self::window::Window;
136 
137 mod write;
138 pub use self::write::Write;
139 
140 mod write_vectored;
141 pub use self::write_vectored::WriteVectored;
142 
143 mod write_all;
144 pub use self::write_all::WriteAll;
145 
146 #[cfg(feature = "write-all-vectored")]
147 mod write_all_vectored;
148 #[cfg(feature = "write-all-vectored")]
149 pub use self::write_all_vectored::WriteAllVectored;
150 
151 /// An extension trait which adds utility methods to `AsyncRead` types.
152 pub trait AsyncReadExt: AsyncRead {
153     /// Creates an adaptor which will chain this stream with another.
154     ///
155     /// The returned `AsyncRead` instance will first read all bytes from this object
156     /// until EOF is encountered. Afterwards the output is equivalent to the
157     /// output of `next`.
158     ///
159     /// # Examples
160     ///
161     /// ```
162     /// # futures::executor::block_on(async {
163     /// use futures::io::{AsyncReadExt, Cursor};
164     ///
165     /// let reader1 = Cursor::new([1, 2, 3, 4]);
166     /// let reader2 = Cursor::new([5, 6, 7, 8]);
167     ///
168     /// let mut reader = reader1.chain(reader2);
169     /// let mut buffer = Vec::new();
170     ///
171     /// // read the value into a Vec.
172     /// reader.read_to_end(&mut buffer).await?;
173     /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
174     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
175     /// ```
chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead,176     fn chain<R>(self, next: R) -> Chain<Self, R>
177     where
178         Self: Sized,
179         R: AsyncRead,
180     {
181         assert_read(Chain::new(self, next))
182     }
183 
184     /// Tries to read some bytes directly into the given `buf` in asynchronous
185     /// manner, returning a future type.
186     ///
187     /// The returned future will resolve to the number of bytes read once the read
188     /// operation is completed.
189     ///
190     /// # Examples
191     ///
192     /// ```
193     /// # futures::executor::block_on(async {
194     /// use futures::io::{AsyncReadExt, Cursor};
195     ///
196     /// let mut reader = Cursor::new([1, 2, 3, 4]);
197     /// let mut output = [0u8; 5];
198     ///
199     /// let bytes = reader.read(&mut output[..]).await?;
200     ///
    /// // This is only guaranteed to be 4 because `Cursor` is a synchronous
    /// // reader. In a real system you could get anywhere from 1 to
    /// // `output.len()` bytes in a single read.
204     /// assert_eq!(bytes, 4);
205     /// assert_eq!(output, [1, 2, 3, 4, 0]);
206     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
207     /// ```
read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,208     fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
209         where Self: Unpin,
210     {
211         assert_future::<Result<usize>, _>(Read::new(self, buf))
212     }
213 
214     /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
215     /// IO operations.
216     ///
217     /// The returned future will resolve to the number of bytes read once the read
218     /// operation is completed.
read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin,219     fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
220         where Self: Unpin,
221     {
222         assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
223     }
224 
225     /// Creates a future which will read exactly enough bytes to fill `buf`,
226     /// returning an error if end of file (EOF) is hit sooner.
227     ///
228     /// The returned future will resolve once the read operation is completed.
229     ///
    /// In the case of an error the error is yielded; the reader and `buf` are
    /// only borrowed, so the caller keeps ownership of both.
232     ///
233     /// # Examples
234     ///
235     /// ```
236     /// # futures::executor::block_on(async {
237     /// use futures::io::{AsyncReadExt, Cursor};
238     ///
239     /// let mut reader = Cursor::new([1, 2, 3, 4]);
240     /// let mut output = [0u8; 4];
241     ///
242     /// reader.read_exact(&mut output).await?;
243     ///
244     /// assert_eq!(output, [1, 2, 3, 4]);
245     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
246     /// ```
247     ///
248     /// ## EOF is hit before `buf` is filled
249     ///
250     /// ```
251     /// # futures::executor::block_on(async {
252     /// use futures::io::{self, AsyncReadExt, Cursor};
253     ///
254     /// let mut reader = Cursor::new([1, 2, 3, 4]);
255     /// let mut output = [0u8; 5];
256     ///
257     /// let result = reader.read_exact(&mut output).await;
258     ///
259     /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
260     /// # });
261     /// ```
read_exact<'a>( &'a mut self, buf: &'a mut [u8], ) -> ReadExact<'a, Self> where Self: Unpin,262     fn read_exact<'a>(
263         &'a mut self,
264         buf: &'a mut [u8],
265     ) -> ReadExact<'a, Self>
266         where Self: Unpin,
267     {
268         assert_future::<Result<()>, _>(ReadExact::new(self, buf))
269     }
270 
271     /// Creates a future which will read all the bytes from this `AsyncRead`.
272     ///
273     /// On success the total number of bytes read is returned.
274     ///
275     /// # Examples
276     ///
277     /// ```
278     /// # futures::executor::block_on(async {
279     /// use futures::io::{AsyncReadExt, Cursor};
280     ///
281     /// let mut reader = Cursor::new([1, 2, 3, 4]);
282     /// let mut output = Vec::with_capacity(4);
283     ///
284     /// let bytes = reader.read_to_end(&mut output).await?;
285     ///
286     /// assert_eq!(bytes, 4);
287     /// assert_eq!(output, vec![1, 2, 3, 4]);
288     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
289     /// ```
read_to_end<'a>( &'a mut self, buf: &'a mut Vec<u8>, ) -> ReadToEnd<'a, Self> where Self: Unpin,290     fn read_to_end<'a>(
291         &'a mut self,
292         buf: &'a mut Vec<u8>,
293     ) -> ReadToEnd<'a, Self>
294         where Self: Unpin,
295     {
296         assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
297     }
298 
    /// Creates a future which will read all the bytes from this `AsyncRead`
    /// into the `String` `buf`.
300     ///
301     /// On success the total number of bytes read is returned.
302     ///
303     /// # Examples
304     ///
305     /// ```
306     /// # futures::executor::block_on(async {
307     /// use futures::io::{AsyncReadExt, Cursor};
308     ///
309     /// let mut reader = Cursor::new(&b"1234"[..]);
310     /// let mut buffer = String::with_capacity(4);
311     ///
312     /// let bytes = reader.read_to_string(&mut buffer).await?;
313     ///
314     /// assert_eq!(bytes, 4);
315     /// assert_eq!(buffer, String::from("1234"));
316     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
317     /// ```
read_to_string<'a>( &'a mut self, buf: &'a mut String, ) -> ReadToString<'a, Self> where Self: Unpin,318     fn read_to_string<'a>(
319         &'a mut self,
320         buf: &'a mut String,
321     ) -> ReadToString<'a, Self>
322         where Self: Unpin,
323     {
324         assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
325     }
326 
327     /// Helper method for splitting this read/write object into two halves.
328     ///
329     /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
330     /// traits, respectively.
331     ///
332     /// # Examples
333     ///
334     /// ```
335     /// # futures::executor::block_on(async {
336     /// use futures::io::{self, AsyncReadExt, Cursor};
337     ///
338     /// // Note that for `Cursor` the read and write halves share a single
339     /// // seek position. This may or may not be true for other types that
340     /// // implement both `AsyncRead` and `AsyncWrite`.
341     ///
342     /// let reader = Cursor::new([1, 2, 3, 4]);
343     /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
344     /// let mut writer = Cursor::new(vec![0u8; 5]);
345     ///
346     /// {
347     ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
348     ///     io::copy(reader, &mut buffer_writer).await?;
349     ///     io::copy(buffer_reader, &mut writer).await?;
350     /// }
351     ///
352     /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
353     /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
354     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
355     /// ```
split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized,356     fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
357         where Self: AsyncWrite + Sized,
358     {
359         let (r, w) = split::split(self);
360         (assert_read(r), assert_write(w))
361     }
362 
363     /// Creates an AsyncRead adapter which will read at most `limit` bytes
364     /// from the underlying reader.
365     ///
366     /// # Examples
367     ///
368     /// ```
369     /// # futures::executor::block_on(async {
370     /// use futures::io::{AsyncReadExt, Cursor};
371     ///
372     /// let reader = Cursor::new(&b"12345678"[..]);
373     /// let mut buffer = [0; 5];
374     ///
375     /// let mut take = reader.take(4);
376     /// let n = take.read(&mut buffer).await?;
377     ///
378     /// assert_eq!(n, 4);
379     /// assert_eq!(&buffer, b"1234\0");
380     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
381     /// ```
take(self, limit: u64) -> Take<Self> where Self: Sized382     fn take(self, limit: u64) -> Take<Self>
383         where Self: Sized
384     {
385         assert_read(Take::new(self, limit))
386     }
387 
388     /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
389     /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
390     /// implements [`AsyncWrite`] as well, the result will also implement the
391     /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
392     ///
393     /// Requires the `io-compat` feature to enable.
394     #[cfg(feature = "io-compat")]
395     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,396     fn compat(self) -> Compat<Self>
397         where Self: Sized + Unpin,
398     {
399         Compat::new(self)
400     }
401 }
402 
403 impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
404 
405 /// An extension trait which adds utility methods to `AsyncWrite` types.
406 pub trait AsyncWriteExt: AsyncWrite {
407     /// Creates a future which will entirely flush this `AsyncWrite`.
408     ///
409     /// # Examples
410     ///
411     /// ```
412     /// # futures::executor::block_on(async {
413     /// use futures::io::{AllowStdIo, AsyncWriteExt};
414     /// use std::io::{BufWriter, Cursor};
415     ///
416     /// let mut output = vec![0u8; 5];
417     ///
418     /// {
419     ///     let writer = Cursor::new(&mut output);
420     ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
421     ///     buffered.write_all(&[1, 2]).await?;
422     ///     buffered.write_all(&[3, 4]).await?;
423     ///     buffered.flush().await?;
424     /// }
425     ///
426     /// assert_eq!(output, [1, 2, 3, 4, 0]);
427     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
428     /// ```
flush(&mut self) -> Flush<'_, Self> where Self: Unpin,429     fn flush(&mut self) -> Flush<'_, Self>
430         where Self: Unpin,
431     {
432         assert_future::<Result<()>, _>(Flush::new(self))
433     }
434 
435     /// Creates a future which will entirely close this `AsyncWrite`.
close(&mut self) -> Close<'_, Self> where Self: Unpin,436     fn close(&mut self) -> Close<'_, Self>
437         where Self: Unpin,
438     {
439         assert_future::<Result<()>, _>(Close::new(self))
440     }
441 
442     /// Creates a future which will write bytes from `buf` into the object.
443     ///
444     /// The returned future will resolve to the number of bytes written once the write
445     /// operation is completed.
write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin,446     fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
447         where Self: Unpin,
448     {
449         assert_future::<Result<usize>, _>(Write::new(self, buf))
450     }
451 
452     /// Creates a future which will write bytes from `bufs` into the object using vectored
453     /// IO operations.
454     ///
455     /// The returned future will resolve to the number of bytes written once the write
456     /// operation is completed.
write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin,457     fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
458         where Self: Unpin,
459     {
460         assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
461     }
462 
463     /// Write data into this object.
464     ///
465     /// Creates a future that will write the entire contents of the buffer `buf` into
466     /// this `AsyncWrite`.
467     ///
468     /// The returned future will not complete until all the data has been written.
469     ///
470     /// # Examples
471     ///
472     /// ```
473     /// # futures::executor::block_on(async {
474     /// use futures::io::{AsyncWriteExt, Cursor};
475     ///
476     /// let mut writer = Cursor::new(vec![0u8; 5]);
477     ///
478     /// writer.write_all(&[1, 2, 3, 4]).await?;
479     ///
480     /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
481     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
482     /// ```
write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin,483     fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
484         where Self: Unpin,
485     {
486         assert_future::<Result<()>, _>(WriteAll::new(self, buf))
487     }
488 
489     /// Attempts to write multiple buffers into this writer.
490     ///
491     /// Creates a future that will write the entire contents of `bufs` into this
492     /// `AsyncWrite` using [vectored writes].
493     ///
494     /// The returned future will not complete until all the data has been
495     /// written.
496     ///
497     /// [vectored writes]: std::io::Write::write_vectored
498     ///
499     /// # Notes
500     ///
501     /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
502     /// a slice of `IoSlice`s, not an immutable one. That's because we need to
503     /// modify the slice to keep track of the bytes already written.
504     ///
    /// Once this future returns, the contents of `bufs` are unspecified, as
506     /// this depends on how many calls to `write_vectored` were necessary. It is
507     /// best to understand this function as taking ownership of `bufs` and to
508     /// not use `bufs` afterwards. The underlying buffers, to which the
509     /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
510     /// can be reused.
511     ///
512     /// # Examples
513     ///
514     /// ```
515     /// # futures::executor::block_on(async {
516     /// use futures::io::AsyncWriteExt;
517     /// use futures_util::io::Cursor;
518     /// use std::io::IoSlice;
519     ///
520     /// let mut writer = Cursor::new(Vec::new());
521     /// let bufs = &mut [
522     ///     IoSlice::new(&[1]),
523     ///     IoSlice::new(&[2, 3]),
524     ///     IoSlice::new(&[4, 5, 6]),
525     /// ];
526     ///
527     /// writer.write_all_vectored(bufs).await?;
528     /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
529     ///
530     /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
531     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
532     /// ```
533     #[cfg(feature = "write-all-vectored")]
write_all_vectored<'a>( &'a mut self, bufs: &'a mut [IoSlice<'a>], ) -> WriteAllVectored<'a, Self> where Self: Unpin,534     fn write_all_vectored<'a>(
535         &'a mut self,
536         bufs: &'a mut [IoSlice<'a>],
537     ) -> WriteAllVectored<'a, Self>
538     where
539         Self: Unpin,
540     {
541         assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
542     }
543 
544     /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
545     /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
546     /// Requires the `io-compat` feature to enable.
547     #[cfg(feature = "io-compat")]
548     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat_write(self) -> Compat<Self> where Self: Sized + Unpin,549     fn compat_write(self) -> Compat<Self>
550         where Self: Sized + Unpin,
551     {
552         Compat::new(self)
553     }
554 
555     /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
556     ///
557     /// This adapter produces a sink that will write each value passed to it
558     /// into the underlying writer.
559     ///
560     /// Note that this function consumes the given writer, returning a wrapped
561     /// version.
562     ///
563     /// # Examples
564     ///
565     /// ```
566     /// # futures::executor::block_on(async {
567     /// use futures::io::AsyncWriteExt;
568     /// use futures::stream::{self, StreamExt};
569     ///
570     /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
571     ///
572     /// let mut writer = vec![];
573     ///
574     /// stream.forward((&mut writer).into_sink()).await?;
575     ///
576     /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
577     /// # Ok::<(), Box<dyn std::error::Error>>(())
578     /// # })?;
579     /// # Ok::<(), Box<dyn std::error::Error>>(())
580     /// ```
581     #[cfg(feature = "sink")]
582     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> where Self: Sized,583     fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
584         where Self: Sized,
585     {
586         crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
587     }
588 }
589 
590 impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
591 
592 /// An extension trait which adds utility methods to `AsyncSeek` types.
593 pub trait AsyncSeekExt: AsyncSeek {
594     /// Creates a future which will seek an IO object, and then yield the
595     /// new position in the object and the object itself.
596     ///
597     /// In the case of an error the buffer and the object will be discarded, with
598     /// the error yielded.
seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin,599     fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
600         where Self: Unpin,
601     {
602         assert_future::<Result<u64>, _>(Seek::new(self, pos))
603     }
604 
605     /// Creates a future which will return the current seek position from the
606     /// start of the stream.
607     ///
608     /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
stream_position(&mut self) -> Seek<'_, Self> where Self: Unpin,609     fn stream_position(&mut self) -> Seek<'_, Self>
610     where
611         Self: Unpin,
612     {
613         self.seek(SeekFrom::Current(0))
614     }
615 }
616 
617 impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
618 
619 /// An extension trait which adds utility methods to `AsyncBufRead` types.
620 pub trait AsyncBufReadExt: AsyncBufRead {
621     /// Creates a future which will wait for a non-empty buffer to be available from this I/O
622     /// object or EOF to be reached.
623     ///
624     /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
625     ///
626     /// ```rust
627     /// # futures::executor::block_on(async {
628     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
629     ///
630     /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
631     ///
632     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
633     /// stream.consume_unpin(2);
634     ///
635     /// assert_eq!(stream.fill_buf().await?, vec![3]);
636     /// stream.consume_unpin(1);
637     ///
638     /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
639     /// stream.consume_unpin(3);
640     ///
641     /// assert_eq!(stream.fill_buf().await?, vec![]);
642     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
643     /// ```
fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin,644     fn fill_buf(&mut self) -> FillBuf<'_, Self>
645         where Self: Unpin,
646     {
647         assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
648     }
649 
650     /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
651     ///
652     /// ```rust
653     /// # futures::executor::block_on(async {
654     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
655     ///
656     /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
657     ///
658     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
659     /// stream.consume_unpin(2);
660     ///
661     /// assert_eq!(stream.fill_buf().await?, vec![3]);
662     /// stream.consume_unpin(1);
663     ///
664     /// assert_eq!(stream.fill_buf().await?, vec![]);
665     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
666     /// ```
consume_unpin(&mut self, amt: usize) where Self: Unpin,667     fn consume_unpin(&mut self, amt: usize)
668         where Self: Unpin,
669     {
670         Pin::new(self).consume(amt)
671     }
672 
673     /// Creates a future which will read all the bytes associated with this I/O
674     /// object into `buf` until the delimiter `byte` or EOF is reached.
675     /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
676     ///
677     /// This function will read bytes from the underlying stream until the
678     /// delimiter or EOF is found. Once found, all bytes up to, and including,
679     /// the delimiter (if found) will be appended to `buf`.
680     ///
681     /// The returned future will resolve to the number of bytes read once the read
682     /// operation is completed.
683     ///
    /// In the case of an error the error is yielded; the reader and `buf` are
    /// only borrowed, so the caller keeps ownership of both.
686     ///
687     /// # Examples
688     ///
689     /// ```
690     /// # futures::executor::block_on(async {
691     /// use futures::io::{AsyncBufReadExt, Cursor};
692     ///
693     /// let mut cursor = Cursor::new(b"lorem-ipsum");
694     /// let mut buf = vec![];
695     ///
696     /// // cursor is at 'l'
697     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
698     /// assert_eq!(num_bytes, 6);
699     /// assert_eq!(buf, b"lorem-");
700     /// buf.clear();
701     ///
702     /// // cursor is at 'i'
703     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
704     /// assert_eq!(num_bytes, 5);
705     /// assert_eq!(buf, b"ipsum");
706     /// buf.clear();
707     ///
708     /// // cursor is at EOF
709     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
710     /// assert_eq!(num_bytes, 0);
711     /// assert_eq!(buf, b"");
712     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
713     /// ```
read_until<'a>( &'a mut self, byte: u8, buf: &'a mut Vec<u8>, ) -> ReadUntil<'a, Self> where Self: Unpin,714     fn read_until<'a>(
715         &'a mut self,
716         byte: u8,
717         buf: &'a mut Vec<u8>,
718     ) -> ReadUntil<'a, Self>
719         where Self: Unpin,
720     {
721         assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
722     }
723 
724     /// Creates a future which will read all the bytes associated with this I/O
    /// object into `buf` until a newline (the 0xA byte) or EOF is reached.
726     /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
727     ///
728     /// This function will read bytes from the underlying stream until the
729     /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
730     /// up to, and including, the delimiter (if found) will be appended to
731     /// `buf`.
732     ///
733     /// The returned future will resolve to the number of bytes read once the read
734     /// operation is completed.
735     ///
    /// In the case of an error the error is yielded; the reader and `buf` are
    /// only borrowed, so the caller keeps ownership of both.
738     ///
739     /// # Errors
740     ///
741     /// This function has the same error semantics as [`read_until`] and will
742     /// also return an error if the read bytes are not valid UTF-8. If an I/O
743     /// error is encountered then `buf` may contain some bytes already read in
744     /// the event that all data read so far was valid UTF-8.
745     ///
746     /// [`read_until`]: AsyncBufReadExt::read_until
747     ///
748     /// # Examples
749     ///
750     /// ```
751     /// # futures::executor::block_on(async {
752     /// use futures::io::{AsyncBufReadExt, Cursor};
753     ///
754     /// let mut cursor = Cursor::new(b"foo\nbar");
755     /// let mut buf = String::new();
756     ///
757     /// // cursor is at 'f'
758     /// let num_bytes = cursor.read_line(&mut buf).await?;
759     /// assert_eq!(num_bytes, 4);
760     /// assert_eq!(buf, "foo\n");
761     /// buf.clear();
762     ///
763     /// // cursor is at 'b'
764     /// let num_bytes = cursor.read_line(&mut buf).await?;
765     /// assert_eq!(num_bytes, 3);
766     /// assert_eq!(buf, "bar");
767     /// buf.clear();
768     ///
769     /// // cursor is at EOF
770     /// let num_bytes = cursor.read_line(&mut buf).await?;
771     /// assert_eq!(num_bytes, 0);
772     /// assert_eq!(buf, "");
773     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
774     /// ```
read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin,775     fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
776         where Self: Unpin,
777     {
778         assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
779     }
780 
781     /// Returns a stream over the lines of this reader.
782     /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
783     ///
784     /// The stream returned from this function will yield instances of
785     /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
786     /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
787     ///
788     /// [`io::Result`]: std::io::Result
789     /// [`String`]: String
790     ///
791     /// # Errors
792     ///
793     /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
794     ///
795     /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
796     ///
797     /// # Examples
798     ///
799     /// ```
800     /// # futures::executor::block_on(async {
801     /// use futures::io::{AsyncBufReadExt, Cursor};
802     /// use futures::stream::StreamExt;
803     ///
804     /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
805     ///
806     /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
807     /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
808     /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
809     /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
810     /// assert_eq!(lines_stream.next().await, None);
811     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
812     /// ```
lines(self) -> Lines<Self> where Self: Sized,813     fn lines(self) -> Lines<Self>
814         where Self: Sized,
815     {
816         assert_stream::<Result<String>, _>(Lines::new(self))
817     }
818 }
819 
820 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
821 
822 // Just a helper function to ensure the reader we're returning all have the
823 // right implementations.
assert_read<R>(reader: R) -> R where R: AsyncRead,824 pub(crate) fn assert_read<R>(reader: R) -> R
825 where
826     R: AsyncRead,
827 {
828     reader
829 }
830 // Just a helper function to ensure the writer we're returning all have the
831 // right implementations.
assert_write<W>(writer: W) -> W where W: AsyncWrite,832 pub(crate) fn assert_write<W>(writer: W) -> W
833 where
834     W: AsyncWrite,
835 {
836     writer
837 }
838