1 use crate::io::util::fill_buf::{fill_buf, FillBuf};
2 use crate::io::util::lines::{lines, Lines};
3 use crate::io::util::read_line::{read_line, ReadLine};
4 use crate::io::util::read_until::{read_until, ReadUntil};
5 use crate::io::util::split::{split, Split};
6 use crate::io::AsyncBufRead;
7 
8 cfg_io_util! {
9     /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
10     ///
11     /// [`AsyncBufRead`]: crate::io::AsyncBufRead
12     pub trait AsyncBufReadExt: AsyncBufRead {
13         /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
14         ///
15         /// Equivalent to:
16         ///
17         /// ```ignore
18         /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>;
19         /// ```
20         ///
21         /// This function will read bytes from the underlying stream until the
22         /// delimiter or EOF is found. Once found, all bytes up to, and including,
23         /// the delimiter (if found) will be appended to `buf`.
24         ///
25         /// If successful, this function will return the total number of bytes read.
26         ///
27         /// If this function returns `Ok(0)`, the stream has reached EOF.
28         ///
29         /// # Errors
30         ///
31         /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
32         /// will otherwise return any errors returned by [`fill_buf`].
33         ///
34         /// If an I/O error is encountered then all bytes read so far will be
35         /// present in `buf` and its length will have been adjusted appropriately.
36         ///
37         /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
38         /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
39         ///
40         /// # Cancel safety
41         ///
42         /// If the method is used as the event in a
43         /// [`tokio::select!`](crate::select) statement and some other branch
44         /// completes first, then some data may have been partially read. Any
45         /// partially read bytes are appended to `buf`, and the method can be
46         /// called again to continue reading until `byte`.
47         ///
48         /// This method returns the total number of bytes read. If you cancel
49         /// the call to `read_until` and then call it again to continue reading,
50         /// the counter is reset.
51         ///
52         /// # Examples
53         ///
54         /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
55         /// this example, we use [`Cursor`] to read all the bytes in a byte slice
56         /// in hyphen delimited segments:
57         ///
58         /// [`Cursor`]: std::io::Cursor
59         ///
60         /// ```
61         /// use tokio::io::AsyncBufReadExt;
62         ///
63         /// use std::io::Cursor;
64         ///
65         /// #[tokio::main]
66         /// async fn main() {
67         ///     let mut cursor = Cursor::new(b"lorem-ipsum");
68         ///     let mut buf = vec![];
69         ///
70         ///     // cursor is at 'l'
71         ///     let num_bytes = cursor.read_until(b'-', &mut buf)
72         ///         .await
73         ///         .expect("reading from cursor won't fail");
74         ///
75         ///     assert_eq!(num_bytes, 6);
76         ///     assert_eq!(buf, b"lorem-");
77         ///     buf.clear();
78         ///
79         ///     // cursor is at 'i'
80         ///     let num_bytes = cursor.read_until(b'-', &mut buf)
81         ///         .await
82         ///         .expect("reading from cursor won't fail");
83         ///
84         ///     assert_eq!(num_bytes, 5);
85         ///     assert_eq!(buf, b"ipsum");
86         ///     buf.clear();
87         ///
88         ///     // cursor is at EOF
89         ///     let num_bytes = cursor.read_until(b'-', &mut buf)
90         ///         .await
91         ///         .expect("reading from cursor won't fail");
92         ///     assert_eq!(num_bytes, 0);
93         ///     assert_eq!(buf, b"");
94         /// }
95         /// ```
96         fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
97         where
98             Self: Unpin,
99         {
100             read_until(self, byte, buf)
101         }
102 
103         /// Reads all bytes until a newline (the 0xA byte) is reached, and append
104         /// them to the provided buffer.
105         ///
106         /// Equivalent to:
107         ///
108         /// ```ignore
109         /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
110         /// ```
111         ///
112         /// This function will read bytes from the underlying stream until the
113         /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
114         /// up to, and including, the delimiter (if found) will be appended to
115         /// `buf`.
116         ///
117         /// If successful, this function will return the total number of bytes read.
118         ///
119         /// If this function returns `Ok(0)`, the stream has reached EOF.
120         ///
121         /// # Errors
122         ///
123         /// This function has the same error semantics as [`read_until`] and will
124         /// also return an error if the read bytes are not valid UTF-8. If an I/O
125         /// error is encountered then `buf` may contain some bytes already read in
126         /// the event that all data read so far was valid UTF-8.
127         ///
128         /// [`read_until`]: AsyncBufReadExt::read_until
129         ///
130         /// # Cancel safety
131         ///
132         /// This method is not cancellation safe. If the method is used as the
133         /// event in a [`tokio::select!`](crate::select) statement and some
134         /// other branch completes first, then some data may have been partially
135         /// read, and this data is lost. There are no guarantees regarding the
136         /// contents of `buf` when the call is cancelled. The current
137         /// implementation replaces `buf` with the empty string, but this may
138         /// change in the future.
139         ///
140         /// This function does not behave like [`read_until`] because of the
141         /// requirement that a string contains only valid utf-8. If you need a
142         /// cancellation safe `read_line`, there are three options:
143         ///
144         ///  * Call [`read_until`] with a newline character and manually perform the utf-8 check.
145         ///  * The stream returned by [`lines`] has a cancellation safe
146         ///    [`next_line`] method.
147         ///  * Use [`tokio_util::codec::LinesCodec`][LinesCodec].
148         ///
149         /// [LinesCodec]: https://docs.rs/tokio-util/0.6/tokio_util/codec/struct.LinesCodec.html
150         /// [`read_until`]: Self::read_until
151         /// [`lines`]: Self::lines
152         /// [`next_line`]: crate::io::Lines::next_line
153         ///
154         /// # Examples
155         ///
156         /// [`std::io::Cursor`][`Cursor`] is a type that implements
157         /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
158         /// lines in a byte slice:
159         ///
160         /// [`Cursor`]: std::io::Cursor
161         ///
162         /// ```
163         /// use tokio::io::AsyncBufReadExt;
164         ///
165         /// use std::io::Cursor;
166         ///
167         /// #[tokio::main]
168         /// async fn main() {
169         ///     let mut cursor = Cursor::new(b"foo\nbar");
170         ///     let mut buf = String::new();
171         ///
172         ///     // cursor is at 'f'
173         ///     let num_bytes = cursor.read_line(&mut buf)
174         ///         .await
175         ///         .expect("reading from cursor won't fail");
176         ///
177         ///     assert_eq!(num_bytes, 4);
178         ///     assert_eq!(buf, "foo\n");
179         ///     buf.clear();
180         ///
181         ///     // cursor is at 'b'
182         ///     let num_bytes = cursor.read_line(&mut buf)
183         ///         .await
184         ///         .expect("reading from cursor won't fail");
185         ///
186         ///     assert_eq!(num_bytes, 3);
187         ///     assert_eq!(buf, "bar");
188         ///     buf.clear();
189         ///
190         ///     // cursor is at EOF
191         ///     let num_bytes = cursor.read_line(&mut buf)
192         ///         .await
193         ///         .expect("reading from cursor won't fail");
194         ///
195         ///     assert_eq!(num_bytes, 0);
196         ///     assert_eq!(buf, "");
197         /// }
198         /// ```
199         fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
200         where
201             Self: Unpin,
202         {
203             read_line(self, buf)
204         }
205 
206         /// Returns a stream of the contents of this reader split on the byte
207         /// `byte`.
208         ///
209         /// This method is the asynchronous equivalent to
210         /// [`BufRead::split`](std::io::BufRead::split).
211         ///
212         /// The stream returned from this function will yield instances of
213         /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have
214         /// the delimiter byte at the end.
215         ///
216         /// [`io::Result`]: std::io::Result
217         /// [`Option`]: core::option::Option
218         /// [`Vec<u8>`]: std::vec::Vec
219         ///
220         /// # Errors
221         ///
222         /// Each item of the stream has the same error semantics as
223         /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
224         ///
225         /// # Examples
226         ///
227         /// ```
228         /// # use tokio::io::AsyncBufRead;
229         /// use tokio::io::AsyncBufReadExt;
230         ///
231         /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
232         /// let mut segments = my_buf_read.split(b'f');
233         ///
234         /// while let Some(segment) = segments.next_segment().await? {
235         ///     println!("length = {}", segment.len())
236         /// }
237         /// # Ok(())
238         /// # }
239         /// ```
240         fn split(self, byte: u8) -> Split<Self>
241         where
242             Self: Sized + Unpin,
243         {
244             split(self, byte)
245         }
246 
247         /// Returns the contents of the internal buffer, filling it with more
248         /// data from the inner reader if it is empty.
249         ///
250         /// This function is a lower-level call. It needs to be paired with the
251         /// [`consume`] method to function properly. When calling this method,
252         /// none of the contents will be "read" in the sense that later calling
253         /// `read` may return the same contents. As such, [`consume`] must be
254         /// called with the number of bytes that are consumed from this buffer
255         /// to ensure that the bytes are never returned twice.
256         ///
257         /// An empty buffer returned indicates that the stream has reached EOF.
258         ///
259         /// Equivalent to:
260         ///
261         /// ```ignore
262         /// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
263         /// ```
264         ///
265         /// # Errors
266         ///
267         /// This function will return an I/O error if the underlying reader was
268         /// read, but returned an error.
269         ///
270         /// [`consume`]: crate::io::AsyncBufReadExt::consume
271         fn fill_buf(&mut self) -> FillBuf<'_, Self>
272         where
273             Self: Unpin,
274         {
275             fill_buf(self)
276         }
277 
278         /// Tells this buffer that `amt` bytes have been consumed from the
279         /// buffer, so they should no longer be returned in calls to [`read`].
280         ///
281         /// This function is a lower-level call. It needs to be paired with the
282         /// [`fill_buf`] method to function properly. This function does not
283         /// perform any I/O, it simply informs this object that some amount of
284         /// its buffer, returned from [`fill_buf`], has been consumed and should
285         /// no longer be returned. As such, this function may do odd things if
286         /// [`fill_buf`] isn't called before calling it.
287         ///
288         /// The `amt` must be less than the number of bytes in the buffer
289         /// returned by [`fill_buf`].
290         ///
291         /// [`read`]: crate::io::AsyncReadExt::read
292         /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
293         fn consume(&mut self, amt: usize)
294         where
295             Self: Unpin,
296         {
297             std::pin::Pin::new(self).consume(amt)
298         }
299 
300         /// Returns a stream over the lines of this reader.
301         /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
302         ///
303         /// The stream returned from this function will yield instances of
304         /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline
305         /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
306         ///
307         /// [`io::Result`]: std::io::Result
308         /// [`Option`]: core::option::Option
309         /// [`String`]: String
310         ///
311         /// # Errors
312         ///
313         /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
314         ///
315         /// # Examples
316         ///
317         /// [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
318         /// this example, we use [`Cursor`] to iterate over all the lines in a byte
319         /// slice.
320         ///
321         /// [`Cursor`]: std::io::Cursor
322         ///
323         /// ```
324         /// use tokio::io::AsyncBufReadExt;
325         ///
326         /// use std::io::Cursor;
327         ///
328         /// #[tokio::main]
329         /// async fn main() {
330         ///     let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
331         ///
332         ///     let mut lines = cursor.lines();
333         ///
334         ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem")));
335         ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum")));
336         ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor")));
337         ///     assert_eq!(lines.next_line().await.unwrap(), None);
338         /// }
339         /// ```
340         ///
341         /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
342         fn lines(self) -> Lines<Self>
343         where
344             Self: Sized,
345         {
346             lines(self)
347         }
348     }
349 }
350 
351 impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}
352