1 mod lines;
2 mod read_line;
3 mod read_until;
4 mod split;
5
6 pub use lines::Lines;
7 pub use split::Split;
8
9 use read_line::ReadLineFuture;
10 use read_until::ReadUntilFuture;
11
12 use std::mem;
13 use std::pin::Pin;
14
15 use crate::io;
16 use crate::task::{Context, Poll};
17
18 extension_trait! {
19 use std::ops::{Deref, DerefMut};
20
// Documentation-facing declaration of the buffered-read trait. As the doc text below
// states, it mirrors `futures::io::AsyncBufRead`; the convenience methods users call
// (`read_until`, `read_line`, `lines`, `split`) are declared on `BufReadExt`.
#[doc = r#"
Allows reading from a buffered byte stream.

This trait is a re-export of [`futures::io::AsyncBufRead`] and is an async version of
[`std::io::BufRead`].

The [provided methods] do not really exist in the trait itself, but they become
available when [`BufReadExt`] from the [prelude] is imported:

```
# #[allow(unused_imports)]
use async_std::io::prelude::*;
```

[`std::io::BufRead`]: https://doc.rust-lang.org/std/io/trait.BufRead.html
[`futures::io::AsyncBufRead`]:
https://docs.rs/futures/0.3/futures/io/trait.AsyncBufRead.html
[provided methods]: #provided-methods
[`BufReadExt`]: ../io/prelude/trait.BufReadExt.html
[prelude]: ../prelude/index.html
"#]
pub trait BufRead {
#[doc = r#"
Returns the contents of the internal buffer, filling it with more data from the
inner reader if it is empty.

This function is a lower-level call. It needs to be paired with the [`consume`]
method to function properly. When calling this method, none of the contents will be
"read" in the sense that later calling `read` may return the same contents. As
such, [`consume`] must be called with the number of bytes that are consumed from
this buffer to ensure that the bytes are never returned twice.

[`consume`]: #tymethod.consume

An empty buffer returned indicates that the stream has reached EOF.
"#]
// TODO: write a proper doctest with `consume`
// Poll-style method: takes the pinned reader plus the task context and yields the
// buffered bytes as a borrowed slice once they are ready.
fn poll_fill_buf(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<io::Result<&[u8]>>;

#[doc = r#"
Tells this buffer that `amt` bytes have been consumed from the buffer, so they
should no longer be returned in calls to `read`.
"#]
// Counterpart of `poll_fill_buf`: callers report how many of the returned bytes they
// actually used so those bytes are not handed out again.
fn consume(self: Pin<&mut Self>, amt: usize);
}
66
67 #[doc = r#"
68 Extension methods for [`BufRead`].
69
70 [`BufRead`]: ../trait.BufRead.html
71 "#]
72 pub trait BufReadExt: futures_io::AsyncBufRead {
73 #[doc = r#"
74 Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
75
76 This function will read bytes from the underlying stream until the delimiter or EOF
77 is found. Once found, all bytes up to, and including, the delimiter (if found) will
78 be appended to `buf`.
79
80 If successful, this function will return the total number of bytes read.
81
82 # Examples
83
84 ```no_run
85 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
86 #
87 use async_std::fs::File;
88 use async_std::io::BufReader;
89 use async_std::prelude::*;
90
91 let mut file = BufReader::new(File::open("a.txt").await?);
92
93 let mut buf = Vec::with_capacity(1024);
94 let n = file.read_until(b'\n', &mut buf).await?;
95 #
96 # Ok(()) }) }
97 ```
98
99 Multiple successful calls to `read_until` append all bytes up to and including to
100 `buf`:
101 ```
102 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
103 #
104 use async_std::io::BufReader;
105 use async_std::prelude::*;
106
107 let from: &[u8] = b"append\nexample\n";
108 let mut reader = BufReader::new(from);
109 let mut buf = vec![];
110
111 let mut size = reader.read_until(b'\n', &mut buf).await?;
112 assert_eq!(size, 7);
113 assert_eq!(buf, b"append\n");
114
115 size += reader.read_until(b'\n', &mut buf).await?;
116 assert_eq!(size, from.len());
117
118 assert_eq!(buf, from);
119 #
120 # Ok(()) }) }
121 ```
122 "#]
123 fn read_until<'a>(
124 &'a mut self,
125 byte: u8,
126 buf: &'a mut Vec<u8>,
127 ) -> impl Future<Output = usize> + 'a [ReadUntilFuture<'a, Self>]
128 where
129 Self: Unpin,
130 {
131 ReadUntilFuture {
132 reader: self,
133 byte,
134 buf,
135 read: 0,
136 }
137 }
138
139 #[doc = r#"
140 Reads all bytes and appends them into `buf` until a newline (the 0xA byte) is
141 reached.
142
143 This function will read bytes from the underlying stream until the newline
144 delimiter (the 0xA byte) or EOF is found. Once found, all bytes up to, and
145 including, the delimiter (if found) will be appended to `buf`.
146
147 If successful, this function will return the total number of bytes read.
148
149 If this function returns `Ok(0)`, the stream has reached EOF.
150
151 # Errors
152
153 This function has the same error semantics as [`read_until`] and will also return
154 an error if the read bytes are not valid UTF-8. If an I/O error is encountered then
155 `buf` may contain some bytes already read in the event that all data read so far
156 was valid UTF-8.
157
158 [`read_until`]: #method.read_until
159
160 # Examples
161
162 ```no_run
163 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
164 #
165 use async_std::fs::File;
166 use async_std::io::BufReader;
167 use async_std::prelude::*;
168
169 let mut file = BufReader::new(File::open("a.txt").await?);
170
171 let mut buf = String::new();
172 file.read_line(&mut buf).await?;
173 #
174 # Ok(()) }) }
175 ```
176 "#]
177 fn read_line<'a>(
178 &'a mut self,
179 buf: &'a mut String,
180 ) -> impl Future<Output = io::Result<usize>> + 'a [ReadLineFuture<'a, Self>]
181 where
182 Self: Unpin,
183 {
184 ReadLineFuture {
185 reader: self,
186 bytes: unsafe { mem::replace(buf.as_mut_vec(), Vec::new()) },
187 buf,
188 read: 0,
189 }
190 }
191
192 #[doc = r#"
193 Returns a stream over the lines of this byte stream.
194
195 The stream returned from this function will yield instances of
196 [`io::Result`]`<`[`String`]`>`. Each string returned will *not* have a newline byte
197 (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
198
199 [`io::Result`]: type.Result.html
200 [`String`]: https://doc.rust-lang.org/std/string/struct.String.html
201
202 # Examples
203
204 ```no_run
205 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
206 #
207 use async_std::fs::File;
208 use async_std::io::BufReader;
209 use async_std::prelude::*;
210
211 let file = File::open("a.txt").await?;
212 let mut lines = BufReader::new(file).lines();
213 let mut count = 0;
214
215 while let Some(line) = lines.next().await {
216 line?;
217 count += 1;
218 }
219 #
220 # Ok(()) }) }
221 ```
222 "#]
223 fn lines(self) -> Lines<Self>
224 where
225 Self: Unpin + Sized,
226 {
227 Lines {
228 reader: self,
229 buf: String::new(),
230 bytes: Vec::new(),
231 read: 0,
232 }
233 }
234
235 #[doc = r#"
236 Returns a stream over the contents of this reader split on the byte `byte`.
237
238 The stream returned from this function will return instances of
239 [`io::Result`]`<`[`Vec<u8>`]`>`. Each vector returned will *not* have
240 the delimiter byte at the end.
241
242 This function will yield errors whenever [`read_until`] would have
243 also yielded an error.
244
245 [`io::Result`]: type.Result.html
246 [`Vec<u8>`]: ../vec/struct.Vec.html
247 [`read_until`]: #method.read_until
248
249 # Examples
250
251 [`std::io::Cursor`][`Cursor`] is a type that implements `BufRead`. In
252 this example, we use [`Cursor`] to iterate over all hyphen delimited
253 segments in a byte slice
254
255 [`Cursor`]: struct.Cursor.html
256
257 ```
258 # fn main() -> std::io::Result<()> { async_std::task::block_on(async {
259 #
260 use async_std::prelude::*;
261 use async_std::io;
262
263 let cursor = io::Cursor::new(b"lorem-ipsum-dolor");
264
265 let mut split_iter = cursor.split(b'-').map(|l| l.unwrap());
266 assert_eq!(split_iter.next().await, Some(b"lorem".to_vec()));
267 assert_eq!(split_iter.next().await, Some(b"ipsum".to_vec()));
268 assert_eq!(split_iter.next().await, Some(b"dolor".to_vec()));
269 assert_eq!(split_iter.next().await, None);
270 #
271 # Ok(()) }) }
272 ```
273 "#]
274 fn split(self, byte: u8) -> Split<Self>
275 where
276 Self: Sized,
277 {
278 Split {
279 reader: self,
280 buf: Vec::new(),
281 delim: byte,
282 read: 0,
283 }
284 }
285 }
286
// Shim impl of `BufRead` for `Box<T>`. As the panic messages say, it exists only so
// the impl shows up in the rendered docs; both bodies panic via `unreachable!` and
// ignore their `cx` / `amt` arguments.
impl<T: BufRead + Unpin + ?Sized> BufRead for Box<T> {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!("this impl only appears in the rendered docs")
}

fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!("this impl only appears in the rendered docs")
}
}
299
// Shim impl of `BufRead` for `&mut T`. As the panic messages say, it exists only so
// the impl shows up in the rendered docs; both bodies panic via `unreachable!` and
// ignore their `cx` / `amt` arguments.
impl<T: BufRead + Unpin + ?Sized> BufRead for &mut T {
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!("this impl only appears in the rendered docs")
}

fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!("this impl only appears in the rendered docs")
}
}
312
// Shim impl of `BufRead` for `Pin<P>`, where `P` is a mutable, `Unpin` pointer whose
// target itself implements `BufRead`. As the panic messages say, it exists only so
// the impl shows up in the rendered docs; both bodies panic via `unreachable!`.
impl<P> BufRead for Pin<P>
where
P: DerefMut + Unpin,
<P as Deref>::Target: BufRead,
{
fn poll_fill_buf(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<io::Result<&[u8]>> {
unreachable!("this impl only appears in the rendered docs")
}

fn consume(self: Pin<&mut Self>, amt: usize) {
unreachable!("this impl only appears in the rendered docs")
}
}
329
330 impl BufRead for &[u8] {
331 fn poll_fill_buf(
332 self: Pin<&mut Self>,
333 cx: &mut Context<'_>,
334 ) -> Poll<io::Result<&[u8]>> {
335 unreachable!()
336 }
337
338 fn consume(self: Pin<&mut Self>, amt: usize) {
339 unreachable!("this impl only appears in the rendered docs")
340 }
341 }
342 }
343
read_until_internal<R: BufReadExt + ?Sized>( mut reader: Pin<&mut R>, cx: &mut Context<'_>, byte: u8, buf: &mut Vec<u8>, read: &mut usize, ) -> Poll<io::Result<usize>>344 pub fn read_until_internal<R: BufReadExt + ?Sized>(
345 mut reader: Pin<&mut R>,
346 cx: &mut Context<'_>,
347 byte: u8,
348 buf: &mut Vec<u8>,
349 read: &mut usize,
350 ) -> Poll<io::Result<usize>> {
351 loop {
352 let (done, used) = {
353 let available = futures_core::ready!(reader.as_mut().poll_fill_buf(cx))?;
354 if let Some(i) = memchr::memchr(byte, available) {
355 buf.extend_from_slice(&available[..=i]);
356 (true, i + 1)
357 } else {
358 buf.extend_from_slice(available);
359 (false, available.len())
360 }
361 };
362 reader.as_mut().consume(used);
363 *read += used;
364 if done || used == 0 {
365 return Poll::Ready(Ok(mem::replace(read, 0)));
366 }
367 }
368 }
369