1 //! Asynchronous I/O.
2 //!
3 //! This module is the asynchronous version of `std::io`. It defines four
4 //! traits, [`AsyncRead`], [`AsyncWrite`], [`AsyncSeek`], and [`AsyncBufRead`],
5 //! which mirror the `Read`, `Write`, `Seek`, and `BufRead` traits of the
6 //! standard library. However, these traits integrate with the asynchronous
7 //! task system, so that if an I/O object isn't ready for reading (or writing),
8 //! the thread is not blocked, and instead the current task is queued to be
9 //! woken when I/O is ready.
10 //!
11 //! In addition, the [`AsyncReadExt`], [`AsyncWriteExt`], [`AsyncSeekExt`], and
12 //! [`AsyncBufReadExt`] extension traits offer a variety of useful combinators
13 //! for operating with asynchronous I/O objects, including ways to work with
14 //! them using futures, streams and sinks.
15 //!
16 //! This module is only available when the `std` feature of this
17 //! library is activated, and it is activated by default.
18 
19 #[cfg(feature = "io-compat")]
20 #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
21 use crate::compat::Compat;
22 use crate::future::assert_future;
23 use crate::stream::assert_stream;
24 use std::{pin::Pin, ptr};
25 
26 // Re-export some types from `std::io` so that users don't have to deal
27 // with conflicts when `use`ing `futures::io` and `std::io`.
28 #[doc(no_inline)]
29 #[cfg(feature = "read-initializer")]
30 #[cfg_attr(docsrs, doc(cfg(feature = "read-initializer")))]
31 pub use std::io::Initializer;
32 #[doc(no_inline)]
33 pub use std::io::{Error, ErrorKind, IoSlice, IoSliceMut, Result, SeekFrom};
34 
35 pub use futures_io::{AsyncBufRead, AsyncRead, AsyncSeek, AsyncWrite};
36 
// Default capacity shared by the buffered adaptors in this module:
// used by `BufReader` and `BufWriter`.
// The value (8 * 1024 = 8192) follows the standard library's constant, see
// https://github.com/rust-lang/rust/blob/master/src/libstd/sys_common/io.rs#L1
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
40 
41 /// Initializes a buffer if necessary.
42 ///
43 /// A buffer is always initialized if `read-initializer` feature is disabled.
44 #[inline]
initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8])45 unsafe fn initialize<R: AsyncRead>(_reader: &R, buf: &mut [u8]) {
46     #[cfg(feature = "read-initializer")]
47     {
48         if !_reader.initializer().should_initialize() {
49             return;
50         }
51     }
52     ptr::write_bytes(buf.as_mut_ptr(), 0, buf.len())
53 }
54 
55 mod allow_std;
56 pub use self::allow_std::AllowStdIo;
57 
58 mod buf_reader;
59 pub use self::buf_reader::{BufReader, SeeKRelative};
60 
61 mod buf_writer;
62 pub use self::buf_writer::BufWriter;
63 
64 mod line_writer;
65 pub use self::line_writer::LineWriter;
66 
67 mod chain;
68 pub use self::chain::Chain;
69 
70 mod close;
71 pub use self::close::Close;
72 
73 mod copy;
74 pub use self::copy::{copy, Copy};
75 
76 mod copy_buf;
77 pub use self::copy_buf::{copy_buf, CopyBuf};
78 
79 mod cursor;
80 pub use self::cursor::Cursor;
81 
82 mod empty;
83 pub use self::empty::{empty, Empty};
84 
85 mod fill_buf;
86 pub use self::fill_buf::FillBuf;
87 
88 mod flush;
89 pub use self::flush::Flush;
90 
91 #[cfg(feature = "sink")]
92 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
93 mod into_sink;
94 #[cfg(feature = "sink")]
95 #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
96 pub use self::into_sink::IntoSink;
97 
98 mod lines;
99 pub use self::lines::Lines;
100 
101 mod read;
102 pub use self::read::Read;
103 
104 mod read_vectored;
105 pub use self::read_vectored::ReadVectored;
106 
107 mod read_exact;
108 pub use self::read_exact::ReadExact;
109 
110 mod read_line;
111 pub use self::read_line::ReadLine;
112 
113 mod read_to_end;
114 pub use self::read_to_end::ReadToEnd;
115 
116 mod read_to_string;
117 pub use self::read_to_string::ReadToString;
118 
119 mod read_until;
120 pub use self::read_until::ReadUntil;
121 
122 mod repeat;
123 pub use self::repeat::{repeat, Repeat};
124 
125 mod seek;
126 pub use self::seek::Seek;
127 
128 mod sink;
129 pub use self::sink::{sink, Sink};
130 
131 mod split;
132 pub use self::split::{ReadHalf, ReuniteError, WriteHalf};
133 
134 mod take;
135 pub use self::take::Take;
136 
137 mod window;
138 pub use self::window::Window;
139 
140 mod write;
141 pub use self::write::Write;
142 
143 mod write_vectored;
144 pub use self::write_vectored::WriteVectored;
145 
146 mod write_all;
147 pub use self::write_all::WriteAll;
148 
149 #[cfg(feature = "write-all-vectored")]
150 mod write_all_vectored;
151 #[cfg(feature = "write-all-vectored")]
152 pub use self::write_all_vectored::WriteAllVectored;
153 
154 /// An extension trait which adds utility methods to `AsyncRead` types.
155 pub trait AsyncReadExt: AsyncRead {
156     /// Creates an adaptor which will chain this stream with another.
157     ///
158     /// The returned `AsyncRead` instance will first read all bytes from this object
159     /// until EOF is encountered. Afterwards the output is equivalent to the
160     /// output of `next`.
161     ///
162     /// # Examples
163     ///
164     /// ```
165     /// # futures::executor::block_on(async {
166     /// use futures::io::{AsyncReadExt, Cursor};
167     ///
168     /// let reader1 = Cursor::new([1, 2, 3, 4]);
169     /// let reader2 = Cursor::new([5, 6, 7, 8]);
170     ///
171     /// let mut reader = reader1.chain(reader2);
172     /// let mut buffer = Vec::new();
173     ///
174     /// // read the value into a Vec.
175     /// reader.read_to_end(&mut buffer).await?;
176     /// assert_eq!(buffer, [1, 2, 3, 4, 5, 6, 7, 8]);
177     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
178     /// ```
chain<R>(self, next: R) -> Chain<Self, R> where Self: Sized, R: AsyncRead,179     fn chain<R>(self, next: R) -> Chain<Self, R>
180     where
181         Self: Sized,
182         R: AsyncRead,
183     {
184         assert_read(Chain::new(self, next))
185     }
186 
187     /// Tries to read some bytes directly into the given `buf` in asynchronous
188     /// manner, returning a future type.
189     ///
190     /// The returned future will resolve to the number of bytes read once the read
191     /// operation is completed.
192     ///
193     /// # Examples
194     ///
195     /// ```
196     /// # futures::executor::block_on(async {
197     /// use futures::io::{AsyncReadExt, Cursor};
198     ///
199     /// let mut reader = Cursor::new([1, 2, 3, 4]);
200     /// let mut output = [0u8; 5];
201     ///
202     /// let bytes = reader.read(&mut output[..]).await?;
203     ///
204     /// // This is only guaranteed to be 4 because `&[u8]` is a synchronous
205     /// // reader. In a real system you could get anywhere from 1 to
206     /// // `output.len()` bytes in a single read.
207     /// assert_eq!(bytes, 4);
208     /// assert_eq!(output, [1, 2, 3, 4, 0]);
209     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
210     /// ```
read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self> where Self: Unpin,211     fn read<'a>(&'a mut self, buf: &'a mut [u8]) -> Read<'a, Self>
212     where
213         Self: Unpin,
214     {
215         assert_future::<Result<usize>, _>(Read::new(self, buf))
216     }
217 
218     /// Creates a future which will read from the `AsyncRead` into `bufs` using vectored
219     /// IO operations.
220     ///
221     /// The returned future will resolve to the number of bytes read once the read
222     /// operation is completed.
read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self> where Self: Unpin,223     fn read_vectored<'a>(&'a mut self, bufs: &'a mut [IoSliceMut<'a>]) -> ReadVectored<'a, Self>
224     where
225         Self: Unpin,
226     {
227         assert_future::<Result<usize>, _>(ReadVectored::new(self, bufs))
228     }
229 
230     /// Creates a future which will read exactly enough bytes to fill `buf`,
231     /// returning an error if end of file (EOF) is hit sooner.
232     ///
233     /// The returned future will resolve once the read operation is completed.
234     ///
235     /// In the case of an error the buffer and the object will be discarded, with
236     /// the error yielded.
237     ///
238     /// # Examples
239     ///
240     /// ```
241     /// # futures::executor::block_on(async {
242     /// use futures::io::{AsyncReadExt, Cursor};
243     ///
244     /// let mut reader = Cursor::new([1, 2, 3, 4]);
245     /// let mut output = [0u8; 4];
246     ///
247     /// reader.read_exact(&mut output).await?;
248     ///
249     /// assert_eq!(output, [1, 2, 3, 4]);
250     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
251     /// ```
252     ///
253     /// ## EOF is hit before `buf` is filled
254     ///
255     /// ```
256     /// # futures::executor::block_on(async {
257     /// use futures::io::{self, AsyncReadExt, Cursor};
258     ///
259     /// let mut reader = Cursor::new([1, 2, 3, 4]);
260     /// let mut output = [0u8; 5];
261     ///
262     /// let result = reader.read_exact(&mut output).await;
263     ///
264     /// assert_eq!(result.unwrap_err().kind(), io::ErrorKind::UnexpectedEof);
265     /// # });
266     /// ```
read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self> where Self: Unpin,267     fn read_exact<'a>(&'a mut self, buf: &'a mut [u8]) -> ReadExact<'a, Self>
268     where
269         Self: Unpin,
270     {
271         assert_future::<Result<()>, _>(ReadExact::new(self, buf))
272     }
273 
274     /// Creates a future which will read all the bytes from this `AsyncRead`.
275     ///
276     /// On success the total number of bytes read is returned.
277     ///
278     /// # Examples
279     ///
280     /// ```
281     /// # futures::executor::block_on(async {
282     /// use futures::io::{AsyncReadExt, Cursor};
283     ///
284     /// let mut reader = Cursor::new([1, 2, 3, 4]);
285     /// let mut output = Vec::with_capacity(4);
286     ///
287     /// let bytes = reader.read_to_end(&mut output).await?;
288     ///
289     /// assert_eq!(bytes, 4);
290     /// assert_eq!(output, vec![1, 2, 3, 4]);
291     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
292     /// ```
read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self> where Self: Unpin,293     fn read_to_end<'a>(&'a mut self, buf: &'a mut Vec<u8>) -> ReadToEnd<'a, Self>
294     where
295         Self: Unpin,
296     {
297         assert_future::<Result<usize>, _>(ReadToEnd::new(self, buf))
298     }
299 
300     /// Creates a future which will read all the bytes from this `AsyncRead`.
301     ///
302     /// On success the total number of bytes read is returned.
303     ///
304     /// # Examples
305     ///
306     /// ```
307     /// # futures::executor::block_on(async {
308     /// use futures::io::{AsyncReadExt, Cursor};
309     ///
310     /// let mut reader = Cursor::new(&b"1234"[..]);
311     /// let mut buffer = String::with_capacity(4);
312     ///
313     /// let bytes = reader.read_to_string(&mut buffer).await?;
314     ///
315     /// assert_eq!(bytes, 4);
316     /// assert_eq!(buffer, String::from("1234"));
317     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
318     /// ```
read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self> where Self: Unpin,319     fn read_to_string<'a>(&'a mut self, buf: &'a mut String) -> ReadToString<'a, Self>
320     where
321         Self: Unpin,
322     {
323         assert_future::<Result<usize>, _>(ReadToString::new(self, buf))
324     }
325 
326     /// Helper method for splitting this read/write object into two halves.
327     ///
328     /// The two halves returned implement the `AsyncRead` and `AsyncWrite`
329     /// traits, respectively.
330     ///
331     /// # Examples
332     ///
333     /// ```
334     /// # futures::executor::block_on(async {
335     /// use futures::io::{self, AsyncReadExt, Cursor};
336     ///
337     /// // Note that for `Cursor` the read and write halves share a single
338     /// // seek position. This may or may not be true for other types that
339     /// // implement both `AsyncRead` and `AsyncWrite`.
340     ///
341     /// let reader = Cursor::new([1, 2, 3, 4]);
342     /// let mut buffer = Cursor::new(vec![0, 0, 0, 0, 5, 6, 7, 8]);
343     /// let mut writer = Cursor::new(vec![0u8; 5]);
344     ///
345     /// {
346     ///     let (buffer_reader, mut buffer_writer) = (&mut buffer).split();
347     ///     io::copy(reader, &mut buffer_writer).await?;
348     ///     io::copy(buffer_reader, &mut writer).await?;
349     /// }
350     ///
351     /// assert_eq!(buffer.into_inner(), [1, 2, 3, 4, 5, 6, 7, 8]);
352     /// assert_eq!(writer.into_inner(), [5, 6, 7, 8, 0]);
353     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
354     /// ```
split(self) -> (ReadHalf<Self>, WriteHalf<Self>) where Self: AsyncWrite + Sized,355     fn split(self) -> (ReadHalf<Self>, WriteHalf<Self>)
356     where
357         Self: AsyncWrite + Sized,
358     {
359         let (r, w) = split::split(self);
360         (assert_read(r), assert_write(w))
361     }
362 
363     /// Creates an AsyncRead adapter which will read at most `limit` bytes
364     /// from the underlying reader.
365     ///
366     /// # Examples
367     ///
368     /// ```
369     /// # futures::executor::block_on(async {
370     /// use futures::io::{AsyncReadExt, Cursor};
371     ///
372     /// let reader = Cursor::new(&b"12345678"[..]);
373     /// let mut buffer = [0; 5];
374     ///
375     /// let mut take = reader.take(4);
376     /// let n = take.read(&mut buffer).await?;
377     ///
378     /// assert_eq!(n, 4);
379     /// assert_eq!(&buffer, b"1234\0");
380     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
381     /// ```
take(self, limit: u64) -> Take<Self> where Self: Sized,382     fn take(self, limit: u64) -> Take<Self>
383     where
384         Self: Sized,
385     {
386         assert_read(Take::new(self, limit))
387     }
388 
389     /// Wraps an [`AsyncRead`] in a compatibility wrapper that allows it to be
390     /// used as a futures 0.1 / tokio-io 0.1 `AsyncRead`. If the wrapped type
391     /// implements [`AsyncWrite`] as well, the result will also implement the
392     /// futures 0.1 / tokio 0.1 `AsyncWrite` trait.
393     ///
394     /// Requires the `io-compat` feature to enable.
395     #[cfg(feature = "io-compat")]
396     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat(self) -> Compat<Self> where Self: Sized + Unpin,397     fn compat(self) -> Compat<Self>
398     where
399         Self: Sized + Unpin,
400     {
401         Compat::new(self)
402     }
403 }
404 
405 impl<R: AsyncRead + ?Sized> AsyncReadExt for R {}
406 
407 /// An extension trait which adds utility methods to `AsyncWrite` types.
408 pub trait AsyncWriteExt: AsyncWrite {
409     /// Creates a future which will entirely flush this `AsyncWrite`.
410     ///
411     /// # Examples
412     ///
413     /// ```
414     /// # futures::executor::block_on(async {
415     /// use futures::io::{AllowStdIo, AsyncWriteExt};
416     /// use std::io::{BufWriter, Cursor};
417     ///
418     /// let mut output = vec![0u8; 5];
419     ///
420     /// {
421     ///     let writer = Cursor::new(&mut output);
422     ///     let mut buffered = AllowStdIo::new(BufWriter::new(writer));
423     ///     buffered.write_all(&[1, 2]).await?;
424     ///     buffered.write_all(&[3, 4]).await?;
425     ///     buffered.flush().await?;
426     /// }
427     ///
428     /// assert_eq!(output, [1, 2, 3, 4, 0]);
429     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
430     /// ```
flush(&mut self) -> Flush<'_, Self> where Self: Unpin,431     fn flush(&mut self) -> Flush<'_, Self>
432     where
433         Self: Unpin,
434     {
435         assert_future::<Result<()>, _>(Flush::new(self))
436     }
437 
438     /// Creates a future which will entirely close this `AsyncWrite`.
close(&mut self) -> Close<'_, Self> where Self: Unpin,439     fn close(&mut self) -> Close<'_, Self>
440     where
441         Self: Unpin,
442     {
443         assert_future::<Result<()>, _>(Close::new(self))
444     }
445 
446     /// Creates a future which will write bytes from `buf` into the object.
447     ///
448     /// The returned future will resolve to the number of bytes written once the write
449     /// operation is completed.
write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self> where Self: Unpin,450     fn write<'a>(&'a mut self, buf: &'a [u8]) -> Write<'a, Self>
451     where
452         Self: Unpin,
453     {
454         assert_future::<Result<usize>, _>(Write::new(self, buf))
455     }
456 
457     /// Creates a future which will write bytes from `bufs` into the object using vectored
458     /// IO operations.
459     ///
460     /// The returned future will resolve to the number of bytes written once the write
461     /// operation is completed.
write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self> where Self: Unpin,462     fn write_vectored<'a>(&'a mut self, bufs: &'a [IoSlice<'a>]) -> WriteVectored<'a, Self>
463     where
464         Self: Unpin,
465     {
466         assert_future::<Result<usize>, _>(WriteVectored::new(self, bufs))
467     }
468 
469     /// Write data into this object.
470     ///
471     /// Creates a future that will write the entire contents of the buffer `buf` into
472     /// this `AsyncWrite`.
473     ///
474     /// The returned future will not complete until all the data has been written.
475     ///
476     /// # Examples
477     ///
478     /// ```
479     /// # futures::executor::block_on(async {
480     /// use futures::io::{AsyncWriteExt, Cursor};
481     ///
482     /// let mut writer = Cursor::new(vec![0u8; 5]);
483     ///
484     /// writer.write_all(&[1, 2, 3, 4]).await?;
485     ///
486     /// assert_eq!(writer.into_inner(), [1, 2, 3, 4, 0]);
487     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
488     /// ```
write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self> where Self: Unpin,489     fn write_all<'a>(&'a mut self, buf: &'a [u8]) -> WriteAll<'a, Self>
490     where
491         Self: Unpin,
492     {
493         assert_future::<Result<()>, _>(WriteAll::new(self, buf))
494     }
495 
496     /// Attempts to write multiple buffers into this writer.
497     ///
498     /// Creates a future that will write the entire contents of `bufs` into this
499     /// `AsyncWrite` using [vectored writes].
500     ///
501     /// The returned future will not complete until all the data has been
502     /// written.
503     ///
504     /// [vectored writes]: std::io::Write::write_vectored
505     ///
506     /// # Notes
507     ///
508     /// Unlike `io::Write::write_vectored`, this takes a *mutable* reference to
509     /// a slice of `IoSlice`s, not an immutable one. That's because we need to
510     /// modify the slice to keep track of the bytes already written.
511     ///
512     /// Once this futures returns, the contents of `bufs` are unspecified, as
513     /// this depends on how many calls to `write_vectored` were necessary. It is
514     /// best to understand this function as taking ownership of `bufs` and to
515     /// not use `bufs` afterwards. The underlying buffers, to which the
516     /// `IoSlice`s point (but not the `IoSlice`s themselves), are unchanged and
517     /// can be reused.
518     ///
519     /// # Examples
520     ///
521     /// ```
522     /// # futures::executor::block_on(async {
523     /// use futures::io::AsyncWriteExt;
524     /// use futures_util::io::Cursor;
525     /// use std::io::IoSlice;
526     ///
527     /// let mut writer = Cursor::new(Vec::new());
528     /// let bufs = &mut [
529     ///     IoSlice::new(&[1]),
530     ///     IoSlice::new(&[2, 3]),
531     ///     IoSlice::new(&[4, 5, 6]),
532     /// ];
533     ///
534     /// writer.write_all_vectored(bufs).await?;
535     /// // Note: the contents of `bufs` is now unspecified, see the Notes section.
536     ///
537     /// assert_eq!(writer.into_inner(), &[1, 2, 3, 4, 5, 6]);
538     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
539     /// ```
540     #[cfg(feature = "write-all-vectored")]
write_all_vectored<'a>( &'a mut self, bufs: &'a mut [IoSlice<'a>], ) -> WriteAllVectored<'a, Self> where Self: Unpin,541     fn write_all_vectored<'a>(
542         &'a mut self,
543         bufs: &'a mut [IoSlice<'a>],
544     ) -> WriteAllVectored<'a, Self>
545     where
546         Self: Unpin,
547     {
548         assert_future::<Result<()>, _>(WriteAllVectored::new(self, bufs))
549     }
550 
551     /// Wraps an [`AsyncWrite`] in a compatibility wrapper that allows it to be
552     /// used as a futures 0.1 / tokio-io 0.1 `AsyncWrite`.
553     /// Requires the `io-compat` feature to enable.
554     #[cfg(feature = "io-compat")]
555     #[cfg_attr(docsrs, doc(cfg(feature = "io-compat")))]
compat_write(self) -> Compat<Self> where Self: Sized + Unpin,556     fn compat_write(self) -> Compat<Self>
557     where
558         Self: Sized + Unpin,
559     {
560         Compat::new(self)
561     }
562 
563     /// Allow using an [`AsyncWrite`] as a [`Sink`](futures_sink::Sink)`<Item: AsRef<[u8]>>`.
564     ///
565     /// This adapter produces a sink that will write each value passed to it
566     /// into the underlying writer.
567     ///
568     /// Note that this function consumes the given writer, returning a wrapped
569     /// version.
570     ///
571     /// # Examples
572     ///
573     /// ```
574     /// # futures::executor::block_on(async {
575     /// use futures::io::AsyncWriteExt;
576     /// use futures::stream::{self, StreamExt};
577     ///
578     /// let stream = stream::iter(vec![Ok([1, 2, 3]), Ok([4, 5, 6])]);
579     ///
580     /// let mut writer = vec![];
581     ///
582     /// stream.forward((&mut writer).into_sink()).await?;
583     ///
584     /// assert_eq!(writer, vec![1, 2, 3, 4, 5, 6]);
585     /// # Ok::<(), Box<dyn std::error::Error>>(())
586     /// # })?;
587     /// # Ok::<(), Box<dyn std::error::Error>>(())
588     /// ```
589     #[cfg(feature = "sink")]
590     #[cfg_attr(docsrs, doc(cfg(feature = "sink")))]
into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item> where Self: Sized,591     fn into_sink<Item: AsRef<[u8]>>(self) -> IntoSink<Self, Item>
592     where
593         Self: Sized,
594     {
595         crate::sink::assert_sink::<Item, Error, _>(IntoSink::new(self))
596     }
597 }
598 
599 impl<W: AsyncWrite + ?Sized> AsyncWriteExt for W {}
600 
601 /// An extension trait which adds utility methods to `AsyncSeek` types.
602 pub trait AsyncSeekExt: AsyncSeek {
603     /// Creates a future which will seek an IO object, and then yield the
604     /// new position in the object and the object itself.
605     ///
606     /// In the case of an error the buffer and the object will be discarded, with
607     /// the error yielded.
seek(&mut self, pos: SeekFrom) -> Seek<'_, Self> where Self: Unpin,608     fn seek(&mut self, pos: SeekFrom) -> Seek<'_, Self>
609     where
610         Self: Unpin,
611     {
612         assert_future::<Result<u64>, _>(Seek::new(self, pos))
613     }
614 
615     /// Creates a future which will return the current seek position from the
616     /// start of the stream.
617     ///
618     /// This is equivalent to `self.seek(SeekFrom::Current(0))`.
stream_position(&mut self) -> Seek<'_, Self> where Self: Unpin,619     fn stream_position(&mut self) -> Seek<'_, Self>
620     where
621         Self: Unpin,
622     {
623         self.seek(SeekFrom::Current(0))
624     }
625 }
626 
627 impl<S: AsyncSeek + ?Sized> AsyncSeekExt for S {}
628 
629 /// An extension trait which adds utility methods to `AsyncBufRead` types.
630 pub trait AsyncBufReadExt: AsyncBufRead {
631     /// Creates a future which will wait for a non-empty buffer to be available from this I/O
632     /// object or EOF to be reached.
633     ///
634     /// This method is the async equivalent to [`BufRead::fill_buf`](std::io::BufRead::fill_buf).
635     ///
636     /// ```rust
637     /// # futures::executor::block_on(async {
638     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
639     ///
640     /// let mut stream = iter(vec![Ok(vec![1, 2, 3]), Ok(vec![4, 5, 6])]).into_async_read();
641     ///
642     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
643     /// stream.consume_unpin(2);
644     ///
645     /// assert_eq!(stream.fill_buf().await?, vec![3]);
646     /// stream.consume_unpin(1);
647     ///
648     /// assert_eq!(stream.fill_buf().await?, vec![4, 5, 6]);
649     /// stream.consume_unpin(3);
650     ///
651     /// assert_eq!(stream.fill_buf().await?, vec![]);
652     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
653     /// ```
fill_buf(&mut self) -> FillBuf<'_, Self> where Self: Unpin,654     fn fill_buf(&mut self) -> FillBuf<'_, Self>
655     where
656         Self: Unpin,
657     {
658         assert_future::<Result<&[u8]>, _>(FillBuf::new(self))
659     }
660 
661     /// A convenience for calling [`AsyncBufRead::consume`] on [`Unpin`] IO types.
662     ///
663     /// ```rust
664     /// # futures::executor::block_on(async {
665     /// use futures::{io::AsyncBufReadExt as _, stream::{iter, TryStreamExt as _}};
666     ///
667     /// let mut stream = iter(vec![Ok(vec![1, 2, 3])]).into_async_read();
668     ///
669     /// assert_eq!(stream.fill_buf().await?, vec![1, 2, 3]);
670     /// stream.consume_unpin(2);
671     ///
672     /// assert_eq!(stream.fill_buf().await?, vec![3]);
673     /// stream.consume_unpin(1);
674     ///
675     /// assert_eq!(stream.fill_buf().await?, vec![]);
676     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
677     /// ```
consume_unpin(&mut self, amt: usize) where Self: Unpin,678     fn consume_unpin(&mut self, amt: usize)
679     where
680         Self: Unpin,
681     {
682         Pin::new(self).consume(amt)
683     }
684 
685     /// Creates a future which will read all the bytes associated with this I/O
686     /// object into `buf` until the delimiter `byte` or EOF is reached.
687     /// This method is the async equivalent to [`BufRead::read_until`](std::io::BufRead::read_until).
688     ///
689     /// This function will read bytes from the underlying stream until the
690     /// delimiter or EOF is found. Once found, all bytes up to, and including,
691     /// the delimiter (if found) will be appended to `buf`.
692     ///
693     /// The returned future will resolve to the number of bytes read once the read
694     /// operation is completed.
695     ///
696     /// In the case of an error the buffer and the object will be discarded, with
697     /// the error yielded.
698     ///
699     /// # Examples
700     ///
701     /// ```
702     /// # futures::executor::block_on(async {
703     /// use futures::io::{AsyncBufReadExt, Cursor};
704     ///
705     /// let mut cursor = Cursor::new(b"lorem-ipsum");
706     /// let mut buf = vec![];
707     ///
708     /// // cursor is at 'l'
709     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
710     /// assert_eq!(num_bytes, 6);
711     /// assert_eq!(buf, b"lorem-");
712     /// buf.clear();
713     ///
714     /// // cursor is at 'i'
715     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
716     /// assert_eq!(num_bytes, 5);
717     /// assert_eq!(buf, b"ipsum");
718     /// buf.clear();
719     ///
720     /// // cursor is at EOF
721     /// let num_bytes = cursor.read_until(b'-', &mut buf).await?;
722     /// assert_eq!(num_bytes, 0);
723     /// assert_eq!(buf, b"");
724     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
725     /// ```
read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self> where Self: Unpin,726     fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
727     where
728         Self: Unpin,
729     {
730         assert_future::<Result<usize>, _>(ReadUntil::new(self, byte, buf))
731     }
732 
733     /// Creates a future which will read all the bytes associated with this I/O
734     /// object into `buf` until a newline (the 0xA byte) or EOF is reached,
735     /// This method is the async equivalent to [`BufRead::read_line`](std::io::BufRead::read_line).
736     ///
737     /// This function will read bytes from the underlying stream until the
738     /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
739     /// up to, and including, the delimiter (if found) will be appended to
740     /// `buf`.
741     ///
742     /// The returned future will resolve to the number of bytes read once the read
743     /// operation is completed.
744     ///
745     /// In the case of an error the buffer and the object will be discarded, with
746     /// the error yielded.
747     ///
748     /// # Errors
749     ///
750     /// This function has the same error semantics as [`read_until`] and will
751     /// also return an error if the read bytes are not valid UTF-8. If an I/O
752     /// error is encountered then `buf` may contain some bytes already read in
753     /// the event that all data read so far was valid UTF-8.
754     ///
755     /// [`read_until`]: AsyncBufReadExt::read_until
756     ///
757     /// # Examples
758     ///
759     /// ```
760     /// # futures::executor::block_on(async {
761     /// use futures::io::{AsyncBufReadExt, Cursor};
762     ///
763     /// let mut cursor = Cursor::new(b"foo\nbar");
764     /// let mut buf = String::new();
765     ///
766     /// // cursor is at 'f'
767     /// let num_bytes = cursor.read_line(&mut buf).await?;
768     /// assert_eq!(num_bytes, 4);
769     /// assert_eq!(buf, "foo\n");
770     /// buf.clear();
771     ///
772     /// // cursor is at 'b'
773     /// let num_bytes = cursor.read_line(&mut buf).await?;
774     /// assert_eq!(num_bytes, 3);
775     /// assert_eq!(buf, "bar");
776     /// buf.clear();
777     ///
778     /// // cursor is at EOF
779     /// let num_bytes = cursor.read_line(&mut buf).await?;
780     /// assert_eq!(num_bytes, 0);
781     /// assert_eq!(buf, "");
782     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
783     /// ```
read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self> where Self: Unpin,784     fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
785     where
786         Self: Unpin,
787     {
788         assert_future::<Result<usize>, _>(ReadLine::new(self, buf))
789     }
790 
791     /// Returns a stream over the lines of this reader.
792     /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
793     ///
794     /// The stream returned from this function will yield instances of
795     /// [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline
796     /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
797     ///
798     /// [`io::Result`]: std::io::Result
799     /// [`String`]: String
800     ///
801     /// # Errors
802     ///
803     /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
804     ///
805     /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
806     ///
807     /// # Examples
808     ///
809     /// ```
810     /// # futures::executor::block_on(async {
811     /// use futures::io::{AsyncBufReadExt, Cursor};
812     /// use futures::stream::StreamExt;
813     ///
814     /// let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
815     ///
816     /// let mut lines_stream = cursor.lines().map(|l| l.unwrap());
817     /// assert_eq!(lines_stream.next().await, Some(String::from("lorem")));
818     /// assert_eq!(lines_stream.next().await, Some(String::from("ipsum")));
819     /// assert_eq!(lines_stream.next().await, Some(String::from("dolor")));
820     /// assert_eq!(lines_stream.next().await, None);
821     /// # Ok::<(), Box<dyn std::error::Error>>(()) }).unwrap();
822     /// ```
lines(self) -> Lines<Self> where Self: Sized,823     fn lines(self) -> Lines<Self>
824     where
825         Self: Sized,
826     {
827         assert_stream::<Result<String>, _>(Lines::new(self))
828     }
829 }
830 
831 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
832 
833 // Just a helper function to ensure the reader we're returning all have the
834 // right implementations.
assert_read<R>(reader: R) -> R where R: AsyncRead,835 pub(crate) fn assert_read<R>(reader: R) -> R
836 where
837     R: AsyncRead,
838 {
839     reader
840 }
841 // Just a helper function to ensure the writer we're returning all have the
842 // right implementations.
assert_write<W>(writer: W) -> W where W: AsyncWrite,843 pub(crate) fn assert_write<W>(writer: W) -> W
844 where
845     W: AsyncWrite,
846 {
847     writer
848 }
849