use crate::io::util::fill_buf::{fill_buf, FillBuf};
use crate::io::util::lines::{lines, Lines};
use crate::io::util::read_line::{read_line, ReadLine};
use crate::io::util::read_until::{read_until, ReadUntil};
use crate::io::util::split::{split, Split};
use crate::io::AsyncBufRead;

cfg_io_util! {
    /// An extension trait which adds utility methods to [`AsyncBufRead`] types.
    ///
    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
    pub trait AsyncBufReadExt: AsyncBufRead {
        /// Reads all bytes into `buf` until the delimiter `byte` or EOF is reached.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn read_until(&mut self, byte: u8, buf: &mut Vec<u8>) -> io::Result<usize>;
        /// ```
        ///
        /// This function will read bytes from the underlying stream until the
        /// delimiter or EOF is found. Once found, all bytes up to, and including,
        /// the delimiter (if found) will be appended to `buf`.
        ///
        /// If successful, this function will return the total number of bytes read.
        ///
        /// If this function returns `Ok(0)`, the stream has reached EOF.
        ///
        /// # Errors
        ///
        /// This function will ignore all instances of [`ErrorKind::Interrupted`] and
        /// will otherwise return any errors returned by [`fill_buf`].
        ///
        /// If an I/O error is encountered then all bytes read so far will be
        /// present in `buf` and its length will have been adjusted appropriately.
        ///
        /// [`fill_buf`]: AsyncBufRead::poll_fill_buf
        /// [`ErrorKind::Interrupted`]: std::io::ErrorKind::Interrupted
        ///
        /// # Cancel safety
        ///
        /// If the method is used as the event in a
        /// [`tokio::select!`](crate::select) statement and some other branch
        /// completes first, then some data may have been partially read. Any
        /// partially read bytes are appended to `buf`, and the method can be
        /// called again to continue reading until `byte`.
        ///
        /// This method returns the total number of bytes read. If you cancel
        /// the call to `read_until` and then call it again to continue reading,
        /// the counter is reset.
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
        /// bytes in a byte slice in hyphen delimited segments:
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let mut cursor = Cursor::new(b"lorem-ipsum");
        ///     let mut buf = vec![];
        ///
        ///     // cursor is at 'l'
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 6);
        ///     assert_eq!(buf, b"lorem-");
        ///     buf.clear();
        ///
        ///     // cursor is at 'i'
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 5);
        ///     assert_eq!(buf, b"ipsum");
        ///     buf.clear();
        ///
        ///     // cursor is at EOF
        ///     let num_bytes = cursor.read_until(b'-', &mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///     assert_eq!(num_bytes, 0);
        ///     assert_eq!(buf, b"");
        /// }
        /// ```
        fn read_until<'a>(&'a mut self, byte: u8, buf: &'a mut Vec<u8>) -> ReadUntil<'a, Self>
        where
            Self: Unpin,
        {
            read_until(self, byte, buf)
        }

        /// Reads all bytes until a newline (the 0xA byte) is reached, and appends
        /// them to the provided buffer.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn read_line(&mut self, buf: &mut String) -> io::Result<usize>;
        /// ```
        ///
        /// This function will read bytes from the underlying stream until the
        /// newline delimiter (the 0xA byte) or EOF is found. Once found, all bytes
        /// up to, and including, the delimiter (if found) will be appended to
        /// `buf`.
        ///
        /// If successful, this function will return the total number of bytes read.
        ///
        /// If this function returns `Ok(0)`, the stream has reached EOF.
        ///
        /// # Errors
        ///
        /// This function has the same error semantics as [`read_until`] and will
        /// also return an error if the read bytes are not valid UTF-8. If an I/O
        /// error is encountered then `buf` may contain some bytes already read in
        /// the event that all data read so far was valid UTF-8.
        ///
        /// [`read_until`]: AsyncBufReadExt::read_until
        ///
        /// # Cancel safety
        ///
        /// This method is not cancellation safe. If the method is used as the
        /// event in a [`tokio::select!`](crate::select) statement and some
        /// other branch completes first, then some data may have been partially
        /// read, and this data is lost. There are no guarantees regarding the
        /// contents of `buf` when the call is cancelled. The current
        /// implementation replaces `buf` with the empty string, but this may
        /// change in the future.
        ///
        /// This function does not behave like [`read_until`] because of the
        /// requirement that a string contains only valid UTF-8. If you need a
        /// cancellation safe `read_line`, there are three options:
        ///
        ///  * Call [`read_until`] with a newline character and manually perform the UTF-8 check.
        ///  * The stream returned by [`lines`] has a cancellation safe
        ///    [`next_line`] method.
        ///  * Use [`tokio_util::codec::LinesCodec`][LinesCodec].
        ///
        /// [LinesCodec]: https://docs.rs/tokio-util/0.6/tokio_util/codec/struct.LinesCodec.html
        /// [`read_until`]: Self::read_until
        /// [`lines`]: Self::lines
        /// [`next_line`]: crate::io::Lines::next_line
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to read all the
        /// lines in a byte slice:
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let mut cursor = Cursor::new(b"foo\nbar");
        ///     let mut buf = String::new();
        ///
        ///     // cursor is at 'f'
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 4);
        ///     assert_eq!(buf, "foo\n");
        ///     buf.clear();
        ///
        ///     // cursor is at 'b'
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 3);
        ///     assert_eq!(buf, "bar");
        ///     buf.clear();
        ///
        ///     // cursor is at EOF
        ///     let num_bytes = cursor.read_line(&mut buf)
        ///         .await
        ///         .expect("reading from cursor won't fail");
        ///
        ///     assert_eq!(num_bytes, 0);
        ///     assert_eq!(buf, "");
        /// }
        /// ```
        fn read_line<'a>(&'a mut self, buf: &'a mut String) -> ReadLine<'a, Self>
        where
            Self: Unpin,
        {
            read_line(self, buf)
        }

        /// Returns a stream of the contents of this reader split on the byte
        /// `byte`.
        ///
        /// This method is the asynchronous equivalent to
        /// [`BufRead::split`](std::io::BufRead::split).
        ///
        /// The stream returned from this function will yield instances of
        /// [`io::Result`]`<`[`Option`]`<`[`Vec<u8>`]`>>`. Each vector returned will *not* have
        /// the delimiter byte at the end.
        ///
        /// [`io::Result`]: std::io::Result
        /// [`Option`]: core::option::Option
        /// [`Vec<u8>`]: std::vec::Vec
        ///
        /// # Errors
        ///
        /// Each item of the stream has the same error semantics as
        /// [`AsyncBufReadExt::read_until`](AsyncBufReadExt::read_until).
        ///
        /// # Examples
        ///
        /// ```
        /// # use tokio::io::AsyncBufRead;
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
        /// let mut segments = my_buf_read.split(b'f');
        ///
        /// while let Some(segment) = segments.next_segment().await? {
        ///     println!("length = {}", segment.len())
        /// }
        /// # Ok(())
        /// # }
        /// ```
        fn split(self, byte: u8) -> Split<Self>
        where
            Self: Sized + Unpin,
        {
            split(self, byte)
        }

        /// Returns the contents of the internal buffer, filling it with more
        /// data from the inner reader if it is empty.
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`consume`] method to function properly. When calling this method,
        /// none of the contents will be "read" in the sense that later calling
        /// `read` may return the same contents. As such, [`consume`] must be
        /// called with the number of bytes that are consumed from this buffer
        /// to ensure that the bytes are never returned twice.
        ///
        /// An empty buffer returned indicates that the stream has reached EOF.
        ///
        /// Equivalent to:
        ///
        /// ```ignore
        /// async fn fill_buf(&mut self) -> io::Result<&[u8]>;
        /// ```
        ///
        /// # Errors
        ///
        /// This function will return an I/O error if the underlying reader was
        /// read, but returned an error.
        ///
        /// [`consume`]: crate::io::AsyncBufReadExt::consume
        fn fill_buf(&mut self) -> FillBuf<'_, Self>
        where
            Self: Unpin,
        {
            fill_buf(self)
        }

        /// Tells this buffer that `amt` bytes have been consumed from the
        /// buffer, so they should no longer be returned in calls to [`read`].
        ///
        /// This function is a lower-level call. It needs to be paired with the
        /// [`fill_buf`] method to function properly. This function does not
        /// perform any I/O; it simply informs this object that some amount of
        /// its buffer, returned from [`fill_buf`], has been consumed and should
        /// no longer be returned. As such, this function may do odd things if
        /// [`fill_buf`] isn't called before calling it.
        ///
        /// The `amt` must be less than or equal to the number of bytes in the
        /// buffer returned by [`fill_buf`].
        ///
        /// [`read`]: crate::io::AsyncReadExt::read
        /// [`fill_buf`]: crate::io::AsyncBufReadExt::fill_buf
        fn consume(&mut self, amt: usize)
        where
            Self: Unpin,
        {
            // `Pin::new` is usable here because of the `Self: Unpin` bound; the
            // call is then forwarded to the pinned `consume`.
            std::pin::Pin::new(self).consume(amt)
        }

        /// Returns a stream over the lines of this reader.
        ///
        /// This method is the async equivalent to [`BufRead::lines`](std::io::BufRead::lines).
        ///
        /// The stream returned from this function will yield instances of
        /// [`io::Result`]`<`[`Option`]`<`[`String`]`>>`. Each string returned will *not* have a newline
        /// byte (the 0xA byte) or CRLF (0xD, 0xA bytes) at the end.
        ///
        /// [`io::Result`]: std::io::Result
        /// [`Option`]: core::option::Option
        /// [`String`]: String
        ///
        /// # Errors
        ///
        /// Each line of the stream has the same error semantics as [`AsyncBufReadExt::read_line`].
        ///
        /// # Examples
        ///
        /// [`std::io::Cursor`][`Cursor`] is a type that implements
        /// `AsyncBufRead`. In this example, we use [`Cursor`] to iterate over
        /// all the lines in a byte slice.
        ///
        /// [`Cursor`]: std::io::Cursor
        ///
        /// ```
        /// use tokio::io::AsyncBufReadExt;
        ///
        /// use std::io::Cursor;
        ///
        /// #[tokio::main]
        /// async fn main() {
        ///     let cursor = Cursor::new(b"lorem\nipsum\r\ndolor");
        ///
        ///     let mut lines = cursor.lines();
        ///
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("lorem")));
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("ipsum")));
        ///     assert_eq!(lines.next_line().await.unwrap(), Some(String::from("dolor")));
        ///     assert_eq!(lines.next_line().await.unwrap(), None);
        /// }
        /// ```
        ///
        /// [`AsyncBufReadExt::read_line`]: AsyncBufReadExt::read_line
        fn lines(self) -> Lines<Self>
        where
            Self: Sized,
        {
            lines(self)
        }
    }
}

// Blanket implementation: every `AsyncBufRead` type, sized or not, gets the
// extension methods above. The body is empty because each method has a
// default implementation in the trait.
impl<R: AsyncBufRead + ?Sized> AsyncBufReadExt for R {}