1 use crate::io::util::read_line::read_line_internal;
2 use crate::io::AsyncBufRead;
3 
4 use pin_project_lite::pin_project;
5 use std::io;
6 use std::mem;
7 use std::pin::Pin;
8 use std::task::{Context, Poll};
9 
pin_project! {
    /// Reads lines from an [`AsyncBufRead`].
    ///
    /// Each line is handed out as an owned `String` with its trailing `\n`
    /// (or `\r\n`) terminator removed.
    ///
    /// A `Lines` can be turned into a `Stream` with [`LinesStream`].
    ///
    /// This type is usually created using the [`lines`] method.
    ///
    /// [`AsyncBufRead`]: crate::io::AsyncBufRead
    /// [`LinesStream`]: https://docs.rs/tokio-stream/0.1/tokio_stream/wrappers/struct.LinesStream.html
    /// [`lines`]: crate::io::AsyncBufReadExt::lines
    #[derive(Debug)]
    #[must_use = "streams do nothing unless polled"]
    #[cfg_attr(docsrs, doc(cfg(feature = "io-util")))]
    pub struct Lines<R> {
        // The wrapped reader. Structurally pinned so it can be polled through
        // `Pin<&mut Self>`.
        #[pin]
        reader: R,
        // Text of the line currently being assembled; moved out (leaving an
        // empty `String`) whenever a line is yielded.
        buf: String,
        // Scratch byte buffer handed to `read_line_internal` on every poll.
        // NOTE(review): presumably carries raw bytes of a partially read line
        // across `Pending` results - confirm against `read_line_internal`.
        bytes: Vec<u8>,
        // Progress counter handed to `read_line_internal`; `poll_next_line`
        // asserts (debug builds) that it is back at 0 once a read completes.
        read: usize,
    }
}
31 
lines<R>(reader: R) -> Lines<R> where R: AsyncBufRead,32 pub(crate) fn lines<R>(reader: R) -> Lines<R>
33 where
34     R: AsyncBufRead,
35 {
36     Lines {
37         reader,
38         buf: String::new(),
39         bytes: Vec::new(),
40         read: 0,
41     }
42 }
43 
44 impl<R> Lines<R>
45 where
46     R: AsyncBufRead + Unpin,
47 {
48     /// Returns the next line in the stream.
49     ///
50     /// # Cancel safety
51     ///
52     /// This method is cancellation safe.
53     ///
54     /// # Examples
55     ///
56     /// ```
57     /// # use tokio::io::AsyncBufRead;
58     /// use tokio::io::AsyncBufReadExt;
59     ///
60     /// # async fn dox(my_buf_read: impl AsyncBufRead + Unpin) -> std::io::Result<()> {
61     /// let mut lines = my_buf_read.lines();
62     ///
63     /// while let Some(line) = lines.next_line().await? {
64     ///     println!("length = {}", line.len())
65     /// }
66     /// # Ok(())
67     /// # }
68     /// ```
next_line(&mut self) -> io::Result<Option<String>>69     pub async fn next_line(&mut self) -> io::Result<Option<String>> {
70         use crate::future::poll_fn;
71 
72         poll_fn(|cx| Pin::new(&mut *self).poll_next_line(cx)).await
73     }
74 
75     /// Obtains a mutable reference to the underlying reader.
get_mut(&mut self) -> &mut R76     pub fn get_mut(&mut self) -> &mut R {
77         &mut self.reader
78     }
79 
80     /// Obtains a reference to the underlying reader.
get_ref(&mut self) -> &R81     pub fn get_ref(&mut self) -> &R {
82         &self.reader
83     }
84 
85     /// Unwraps this `Lines<R>`, returning the underlying reader.
86     ///
87     /// Note that any leftover data in the internal buffer is lost.
88     /// Therefore, a following read from the underlying reader may lead to data loss.
into_inner(self) -> R89     pub fn into_inner(self) -> R {
90         self.reader
91     }
92 }
93 
94 impl<R> Lines<R>
95 where
96     R: AsyncBufRead,
97 {
98     /// Polls for the next line in the stream.
99     ///
100     /// This method returns:
101     ///
102     ///  * `Poll::Pending` if the next line is not yet available.
103     ///  * `Poll::Ready(Ok(Some(line)))` if the next line is available.
104     ///  * `Poll::Ready(Ok(None))` if there are no more lines in this stream.
105     ///  * `Poll::Ready(Err(err))` if an IO error occurred while reading the next line.
106     ///
107     /// When the method returns `Poll::Pending`, the `Waker` in the provided
108     /// `Context` is scheduled to receive a wakeup when more bytes become
109     /// available on the underlying IO resource.  Note that on multiple calls to
110     /// `poll_next_line`, only the `Waker` from the `Context` passed to the most
111     /// recent call is scheduled to receive a wakeup.
poll_next_line( self: Pin<&mut Self>, cx: &mut Context<'_>, ) -> Poll<io::Result<Option<String>>>112     pub fn poll_next_line(
113         self: Pin<&mut Self>,
114         cx: &mut Context<'_>,
115     ) -> Poll<io::Result<Option<String>>> {
116         let me = self.project();
117 
118         let n = ready!(read_line_internal(me.reader, cx, me.buf, me.bytes, me.read))?;
119         debug_assert_eq!(*me.read, 0);
120 
121         if n == 0 && me.buf.is_empty() {
122             return Poll::Ready(Ok(None));
123         }
124 
125         if me.buf.ends_with('\n') {
126             me.buf.pop();
127 
128             if me.buf.ends_with('\r') {
129                 me.buf.pop();
130             }
131         }
132 
133         Poll::Ready(Ok(Some(mem::take(me.buf))))
134     }
135 }
136 
#[cfg(test)]
mod tests {
    use super::Lines;

    /// Compile-time check that `Lines<R>` is `Unpin` when `R` is `Unpin`
    /// (instantiated here with `()`).
    #[test]
    fn assert_unpin() {
        crate::is_unpin::<Lines<()>>();
    }
}
146